use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use futures::{Future, Poll, Async};
use futures::sync::oneshot::{self, Receiver};
use crate::core::{ClContextPtr, ClNullEventPtr};
use crate::error::{Error as OclError, Result as OclResult};
use crate::{Event, EventList};
use crate::r#async::qutex::{QrwLock, QrwRequest, RequestKind};
/// Compile-time switch for the debug output printed by this module.
const PRINT_DEBUG: bool = false;
/// A `FutureGuard` that resolves to a `ReadGuard`.
pub type FutureReadGuard<V> = FutureGuard<V, ReadGuard<V>>;
/// A `FutureGuard` that resolves to a `WriteGuard`.
pub type FutureWriteGuard<V> = FutureGuard<V, WriteGuard<V>>;
/// Prints a debug line containing `id`, `msg`, and the current thread's name.
///
/// Does nothing unless `PRINT_DEBUG` is `true`.
fn print_debug(id: usize, msg: &str) {
    if !PRINT_DEBUG {
        return;
    }
    let thread = ::std::thread::current();
    let thread_name = thread.name().unwrap_or("<unnamed>");
    println!("###### [{}] {} (thread: {})", id, msg, thread_name);
}
/// Moves the `OrderLock` out of `guard` without running the guard's
/// destructor.
///
/// # Safety
///
/// The `OrderLock` is bitwise-copied out of the guard with `ptr::read` and
/// the guard is then passed to `OrderGuard::forget`, so the guard's `Drop`
/// implementation must not run afterwards (the default `forget` uses
/// `mem::forget`). The callers in this file release the lock components
/// themselves before calling this function.
unsafe fn extract_order_lock<V, G: OrderGuard<V>>(guard: G) -> OrderLock<V> {
// Bitwise copy; the original inside `guard` must never be dropped.
let order_lock = ::std::ptr::read(guard.order_lock());
guard.forget();
order_lock
}
/// Behavior shared by the guard types a `FutureGuard` can resolve to.
pub trait OrderGuard<V>: ::std::marker::Sized {
    /// Builds a guard from an `OrderLock` and an optional release event.
    fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> Self;

    /// Returns a reference to the `OrderLock` held by this guard.
    fn order_lock(&self) -> &OrderLock<V>;

    /// Consumes the guard without running its destructor.
    ///
    /// # Safety
    ///
    /// The default implementation passes `self` to `mem::forget`, so any
    /// cleanup normally performed when the guard is dropped is skipped and
    /// becomes the caller's responsibility.
    unsafe fn forget(self) {
        ::std::mem::forget(self)
    }
}
/// A guard giving shared (`Deref`) access to the value behind an
/// `OrderLock`.
///
/// Dropping the guard releases the read lock and completes the release
/// event, if one is set.
#[derive(Debug)]
pub struct ReadGuard<V> {
// The lock this guard was created from.
order_lock: OrderLock<V>,
// Optional event set complete when the guard is released or dropped.
release_event: Option<Event>,
}
impl<V> ReadGuard<V> {
    /// Creates a new `ReadGuard` wrapping `order_lock`.
    ///
    /// `release_event`, if present, is set complete when the guard is
    /// released or dropped.
    fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> ReadGuard<V> {
        print_debug(order_lock.id(), "ReadGuard::new: read lock acquired");
        ReadGuard {
            order_lock,
            release_event,
        }
    }

    /// Returns a reference to the release event, if one is set.
    pub fn release_event(guard: &ReadGuard<V>) -> Option<&Event> {
        guard.release_event.as_ref()
    }

    /// Releases the read lock and returns the underlying `OrderLock`.
    ///
    /// The release event, if any, is set complete as part of the release.
    pub fn release(mut guard: ReadGuard<V>) -> OrderLock<V> {
        print_debug(guard.order_lock.id(), "ReadGuard::release: releasing read lock");
        // The components are released explicitly here; `extract_order_lock`
        // then forgets `guard`, so `Drop` does not release them a second time.
        unsafe {
            Self::release_components(&mut guard);
            extract_order_lock(guard)
        }
    }

