use crate::core::{ClContextPtr, ClNullEventPtr};
use crate::error::{Error as OclError, Result as OclResult};
use crate::r#async::qutex::{QrwLock, QrwRequest, RequestKind};
use crate::{Event, EventList};
use futures::sync::oneshot::{self, Receiver};
use futures::{Async, Future, Poll};
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
const PRINT_DEBUG: bool = false;
pub type FutureReadGuard<V> = FutureGuard<V, ReadGuard<V>>;
pub type FutureWriteGuard<V> = FutureGuard<V, WriteGuard<V>>;
fn print_debug(id: usize, msg: &str) {
if PRINT_DEBUG {
println!(
"###### [{}] {} (thread: {})",
id,
msg,
::std::thread::current().name().unwrap_or("<unnamed>")
);
}
}
unsafe fn extract_order_lock<V, G: OrderGuard<V>>(guard: G) -> OrderLock<V> {
let order_lock = ::std::ptr::read(guard.order_lock());
guard.forget();
order_lock
}
pub trait OrderGuard<V>
where
Self: ::std::marker::Sized,
{
fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> Self;
fn order_lock(&self) -> &OrderLock<V>;
unsafe fn forget(self) {
::std::mem::forget(self);
}
}
#[derive(Debug)]
pub struct ReadGuard<V> {
order_lock: OrderLock<V>,
release_event: Option<Event>,
}
impl<V> ReadGuard<V> {
fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> ReadGuard<V> {
print_debug(order_lock.id(), "ReadGuard::new: read lock acquired");
ReadGuard {
order_lock,
release_event,
}
}
pub fn release_event(guard: &ReadGuard<V>) -> Option<&Event> {
guard.release_event.as_ref()
}
pub fn release(mut guard: ReadGuard<V>) -> OrderLock<V> {
print_debug(
guard.order_lock.id(),
"WriteGuard::release: releasing read lock",
);
unsafe {
Self::release_components(&mut guard);
extract_order_lock(guard)
}
}
fn complete_release_event(guard: &mut ReadGuard<V>) {
if let Some(ref e) = guard.release_event.take() {
if !e.is_complete().expect("ReadGuard::complete_release_event") {
print_debug(
guard.order_lock.id(),
"ReadGuard::complete_release_event: \
setting release event complete",
);
e.set_complete().expect("ReadGuard::complete_release_event");
}
}
}
unsafe fn release_components(guard: &mut ReadGuard<V>) {
guard.order_lock.lock.release_read_lock();
Self::complete_release_event(guard);
}
}
impl<V> Deref for ReadGuard<V> {
type Target = V;
fn deref(&self) -> &V {
unsafe { &*self.order_lock.lock.as_ptr() }
}
}
impl<V> Drop for ReadGuard<V> {
fn drop(&mut self) {
print_debug(self.order_lock.id(), "dropping and releasing ReadGuard");
unsafe { Self::release_components(self) }
}
}
impl<V> OrderGuard<V> for ReadGuard<V> {
fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> ReadGuard<V> {
ReadGuard::new(order_lock, release_event)
}
fn order_lock(&self) -> &OrderLock<V> {
&self.order_lock
}
}
#[derive(Debug)]
pub struct WriteGuard<V> {
order_lock: OrderLock<V>,
release_event: Option<Event>,
}
impl<V> WriteGuard<V> {
fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> WriteGuard<V> {
print_debug(order_lock.id(), "WriteGuard::new: Write lock acquired");
WriteGuard {
order_lock,
release_event,
}
}
pub fn release_event(guard: &WriteGuard<V>) -> Option<&Event> {
guard.release_event.as_ref()
}
pub fn release(mut guard: WriteGuard<V>) -> OrderLock<V> {
print_debug(
guard.order_lock.id(),
"WriteGuard::release: Releasing write lock",
);
unsafe {
Self::release_components(&mut guard);
extract_order_lock(guard)
}
}
fn complete_release_event(guard: &mut WriteGuard<V>) {
if let Some(ref e) = guard.release_event.take() {
if !e.is_complete().expect("WriteGuard::complete_release_event") {
print_debug(
guard.order_lock.id(),
"WriteGuard::complete_release_event: \
Setting release event complete",
);
e.set_complete()
.expect("WriteGuard::complete_release_event");
}
}
}
unsafe fn release_components(guard: &mut WriteGuard<V>) {
guard.order_lock.lock.release_write_lock();
Self::complete_release_event(guard);
}
}
impl<V> Deref for WriteGuard<V> {
type Target = V;
fn deref(&self) -> &V {
unsafe { &*self.order_lock.lock.as_ptr() }
}
}
impl<V> DerefMut for WriteGuard<V> {
fn deref_mut(&mut self) -> &mut V {
unsafe { &mut *self.order_lock.lock.as_mut_ptr() }
}
}
impl<V> Drop for WriteGuard<V> {
fn drop(&mut self) {
print_debug(
self.order_lock.id(),
"WriteGuard::drop: Dropping and releasing WriteGuard",
);
unsafe { Self::release_components(self) }
}
}
impl<V> OrderGuard<V> for WriteGuard<V> {
fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> WriteGuard<V> {
WriteGuard::new(order_lock, release_event)
}
fn order_lock(&self) -> &OrderLock<V> {
&self.order_lock
}
}
#[derive(Debug, PartialEq)]
enum Stage {
WaitEvents,
LockQueue,
Command,
Upgrade,
}
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct FutureGuard<V, G>
where
G: OrderGuard<V>,
{
order_lock: Option<OrderLock<V>>,
lock_rx: Option<Receiver<()>>,
wait_events: Option<EventList>,
lock_event: Option<Event>,
command_event: Option<Event>,
upgrade_after_command: bool,
upgrade_rx: Option<Receiver<()>>,
release_event: Option<Event>,
stage: Stage,
_guard: PhantomData<G>,
}
impl<V, G> FutureGuard<V, G>
where
G: OrderGuard<V>,
{
fn new(order_lock: OrderLock<V>, lock_rx: Receiver<()>) -> FutureGuard<V, G> {
FutureGuard {
order_lock: Some(order_lock),
lock_rx: Some(lock_rx),
wait_events: None,
lock_event: None,
command_event: None,
upgrade_after_command: false,
upgrade_rx: None,
release_event: None,
stage: Stage::WaitEvents,
_guard: PhantomData,
}
}
pub fn set_lock_wait_events<L: Into<EventList>>(&mut self, wait_events: L) {
assert!(
self.wait_events.is_none(),
"Wait list has already been set."
);
self.wait_events = Some(wait_events.into());
}
pub fn ewait_lock<L: Into<EventList>>(mut self, wait_events: L) -> FutureGuard<V, G> {
self.set_lock_wait_events(wait_events);
self
}
pub fn create_lock_event<C: ClContextPtr>(&mut self, context: C) -> OclResult<&Event> {
assert!(
self.lock_event.is_none(),
"Lock event has already been created."
);
self.lock_event = Some(Event::user(context)?);
Ok(self.lock_event.as_mut().unwrap())
}
pub fn enew_lock<C, En>(mut self, context: C, mut enew: En) -> FutureGuard<V, G>
where
C: ClContextPtr,
En: ClNullEventPtr,
{
{
let lock_event = self
.create_lock_event(context)
.expect("FutureGuard::enew_lock");
unsafe {
enew.clone_from(lock_event);
}
}
self
}
pub fn set_command_wait_event(&mut self, command_event: Event) {
assert!(
self.command_event.is_none(),
"Command completion event has already been set."
);
self.command_event = Some(command_event);
}
pub fn ewait_command(mut self, command_event: Event) -> FutureGuard<V, G> {
self.set_command_wait_event(command_event);
self
}
pub fn create_release_event<C: ClContextPtr>(&mut self, context: C) -> OclResult<&Event> {
assert!(
self.release_event.is_none(),
"Release event has already been created."
);
self.release_event = Some(Event::user(context)?);
Ok(self.release_event.as_ref().unwrap())
}
pub fn enew_release<C, En>(mut self, context: C, mut enew: En) -> FutureGuard<V, G>
where
C: ClContextPtr,
En: ClNullEventPtr,
{
{
let release_event = self
.create_release_event(context)
.expect("FutureGuard::enew_release");
unsafe {
enew.clone_from(release_event);
}
}
self
}
pub fn lock_event(&self) -> Option<&Event> {
self.lock_event.as_ref()
}
pub fn release_event(&self) -> Option<&Event> {
self.release_event.as_ref()
}
pub fn wait(self) -> OclResult<G> {
<Self as Future>::wait(self)
}
pub fn as_ptr(&self) -> *const V {
self.order_lock
.as_ref()
.map(|order_lock| order_lock.lock.as_ptr())
.expect("FutureGuard::as_ptr: No OrderLock found.")
}
pub fn as_mut_ptr(&self) -> *mut V {
self.order_lock
.as_ref()
.map(|order_lock| order_lock.lock.as_mut_ptr())
.expect("FutureGuard::as_mut_ptr: No OrderLock found.")
}
pub fn order_lock(&self) -> &OrderLock<V> {
self.order_lock
.as_ref()
.expect("FutureGuard::order_lock: No OrderLock found.")
}
fn poll_wait_events(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::WaitEvents);
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_wait_events: Called",
);
if let Some(ref mut wait_events) = self.wait_events {
if let Async::NotReady = wait_events.poll()? {
return Ok(Async::NotReady);
}
}
self.stage = Stage::LockQueue;
self.poll_lock()
}
#[cfg(not(feature = "async_block"))]
fn poll_lock(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::LockQueue);
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_lock: Called",
);
unsafe {
self.order_lock.as_ref().unwrap().lock.process_queues();
}
if let Some(ref mut lock_rx) = self.lock_rx {
match lock_rx.poll() {
Ok(status) => {
if PRINT_DEBUG {
println!(
"###### [{}] FutureGuard::poll_lock: status: {:?}, \
(thread: {}).",
self.order_lock.as_ref().unwrap().id(),
status,
::std::thread::current().name().unwrap_or("<unnamed>")
);
}
match status {
Async::Ready(_) => {
if let Some(ref lock_event) = self.lock_event {
lock_event.set_complete()?
}
self.stage = Stage::Command;
}
Async::NotReady => return Ok(Async::NotReady),
}
}
Err(e) => panic!("FutureGuard::poll_lock: {:?}", e),
}
} else {
unreachable!();
}
self.poll_command()
}
#[cfg(feature = "async_block")]
fn poll_lock(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::LockQueue);
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_lock: Called",
);
unsafe {
self.order_lock.as_ref().unwrap().lock.process_queues();
}
self.lock_rx.take().wait()?;
if let Some(ref lock_event) = self.lock_event {
lock_event.set_complete()?
}
self.stage = Stage::Command;
return self.poll_command();
}
fn poll_command(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::Command);
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_command: Called",
);
if let Some(ref mut command_event) = self.command_event {
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_command: Event exists",
);
if let Async::NotReady = command_event.poll()? {
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_command: Event not ready",
);
return Ok(Async::NotReady);
}
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_command: Event is ready",
);
}
self.command_event = None;
if self.upgrade_after_command {
self.stage = Stage::Upgrade;
self.poll_upgrade()
} else {
Ok(Async::Ready(self.into_guard()))
}
}
#[cfg(not(feature = "async_block"))]
fn poll_upgrade(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::Upgrade);
debug_assert!(self.upgrade_after_command);
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Called",
);
if self.upgrade_rx.is_none() {
match unsafe { self.order_lock.as_ref().unwrap().lock.upgrade_read_lock() } {
Ok(_) => {
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Write lock acquired. Upgrading immediately.",
);
Ok(Async::Ready(self.into_guard()))
}
Err(rx) => {
self.upgrade_rx = Some(rx);
match self.upgrade_rx.as_mut().unwrap().poll() {
Ok(res) => {
match res {
Async::Ready(_) => {
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Channel completed. Upgrading.",
);
Ok(Async::Ready(self.into_guard()))
}
Async::NotReady => {
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Upgrade rx not ready.",
);
Ok(Async::NotReady)
}
}
}
Err(e) => panic!("FutureGuard::poll_upgrade: {:?}", e),
}
}
}
} else {
match self.upgrade_rx.as_mut().unwrap().poll() {
Ok(status) => {
print_debug(
self.order_lock.as_ref().unwrap().id(),
&format!("FutureGuard::poll_upgrade: Status: {:?}", status),
);
Ok(status.map(|_| self.into_guard()))
}
Err(e) => panic!("FutureGuard::poll_upgrade: {:?}", e),
}
}
}
#[cfg(feature = "async_block")]
fn poll_upgrade(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::Upgrade);
debug_assert!(self.upgrade_after_command);
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Called",
);
match unsafe { self.order_lock.as_ref().unwrap().lock.upgrade_read_lock() } {
Ok(_) => Ok(Async::Ready(self.into_guard())),
Err(rx) => {
self.upgrade_rx = Some(rx);
self.upgrade_rx.take().unwrap().wait()?;
Ok(Async::Ready(self.into_guard()))
}
}
}
fn into_guard(&mut self) -> G {
print_debug(
self.order_lock.as_ref().unwrap().id(),
"FutureGuard::into_guard: All polling complete",
);
G::new(self.order_lock.take().unwrap(), self.release_event.take())
}
}
impl<V, G> Future for FutureGuard<V, G>
where
G: OrderGuard<V>,
{
type Item = G;
type Error = OclError;
#[inline]
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
if self.order_lock.is_some() {
match self.stage {
Stage::WaitEvents => self.poll_wait_events(),
Stage::LockQueue => self.poll_lock(),
Stage::Command => self.poll_command(),
Stage::Upgrade => self.poll_upgrade(),
}
} else {
Err("FutureGuard::poll: Task already completed.".into())
}
}
}
impl<V, G> Drop for FutureGuard<V, G>
where
G: OrderGuard<V>,
{
fn drop(&mut self) {
if cfg!(feature = "future_guard_drop_panic") {
if let Some(ref _order_lock) = self.order_lock {
panic!(
"FutureGuard dropped before being polled. Not polling a FutureGuard \
can cause deadlocks. Call '.wait()' before dropping if necessary."
);
}
}
if let Some(ref mut lock_rx) = self.lock_rx {
lock_rx.close();
match lock_rx.poll() {
Ok(status) => {
match status {
Async::Ready(_) => {
if let Some(ref lock_event) = self.lock_event {
lock_event.set_complete().ok();
}
let _guard =
G::new(self.order_lock.take().unwrap(), self.release_event.take());
}
Async::NotReady => (),
}
}
Err(_) => (),
}
}
if let Some(ref ccev) = self.command_event {
ccev.wait_for().expect(
"Error waiting on command completion event \
while dropping 'FutureGuard'",
);
}
if let Some(ref rev) = self.release_event {
rev.set_complete().expect(
"Error setting release event complete \
while dropping 'FutureGuard'",
);
}
}
}
impl<V> FutureGuard<V, ReadGuard<V>> {
pub fn upgrade_after_command(self) -> FutureGuard<V, WriteGuard<V>> {
use std::ptr::read;
let future_guard = unsafe {
FutureGuard {
order_lock: read(&self.order_lock),
lock_rx: read(&self.lock_rx),
wait_events: read(&self.wait_events),
lock_event: read(&self.lock_event),
upgrade_after_command: true,
upgrade_rx: None,
command_event: read(&self.command_event),
release_event: read(&self.release_event),
stage: read(&self.stage),
_guard: PhantomData,
}
};
::std::mem::forget(self);
future_guard
}
}
#[derive(Debug)]
pub struct OrderLock<V> {
lock: QrwLock<V>,
}
impl<V> OrderLock<V> {
#[inline]
pub fn new(data: V) -> OrderLock<V> {
OrderLock {
lock: QrwLock::new(data),
}
}
pub fn read(self) -> FutureGuard<V, ReadGuard<V>> {
print_debug(self.id(), "OrderLock::read: Read lock requested");
let (tx, rx) = oneshot::channel();
unsafe {
self.lock
.enqueue_lock_request(QrwRequest::new(tx, RequestKind::Read));
}
FutureGuard::new(self.into(), rx)
}
pub fn write(self) -> FutureGuard<V, WriteGuard<V>> {
print_debug(self.id(), "OrderLock::write: Write lock requested");
let (tx, rx) = oneshot::channel();
unsafe {
self.lock
.enqueue_lock_request(QrwRequest::new(tx, RequestKind::Write));
}
FutureGuard::new(self.into(), rx)
}
#[inline]
pub fn as_ptr(&self) -> *const V {
self.lock.as_ptr()
}
#[inline]
pub fn as_mut_ptr(&self) -> *mut V {
self.lock.as_mut_ptr()
}
fn id(&self) -> usize {
self.lock.as_ptr() as usize
}
}
impl<V> From<QrwLock<V>> for OrderLock<V> {
fn from(q: QrwLock<V>) -> OrderLock<V> {
OrderLock { lock: q }
}
}
impl<V> From<V> for OrderLock<V> {
fn from(vec: V) -> OrderLock<V> {
OrderLock {
lock: QrwLock::new(vec),
}
}
}
impl<V> Clone for OrderLock<V> {
#[inline]
fn clone(&self) -> OrderLock<V> {
OrderLock {
lock: self.lock.clone(),
}
}
}