use crossbeam::queue::SegQueue;
use futures::sync::oneshot::{self, Canceled, Receiver, Sender};
use futures::{Future, Poll};
use std::cell::UnsafeCell;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::Arc;
/// A handle representing granted access to the value stored in a `Qutex`.
///
/// Dereferences (immutably and mutably) to the stored value. Dropping the
/// guard, or passing it to `Guard::unlock`, calls `direct_unlock` on the
/// underlying `Qutex`.
#[derive(Debug)]
pub struct Guard<T> {
// Handle to the qutex this guard was granted for; used to reach the value
// and to unlock.
qutex: Qutex<T>,
}
impl<T> Guard<T> {
    /// Releases the lock held by `guard` and returns the underlying `Qutex`.
    ///
    /// Unlocking also hands the lock to the next queued request, if there is
    /// one (see `Qutex::direct_unlock`).
    pub fn unlock(guard: Guard<T>) -> Qutex<T> {
        // `Guard` implements `Drop`, so its field cannot simply be moved out.
        // Suppress the destructor (which would unlock a second time) and copy
        // the handle out of the now-inert guard instead.
        let guard = ::std::mem::ManuallyDrop::new(guard);
        // SAFETY: `guard` is never dropped or read again, so `handle` becomes
        // the only live owner of this `Qutex` value.
        let handle = unsafe { ::std::ptr::read(&guard.qutex) };
        // SAFETY: the guard consumed above is what represented the held lock.
        unsafe {
            handle.direct_unlock();
        }
        handle
    }
}
impl<T> Deref for Guard<T> {
    type Target = T;

    /// Borrows the value stored in the qutex this guard belongs to.
    fn deref(&self) -> &T {
        let value_ptr = self.qutex.inner.cell.get();
        // SAFETY: the pointer comes from the `UnsafeCell` owned by the shared
        // `Inner`, which this guard keeps alive through its `Qutex` handle.
        // Exclusivity relies on the guard having been granted the lock.
        unsafe { &*value_ptr }
    }
}
impl<T> DerefMut for Guard<T> {
    /// Mutably borrows the value stored in the qutex this guard belongs to.
    fn deref_mut(&mut self) -> &mut T {
        let value_ptr: *mut T = self.qutex.inner.cell.get();
        // SAFETY: the pointer comes from the `UnsafeCell` owned by the shared
        // `Inner`, which this guard keeps alive. Exclusivity relies on the
        // guard having been granted the lock.
        unsafe { &mut *value_ptr }
    }
}
impl<T> Drop for Guard<T> {
    /// Unlocks the qutex and passes the lock on to the next queued request,
    /// if any.
    fn drop(&mut self) {
        let handle = &self.qutex;
        // SAFETY: this guard is what represented the held lock, and it is
        // going away now.
        unsafe {
            handle.direct_unlock();
        }
    }
}
/// A future which resolves to a `Guard` once the queued lock request has been
/// signalled.
///
/// If dropped before resolving, the pending request is cancelled; see the
/// `Drop` implementation.
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct FutureGuard<T> {
// `Some` until the future resolves; moved into the resulting `Guard`.
qutex: Option<Qutex<T>>,
// Receiving half of the one-shot channel whose sender sits in the queue.
rx: Receiver<()>,
}
impl<T> FutureGuard<T> {
fn new(qutex: Qutex<T>, rx: Receiver<()>) -> FutureGuard<T> {
FutureGuard {
qutex: Some(qutex),
rx: rx,
}
}
#[inline]
pub fn wait(self) -> Result<Guard<T>, Canceled> {
<Self as Future>::wait(self)
}
}
impl<T> Future for FutureGuard<T> {
    type Item = Guard<T>;
    type Error = Canceled;

    /// Processes the request queue, then checks whether this request has been
    /// signalled.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already produced a `Guard`.
    #[inline]
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.qutex {
            // Give the queue a chance to advance before checking our own
            // channel; if the lock is free this may signal our request.
            Some(ref qutex) => unsafe { qutex.process_queue() },
            None => panic!("FutureGuard::poll: Task already completed."),
        }