    /// Takes the release event (if any) out of the guard and sets it
    /// complete unless it already reports itself complete.
    ///
    /// # Panics
    ///
    /// Panics if querying or setting the event status returns an error.
    fn complete_release_event(guard: &mut ReadGuard<V>) {
        if let Some(ref e) = guard.release_event.take() {
            if !e.is_complete().expect("ReadGuard::complete_release_event") {
                print_debug(guard.order_lock.id(), "ReadGuard::complete_release_event: \
                    setting release event complete");
                e.set_complete().expect("ReadGuard::complete_release_event");
            }
        }
    }

    /// Calls `release_read_lock` on the underlying lock, then completes the
    /// release event.
    ///
    /// # Safety
    ///
    /// NOTE(review): presumably must be called at most once per guard while
    /// the read lock is held; confirm against `QrwLock::release_read_lock`.
    unsafe fn release_components(guard: &mut ReadGuard<V>) {
        guard.order_lock.lock.release_read_lock();
        Self::complete_release_event(guard);
    }
}
impl<V> Deref for ReadGuard<V> {
    type Target = V;

    /// Borrows the value behind the lock for as long as the guard is borrowed.
    fn deref(&self) -> &V {
        let data = self.order_lock.lock.as_ptr();
        unsafe { &*data }
    }
}
impl<V> Drop for ReadGuard<V> {
    /// Releases the read lock and completes the release event, if any.
    fn drop(&mut self) {
        let id = self.order_lock.id();
        print_debug(id, "dropping and releasing ReadGuard");
        unsafe {
            Self::release_components(self);
        }
    }
}
impl<V> OrderGuard<V> for ReadGuard<V> {
    /// Delegates to the inherent `ReadGuard::new`.
    fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> Self {
        ReadGuard::new(order_lock, release_event)
    }

    /// Returns the `OrderLock` this guard holds.
    fn order_lock(&self) -> &OrderLock<V> {
        let lock = &self.order_lock;
        lock
    }
}
/// A guard giving exclusive (`Deref` and `DerefMut`) access to the value
/// behind an `OrderLock`.
///
/// Dropping the guard releases the write lock and completes the release
/// event, if one is set.
#[derive(Debug)]
pub struct WriteGuard<V> {
// The lock this guard was created from.
order_lock: OrderLock<V>,
// Optional event set complete when the guard is released or dropped.
release_event: Option<Event>,
}
impl<V> WriteGuard<V> {
fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> WriteGuard<V> {
print_debug(order_lock.id(), "WriteGuard::new: Write lock acquired");
WriteGuard {
order_lock,
release_event,
}
}
pub fn release_event(guard: &WriteGuard<V>) -> Option<&Event> {
guard.release_event.as_ref()
}
pub fn release(mut guard: WriteGuard<V>) -> OrderLock<V> {
print_debug(guard.order_lock.id(), "WriteGuard::release: Releasing write lock");
unsafe {
Self::release_components(&mut guard);
extract_order_lock(guard)
}
}
fn complete_release_event(guard: &mut WriteGuard<V>) {
if let Some(ref e) = guard.release_event.take() {
if !e.is_complete().expect("WriteGuard::complete_release_event") {
print_debug(guard.order_lock.id(), "WriteGuard::complete_release_event: \
Setting release event complete");
e.set_complete().expect("WriteGuard::complete_release_event");
}
}
}
unsafe fn release_components(guard: &mut WriteGuard<V>) {
guard.order_lock.lock.release_write_lock();
Self::complete_release_event(guard);
}
}
impl<V> Deref for WriteGuard<V> {
    type Target = V;

    /// Borrows the value behind the lock for as long as the guard is borrowed.
    fn deref(&self) -> &V {
        let data = self.order_lock.lock.as_ptr();
        unsafe { &*data }
    }
}
impl<V> DerefMut for WriteGuard<V> {
    /// Mutably borrows the value behind the lock for as long as the guard is
    /// mutably borrowed.
    fn deref_mut(&mut self) -> &mut V {
        let data = self.order_lock.lock.as_mut_ptr();
        unsafe { &mut *data }
    }
}
impl<V> Drop for WriteGuard<V> {
    /// Releases the write lock and completes the release event, if any.
    fn drop(&mut self) {
        let id = self.order_lock.id();
        print_debug(id, "WriteGuard::drop: Dropping and releasing WriteGuard");
        unsafe {
            Self::release_components(self);
        }
    }
}
impl<V> OrderGuard<V> for WriteGuard<V> {
    /// Delegates to the inherent `WriteGuard::new`.
    fn new(order_lock: OrderLock<V>, release_event: Option<Event>) -> Self {
        WriteGuard::new(order_lock, release_event)
    }

    /// Returns the `OrderLock` this guard holds.
    fn order_lock(&self) -> &OrderLock<V> {
        let lock = &self.order_lock;
        lock
    }
}
/// The polling stage a `FutureGuard` is currently in.
///
/// `Future::poll` dispatches on this value to the matching `poll_*` method.
#[derive(Debug, PartialEq)]
enum Stage {
// Handled by `poll_wait_events`; the initial stage.
WaitEvents,
// Handled by `poll_lock`.
LockQueue,
// Handled by `poll_command`.
Command,
// Handled by `poll_upgrade`; only entered when `upgrade_after_command` is set.
Upgrade,
}
/// A future that resolves to a guard of type `G` for the value behind an
/// `OrderLock`.
///
/// Polling advances through the `Stage`s in order: wait events, lock queue,
/// command event and, optionally, upgrade.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct FutureGuard<V, G> where G: OrderGuard<V> {
// The lock; taken (left as `None`) once a guard has been created from it.
order_lock: Option<OrderLock<V>>,
// Receiver polled in the `LockQueue` stage.
lock_rx: Option<Receiver<()>>,
// Events polled in the `WaitEvents` stage before moving on.
wait_events: Option<EventList>,
// Event set complete once `lock_rx` resolves.
lock_event: Option<Event>,
// Event polled in the `Command` stage before the guard is handed out.
command_event: Option<Event>,
// When `true`, the `Upgrade` stage runs after the `Command` stage.
upgrade_after_command: bool,
// Receiver stored when `upgrade_read_lock` does not succeed immediately.
upgrade_rx: Option<Receiver<()>>,
// Event handed to the guard on creation; otherwise completed on drop.
release_event: Option<Event>,
// Current polling stage.
stage: Stage,
// Marker for the guard type this future resolves to.
_guard: PhantomData<G>,
}
impl<V, G> FutureGuard<V, G> where G: OrderGuard<V> {
/// Creates a `FutureGuard` in the `WaitEvents` stage with no events set and
/// no upgrade requested.
fn new(order_lock: OrderLock<V>, lock_rx: Receiver<()>) -> FutureGuard<V, G> {
    FutureGuard {
        stage: Stage::WaitEvents,
        order_lock: Some(order_lock),
        lock_rx: Some(lock_rx),
        upgrade_after_command: false,
        upgrade_rx: None,
        wait_events: None,
        lock_event: None,
        command_event: None,
        release_event: None,
        _guard: PhantomData,
    }
}
/// Sets the list of events that must complete before the lock is polled.
///
/// # Panics
///
/// Panics if a wait list has already been set.
pub fn set_lock_wait_events<L: Into<EventList>>(&mut self, wait_events: L) {
    assert!(self.wait_events.is_none(), "Wait list has already been set.");
    let list: EventList = wait_events.into();
    self.wait_events = Some(list);
}
/// Builder-style form of `set_lock_wait_events`: sets the wait list and
/// returns the future.
///
/// # Panics
///
/// Panics if a wait list has already been set.
pub fn ewait_lock<L>(mut self, wait_events: L) -> FutureGuard<V, G>
    where L: Into<EventList>
{
    self.set_lock_wait_events(wait_events);
    self
}
/// Creates a user event in `context`, stores it as the lock event, and
/// returns a reference to it.
///
/// The lock event is set complete once the lock has been obtained.
///
/// # Errors
///
/// Returns the error from `Event::user` if the event cannot be created.
///
/// # Panics
///
/// Panics if a lock event has already been created.
pub fn create_lock_event<C: ClContextPtr>(&mut self, context: C) -> OclResult<&Event> {
    assert!(self.lock_event.is_none(), "Lock event has already been created.");
    let event = Event::user(context)?;
    self.lock_event = Some(event);
    Ok(self.lock_event.as_ref().unwrap())
}
/// Builder-style form of `create_lock_event`: creates the lock event, copies
/// it into `enew` via `clone_from`, and returns the future.
///
/// # Panics
///
/// Panics if the lock event already exists or cannot be created.
pub fn enew_lock<C, En>(mut self, context: C, mut enew: En) -> FutureGuard<V, G>
    where C: ClContextPtr, En: ClNullEventPtr
{
    let lock_event = self.create_lock_event(context).expect("FutureGuard::enew_lock");
    unsafe {
        enew.clone_from(lock_event);
    }
    self
}
/// Sets the command completion event that is polled, after the lock has been
/// obtained, before the guard is handed out.
///
/// # Panics
///
/// Panics if a command completion event has already been set.
pub fn set_command_wait_event(&mut self, command_event: Event) {
assert!(self.command_event.is_none(), "Command completion event has already been set.");
self.command_event = Some(command_event);
}
/// Builder-style form of `set_command_wait_event`: sets the command
/// completion event and returns the future.
///
/// # Panics
///
/// Panics if a command completion event has already been set.
pub fn ewait_command(mut self, command_event: Event) -> FutureGuard<V, G> {
self.set_command_wait_event(command_event);
self
}
/// Creates a user event in `context`, stores it as the release event, and
/// returns a reference to it.
///
/// The release event is passed on to the guard this future resolves to.
///
/// # Errors
///
/// Returns the error from `Event::user` if the event cannot be created.
///
/// # Panics
///
/// Panics if a release event has already been created.
pub fn create_release_event<C: ClContextPtr>(&mut self, context: C) -> OclResult<&Event> {
    assert!(self.release_event.is_none(), "Release event has already been created.");
    let event = Event::user(context)?;
    self.release_event = Some(event);
    Ok(self.release_event.as_ref().unwrap())
}
/// Builder-style form of `create_release_event`: creates the release event,
/// copies it into `enew` via `clone_from`, and returns the future.
///
/// # Panics
///
/// Panics if the release event already exists or cannot be created.
pub fn enew_release<C, En>(mut self, context: C, mut enew: En) -> FutureGuard<V, G>
    where C: ClContextPtr, En: ClNullEventPtr
{
    let release_event = self.create_release_event(context)
        .expect("FutureGuard::enew_release");
    unsafe {
        enew.clone_from(release_event);
    }
    self
}
/// Returns a reference to the lock event, if one has been created.
pub fn lock_event(&self) -> Option<&Event> {
self.lock_event.as_ref()
}
/// Returns a reference to the release event, if one has been created and not
/// yet handed to a guard.
pub fn release_event(&self) -> Option<&Event> {
self.release_event.as_ref()
}
/// Consumes the future and resolves it by calling `Future::wait`, returning
/// the guard or an error.
pub fn wait(self) -> OclResult<G> {
<Self as Future>::wait(self)
}
/// Returns a const pointer to the value behind the lock.
///
/// # Panics
///
/// Panics if the `OrderLock` has already been taken out of this future.
pub fn as_ptr(&self) -> *const V {
    let order_lock = self.order_lock.as_ref()
        .expect("FutureGuard::as_ptr: No OrderLock found.");
    order_lock.lock.as_ptr()
}
/// Returns a mutable pointer to the value behind the lock.
///
/// # Panics
///
/// Panics if the `OrderLock` has already been taken out of this future.
pub fn as_mut_ptr(&self) -> *mut V {
    let order_lock = self.order_lock.as_ref()
        .expect("FutureGuard::as_mut_ptr: No OrderLock found.");
    order_lock.lock.as_mut_ptr()
}
/// Returns a reference to the `OrderLock` held by this future.
///
/// # Panics
///
/// Panics if the `OrderLock` has already been taken out of this future.
pub fn order_lock(&self) -> &OrderLock<V> {
self.order_lock.as_ref().expect("FutureGuard::order_lock: No OrderLock found.")
}
fn poll_wait_events(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::WaitEvents);
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_wait_events: Called");
if let Some(ref mut wait_events) = self.wait_events {
if let Async::NotReady = wait_events.poll()? {
return Ok(Async::NotReady);
}
}
self.stage = Stage::LockQueue;
self.poll_lock()
}
/// Polls the lock receiver (non-blocking variant, compiled when the
/// `async_block` feature is disabled).
///
/// Calls `process_queues` on the lock, then polls `lock_rx`. When the
/// receiver is ready, the lock event (if any) is set complete, the stage
/// advances to `Command`, and the command event is polled.
///
/// # Panics
///
/// Panics if polling `lock_rx` returns an error, or if `lock_rx` is `None`.
///
/// NOTE(review): the `async_block` variant propagates a receiver error with
/// `?` where this variant panics; confirm the difference is intended.
#[cfg(not(feature = "async_block"))]
fn poll_lock(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::LockQueue);
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_lock: Called");
unsafe { self.order_lock.as_ref().unwrap().lock.process_queues(); }
if let Some(ref mut lock_rx) = self.lock_rx {
match lock_rx.poll() {
Ok(status) => {
if PRINT_DEBUG { println!("###### [{}] FutureGuard::poll_lock: status: {:?}, \
(thread: {}).", self.order_lock.as_ref().unwrap().id(), status,
::std::thread::current().name().unwrap_or("<unnamed>")); }
match status {
Async::Ready(_) => {
// Signal anything waiting on the lock event before moving on.
if let Some(ref lock_event) = self.lock_event {
lock_event.set_complete()?
}
self.stage = Stage::Command;
},
Async::NotReady => return Ok(Async::NotReady),
}
},
Err(e) => panic!("FutureGuard::poll_lock: {:?}", e),
}
} else {
// `new` always stores a receiver and this variant never removes it.
unreachable!();
}
self.poll_command()
}
/// Waits on the lock receiver (variant compiled when the `async_block`
/// feature is enabled).
///
/// Calls `process_queues` on the lock, takes `lock_rx` and calls `wait` on
/// it, then sets the lock event (if any) complete, advances to the `Command`
/// stage, and polls the command event.
///
/// NOTE(review): `self.lock_rx.take()` yields an `Option`, so `wait` is
/// called on `Option<Receiver<()>>`; presumably this relies on the futures
/// crate implementing `Future` for `Option`. Confirm.
#[cfg(feature = "async_block")]
fn poll_lock(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::LockQueue);
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_lock: Called");
unsafe { self.order_lock.as_ref().unwrap().lock.process_queues(); }
self.lock_rx.take().wait()?;
// Signal anything waiting on the lock event before moving on.
if let Some(ref lock_event) = self.lock_event {
lock_event.set_complete()?
}
self.stage = Stage::Command;
return self.poll_command();
}
fn poll_command(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::Command);
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_command: Called");
if let Some(ref mut command_event) = self.command_event {
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_command: Event exists");
if let Async::NotReady = command_event.poll()? {
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_command: Event not ready");
return Ok(Async::NotReady);
}
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_command: Event is ready");
}
self.command_event = None;
if self.upgrade_after_command {
self.stage = Stage::Upgrade;
self.poll_upgrade()
} else {
Ok(Async::Ready(self.into_guard()))
}
}
#[cfg(not(feature = "async_block"))]
fn poll_upgrade(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::Upgrade);
debug_assert!(self.upgrade_after_command);
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_upgrade: Called");
if self.upgrade_rx.is_none() {
match unsafe { self.order_lock.as_ref().unwrap().lock.upgrade_read_lock() } {
Ok(_) => {
print_debug(self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Write lock acquired. Upgrading immediately.");
Ok(Async::Ready(self.into_guard()))
},
Err(rx) => {
self.upgrade_rx = Some(rx);
match self.upgrade_rx.as_mut().unwrap().poll() {
Ok(res) => {
match res {
Async::Ready(_) => {
print_debug(self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Channel completed. Upgrading.");
Ok(Async::Ready(self.into_guard()))
},
Async::NotReady => {
print_debug(self.order_lock.as_ref().unwrap().id(),
"FutureGuard::poll_upgrade: Upgrade rx not ready.");
Ok(Async::NotReady)
},
}
},
Err(e) => panic!("FutureGuard::poll_upgrade: {:?}", e),
}
},
}
} else {
match self.upgrade_rx.as_mut().unwrap().poll() {
Ok(status) => {
print_debug(self.order_lock.as_ref().unwrap().id(),
&format!("FutureGuard::poll_upgrade: Status: {:?}", status));
Ok(status.map(|_| self.into_guard()))
},
Err(e) => panic!("FutureGuard::poll_upgrade: {:?}", e),
}
}
}
/// Waits for the upgrade to a write lock (variant compiled when the
/// `async_block` feature is enabled).
///
/// Calls `upgrade_read_lock` on the lock. If it returns `Ok`, the future
/// resolves to the guard immediately; otherwise `wait` is called on the
/// returned receiver before resolving.
#[cfg(feature = "async_block")]
fn poll_upgrade(&mut self) -> OclResult<Async<G>> {
debug_assert!(self.stage == Stage::Upgrade);
debug_assert!(self.upgrade_after_command);
print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::poll_upgrade: Called");
match unsafe { self.order_lock.as_ref().unwrap().lock.upgrade_read_lock() } {
Ok(_) => Ok(Async::Ready(self.into_guard())),
Err(rx) => {
// The receiver is stored and immediately taken back out, so
// `upgrade_rx` is `None` again after this call.
self.upgrade_rx = Some(rx);
self.upgrade_rx.take().unwrap().wait()?;
Ok(Async::Ready(self.into_guard()))
}
}
}
/// Builds the guard by taking the `OrderLock` and the release event out of
/// this future.
///
/// Panics if the `OrderLock` has already been taken.
fn into_guard(&mut self) -> G {
    print_debug(self.order_lock.as_ref().unwrap().id(), "FutureGuard::into_guard: All polling complete");
    let order_lock = self.order_lock.take().unwrap();
    let release_event = self.release_event.take();
    G::new(order_lock, release_event)
}
}
impl<V, G> Future for FutureGuard<V, G> where G: OrderGuard<V> {
    type Item = G;
    type Error = OclError;