        let readiness = self.rx.poll()?;
        // Only when the signal has arrived is the handle moved into a guard;
        // otherwise it stays here so `Drop` can cancel the request.
        Ok(readiness.map(|_| Guard {
            qutex: self.qutex.take().unwrap(),
        }))
    }
}
impl<T> Drop for FutureGuard<T> {
    /// Cancels a still-pending lock request.
    ///
    /// If the request had already been signalled but never turned into a
    /// `Guard`, the lock is released here so the queue can keep moving.
    fn drop(&mut self) {
        // `None` means the handle already moved into a `Guard`, which does
        // its own unlocking.
        let qutex = match self.qutex.take() {
            Some(qutex) => qutex,
            None => return,
        };

        // Close first so that later sends on this channel fail.
        self.rx.close();

        // A value already sitting in the channel means the lock was handed to
        // this request; give it back.
        if let Ok(Some(())) = self.rx.try_recv() {
            unsafe {
                qutex.direct_unlock();
            }
        }
    }
}
/// A queued request for the lock.
#[derive(Debug)]
pub struct Request {
// Sending half of the one-shot channel; `process_queue` sends `()` on it to
// signal the requester.
tx: Sender<()>,
}
impl Request {
    /// Wraps the sending half of a one-shot channel as a lock request.
    pub fn new(tx: Sender<()>) -> Request {
        Request { tx }
    }
}
/// Shared state behind every `Qutex` handle.
#[derive(Debug)]
struct Inner<T> {
// Lock flag: starts at 0; `process_queue` swaps 0 -> 1 when it takes the
// lock and `direct_unlock` stores 0. Any other value makes `process_queue`
// panic.
state: AtomicUsize,
// The protected value.
cell: UnsafeCell<T>,
// Pending lock requests, popped by `process_queue`.
queue: SegQueue<Request>,
}
impl<T> From<T> for Inner<T> {
    /// Wraps `val` in a fresh `Inner` with state 0 and an empty request queue.
    #[inline]
    fn from(val: T) -> Inner<T> {
        let state = AtomicUsize::new(0);
        let cell = UnsafeCell::new(val);
        let queue = SegQueue::new();
        Inner { state, cell, queue }
    }
}
// `Inner` contains an `UnsafeCell`, which is never `Sync` on its own, so
// sharing across threads has to be asserted manually. Both impls require
// `T: Send`.
// NOTE(review): soundness depends on the `state`/queue protocol granting
// access to `cell` to one party at a time; `as_ptr`/`as_mut_ptr` bypass that
// protocol and leave it to the caller -- confirm.
unsafe impl<T: Send> Send for Inner<T> {}
unsafe impl<T: Send> Sync for Inner<T> {}
/// A lock whose access requests are queued and signalled through futures.
///
/// Handles are cheap to clone; every clone refers to the same value and the
/// same request queue.
#[derive(Debug)]
pub struct Qutex<T> {
// Shared lock state, value and request queue.
inner: Arc<Inner<T>>,
}
impl<T> Qutex<T> {
    /// Creates a new, unlocked `Qutex` holding `val`.
    #[inline]
    pub fn new(val: T) -> Qutex<T> {
        Qutex {
            inner: Arc::new(Inner::from(val)),
        }
    }

    /// Queues a lock request and returns a future which resolves to a
    /// `Guard` once that request has been signalled.
    ///
    /// Consumes this handle; clone the `Qutex` first to keep one around.
    pub fn lock(self) -> FutureGuard<T> {
        let (tx, rx) = oneshot::channel();
        unsafe {
            self.push_request(Request::new(tx));
        }
        FutureGuard::new(self, rx)
    }

    /// Pushes a lock request onto the queue.
    ///
    /// # Safety
    ///
    /// NOTE(review): this only pushes onto the internal queue; the reason it
    /// is marked `unsafe` is not evident from this file -- confirm the
    /// intended caller contract.
    #[inline]
    pub unsafe fn push_request(&self, req: Request) {
        self.inner.queue.push(req);
    }

    /// Returns a mutable reference to the stored value if this is the only
    /// handle to it, or `None` otherwise.
    #[inline]
    pub fn get_mut(&mut self) -> Option<&mut T> {
        // SAFETY: `Arc::get_mut` succeeds only when no other handle to
        // `inner` exists, so nothing else can reach the cell while the
        // returned borrow is alive.
        Arc::get_mut(&mut self.inner).map(|inn| unsafe { &mut *inn.cell.get() })
    }

    /// Returns a raw pointer to the stored value without taking the lock.
    ///
    /// Dereferencing the pointer is entirely the caller's responsibility.
    #[inline]
    pub fn as_ptr(&self) -> *const T {
        self.inner.cell.get()
    }

    /// Returns a raw mutable pointer to the stored value without taking the
    /// lock.
    ///
    /// Dereferencing the pointer is entirely the caller's responsibility.
    #[inline]
    pub fn as_mut_ptr(&self) -> *mut T {
        self.inner.cell.get()
    }