    /// Advances the future by dispatching to the `poll_*` method for the
    /// current stage.
    ///
    /// Returns an error if the `OrderLock` has already been taken, i.e. the
    /// future was polled again after it completed.
    #[inline]
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        if self.order_lock.is_none() {
            return Err("FutureGuard::poll: Task already completed.".into());
        }
        match self.stage {
            Stage::WaitEvents => self.poll_wait_events(),
            Stage::LockQueue => self.poll_lock(),
            Stage::Command => self.poll_command(),
            Stage::Upgrade => self.poll_upgrade(),
        }
    }
}
impl<V, G> Drop for FutureGuard<V, G> where G: OrderGuard<V> {
/// Cleans up a `FutureGuard` that is being dropped.
///
/// Steps, in order:
///
/// 1. With the `future_guard_drop_panic` feature enabled, panics if the
///    `OrderLock` is still present.
/// 2. If a lock receiver is present, closes it and polls it once. If the
///    lock request had already been fulfilled, the lock event (if any) is
///    set complete and a guard is created and dropped straight away.
/// 3. If a command completion event is still set, calls `wait_for` on it.
/// 4. If a release event is still set, sets it complete.
///
/// # Panics
///
/// Panics if `wait_for` on the command event or `set_complete` on the
/// release event returns an error.
fn drop(&mut self) {
if cfg!(feature = "future_guard_drop_panic") {
if let Some(ref _order_lock) = self.order_lock {
panic!("FutureGuard dropped before being polled. Not polling a FutureGuard \
can cause deadlocks. Call '.wait()' before dropping if necessary.");
}
}
if let Some(ref mut lock_rx) = self.lock_rx {
// Close the channel first, then check whether a value was already sent.
lock_rx.close();
match lock_rx.poll() {
Ok(status) => {
match status {
Async::Ready(_) => {
// Errors are deliberately ignored here (`.ok()`).
if let Some(ref lock_event) = self.lock_event {
lock_event.set_complete().ok();
}
// `_guard` is dropped at the end of this arm; for the guard
// types in this file, dropping releases the lock.
// NOTE(review): this happens before the command event below is
// waited on; confirm that ordering is intended.
// NOTE(review): `unwrap` panics inside `drop` if the
// `OrderLock` was already taken; presumably unreachable.
let _guard = G::new(self.order_lock.take().unwrap(),
self.release_event.take());
},
Async::NotReady => (),
}
},
Err(_) => (),
}
}
if let Some(ref ccev) = self.command_event {
ccev.wait_for().expect("Error waiting on command completion event \
while dropping 'FutureGuard'");
}
// Only reached with `Some` when no guard took the release event above.
if let Some(ref rev) = self.release_event {
rev.set_complete().expect("Error setting release event complete \
while dropping 'FutureGuard'");
}
}
}
impl<V> FutureGuard<V, ReadGuard<V>> {
/// Converts this read future into a write future that runs the `Upgrade`
/// stage after the command event completes.
///
/// Every field is carried over unchanged except `upgrade_after_command`
/// (set to `true`) and `upgrade_rx` (reset to `None`).
pub fn upgrade_after_command(self) -> FutureGuard<V, WriteGuard<V>> {
use std::ptr::read;
// `FutureGuard` implements `Drop`, so its fields cannot be moved out
// directly. Each field is bitwise-copied with `ptr::read` and `self` is
// then forgotten so that nothing is dropped twice.
let future_guard = unsafe {
FutureGuard {
order_lock: read(&self.order_lock),
lock_rx: read(&self.lock_rx),
wait_events: read(&self.wait_events),
lock_event: read(&self.lock_event),
upgrade_after_command: true,
upgrade_rx: None,
command_event: read(&self.command_event),
release_event: read(&self.release_event),
stage: read(&self.stage),
_guard: PhantomData,
}
};
// Skip `self`'s destructor; ownership of the fields has moved to `future_guard`.
::std::mem::forget(self);
future_guard
}
}
/// A lock around a value of type `V`, backed by a `QrwLock`.
///
/// Access is requested with `read` or `write`, each of which returns a
/// `FutureGuard`.
#[derive(Debug)]
pub struct OrderLock<V> {
// The underlying queue-backed read/write lock.
lock: QrwLock<V>,
}
impl<V> OrderLock<V> {
#[inline]
pub fn new(data: V) -> OrderLock<V> {
OrderLock {
lock: QrwLock::new(data)
}
}
pub fn read(self) -> FutureGuard<V, ReadGuard<V>> {
print_debug(self.id(), "OrderLock::read: Read lock requested");
let (tx, rx) = oneshot::channel();
unsafe { self.lock.enqueue_lock_request(QrwRequest::new(tx, RequestKind::Read)); }
FutureGuard::new(self.into(), rx)
}
pub fn write(self) -> FutureGuard<V, WriteGuard<V>> {
print_debug(self.id(), "OrderLock::write: Write lock requested");
let (tx, rx) = oneshot::channel();
unsafe { self.lock.enqueue_lock_request(QrwRequest::new(tx, RequestKind::Write)); }
FutureGuard::new(self.into(), rx)
}
#[inline]
pub fn as_ptr(&self) -> *const V {
self.lock.as_ptr()
}
#[inline]
pub fn as_mut_ptr(&self) -> *mut V {
self.lock.as_mut_ptr()
}
fn id(&self) -> usize {
self.lock.as_ptr() as usize
}
}
impl<V> From<QrwLock<V>> for OrderLock<V> {
    /// Wraps an existing `QrwLock` in an `OrderLock`.
    fn from(lock: QrwLock<V>) -> Self {
        OrderLock { lock }
    }
}
impl<V> From<V> for OrderLock<V> {
    /// Creates an `OrderLock` around `data` by wrapping it in a new `QrwLock`.
    fn from(data: V) -> Self {
        let lock = QrwLock::new(data);
        OrderLock { lock }
    }
}
impl<V> Clone for OrderLock<V> {
    /// Returns a new `OrderLock` holding a clone of the inner `QrwLock`.
    #[inline]
    fn clone(&self) -> Self {
        let lock = self.lock.clone();
        OrderLock { lock }
    }
}