    /// If the lock is currently free, takes it and signals the first queued
    /// request whose receiver is still listening.
    ///
    /// Does nothing when the lock is already held. When the queue turns out
    /// to be empty the lock is released again.
    ///
    /// # Safety
    ///
    /// NOTE(review): the reason this is marked `unsafe` is not evident from
    /// this file -- confirm the intended caller contract.
    ///
    /// # Panics
    ///
    /// Panics if the internal state holds a value other than 0 or 1.
    pub unsafe fn process_queue(&self) {
        // `compare_exchange` replaces the deprecated `compare_and_swap`:
        // `Ok` means the state was 0 and is now 1, `Err` carries the value
        // that was found instead.
        match self.inner.state.compare_exchange(0, 1, SeqCst, SeqCst) {
            // Was unlocked; we now hold the lock on behalf of the next
            // requester.
            Ok(_) => loop {
                match self.inner.queue.pop() {
                    Ok(req) => {
                        // A failed send means the requester dropped or closed
                        // its receiver; skip it and try the next request.
                        if req.tx.send(()).is_ok() {
                            break;
                        }
                    }
                    Err(_) => {
                        // Nobody is waiting: release the lock again.
                        // NOTE(review): a request pushed by another thread
                        // between the failed `pop` above and this store is
                        // not signalled by this call; it stays queued until
                        // the next `process_queue` call. Confirm callers can
                        // rely on a later poll or unlock to pick it up.
                        self.inner.state.store(0, SeqCst);
                        break;
                    }
                }
            },
            // Already locked; leave it alone.
            Err(1) => (),
            Err(n) => panic!("Qutex::process_queue: inner.state: {}.", n),
        }
    }

    /// Marks the lock as free and then processes the request queue.
    ///
    /// # Safety
    ///
    /// The state is reset unconditionally. If this is called while a `Guard`
    /// for this qutex is still alive, the next request may be granted access
    /// while that guard still hands out references to the value.
    pub unsafe fn direct_unlock(&self) {
        self.inner.state.store(0, SeqCst);
        self.process_queue()
    }
}
impl<T> From<T> for Qutex<T> {
#[inline]
fn from(val: T) -> Qutex<T> {
Qutex::new(val)
}
}
impl<T> Clone for Qutex<T> {
#[inline]
fn clone(&self) -> Qutex<T> {
Qutex {
inner: self.inner.clone(),
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use futures::Future;

    /// Locks, reads, writes and re-reads the value from a single thread.
    #[test]
    fn simple() {
        let val = Qutex::from(999i32);

        println!("Reading val...");
        {
            let future_guard = val.clone().lock();
            let guard = future_guard.wait().unwrap();
            println!("val: {}", *guard);
            assert_eq!(*guard, 999);
        }

        println!("Storing new val...");
        {
            let future_guard = val.clone().lock();
            let mut guard = future_guard.wait().unwrap();
            *guard = 5;
        }

        println!("Reading val...");
        {
            let future_guard = val.clone().lock();
            let guard = future_guard.wait().unwrap();
            println!("val: {}", *guard);
            // The write made through the previous guard must be visible.
            assert_eq!(*guard, 5);
        }
    }

    /// Runs the same number of incrementing and decrementing threads and
    /// checks that the value ends up where it started.
    #[test]
    fn concurrent() {
        use std::thread;

        let thread_count = 20;
        let mut threads = Vec::with_capacity(thread_count);
        let start_val = 0i32;
        let qutex = Qutex::new(start_val);

        // Incrementers: the write is chained onto the lock future.
        for i in 0..thread_count {
            let future_guard = qutex.clone().lock();
            let future_write = future_guard.and_then(|mut guard| {
                *guard += 1;
                Ok(())
            });
            threads.push(
                thread::Builder::new()
                    .name(format!("test_thread_{}", i))
                    .spawn(|| future_write.wait().unwrap())
                    .unwrap(),
            );
        }

        // Decrementers: the guard is obtained by blocking inside the thread.
        for i in 0..thread_count {
            let future_guard = qutex.clone().lock();
            threads.push(
                thread::Builder::new()
                    .name(format!("test_thread_{}", i + thread_count))
                    .spawn(|| {
                        let mut guard = future_guard.wait().unwrap();
                        *guard -= 1;
                    })
                    .unwrap(),
            );
        }

        for thread in threads {
            thread.join().unwrap();
        }

        let guard = qutex.clone().lock().wait().unwrap();
        assert_eq!(*guard, start_val);
    }

    /// Dropping lock futures that were never polled must not wedge the lock.
    #[test]
    fn future_guard_drop() {
        let lock = Qutex::from(true);
        {
            let _future_guard_0 = lock.clone().lock();
            let _future_guard_1 = lock.clone().lock();
            let _future_guard_2 = lock.clone().lock();
        }
        // The abandoned requests are still queued; a fresh request must be
        // served after they are skipped.
        let guard = lock.clone().lock().wait().unwrap();
        assert!(*guard);
    }

    /// `Guard::unlock` releases the lock so it can be taken again.
    #[test]
    fn explicit_unlock() {
        let lock = Qutex::from(true);

        let mut guard_0 = lock.clone().lock().wait().unwrap();
        *guard_0 = false;
        let _ = Guard::unlock(guard_0);

        let guard_1 = lock.clone().lock().wait().unwrap();
        assert!(*guard_1 == false);
    }
}