use futures::{Async, Future, Poll};
use futures::sync::oneshot;
use std::cell::UnsafeCell;
use std::clone::Clone;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync;
use super::FutState;
/// Handle proving that the holder currently owns the lock on a [`Mutex`].
///
/// The protected value is reachable through `Deref`/`DerefMut`, and the lock
/// is released when the guard is dropped.
pub struct MutexGuard<T: ?Sized> {
    /// Clone of the mutex handle this guard has locked; `Drop` calls
    /// `unlock` on it.
    mutex: Mutex<T>,
}
impl<T: ?Sized> Drop for MutexGuard<T> {
fn drop(&mut self) {
self.mutex.unlock();
}
}
impl<T: ?Sized> Deref for MutexGuard<T> {
    type Target = T;

    /// Borrows the protected value immutably for as long as the guard is
    /// borrowed.
    fn deref(&self) -> &T {
        let data_ptr = self.mutex.inner.data.get();
        // SAFETY: guards are only constructed after `owned` has been set (or
        // after ownership was handed over through a waiter channel), so this
        // guard is the only path to the data until it is dropped.
        unsafe { &*data_ptr }
    }
}
impl<T: ?Sized> DerefMut for MutexGuard<T> {
    /// Borrows the protected value mutably for as long as the guard is
    /// mutably borrowed.
    fn deref_mut(&mut self) -> &mut T {
        let data_ptr = self.mutex.inner.data.get();
        // SAFETY: the guard represents ownership of the lock, and `&mut self`
        // ensures no other reference obtained through this guard is alive.
        unsafe { &mut *data_ptr }
    }
}
/// Future returned by `Mutex::lock`; resolves to a [`MutexGuard`] once the
/// lock has been acquired.
pub struct MutexFut<T: ?Sized> {
    /// Progress of the acquisition: not yet polled, waiting on a channel for
    /// the lock to be handed over, or already acquired.
    state: FutState,
    /// Handle to the mutex being locked.
    mutex: Mutex<T>,
}
impl<T: ?Sized> MutexFut<T> {
    /// Builds a lock future for `mutex` that starts out in `state`.
    fn new(state: FutState, mutex: Mutex<T>) -> Self {
        MutexFut {
            state: state,
            mutex: mutex,
        }
    }
}
impl<T: ?Sized> Drop for MutexFut<T> {
    /// Cleans up a lock request that is abandoned before it produced a guard.
    ///
    /// Only the `Pending` state needs work. A future that was never polled
    /// holds nothing, and one that already yielded its guard leaves the
    /// unlocking to that guard.
    fn drop(&mut self) {
        if let FutState::Pending(ref mut rx) = self.state {
            // Close first so the sender can no longer deliver, then check
            // whether ownership had already been delivered before the close.
            rx.close();
            if let Ok(Async::Ready(())) = rx.poll() {
                // The lock was handed to this future but nobody will ever
                // hold a guard for it, so pass it on immediately.
                self.mutex.unlock()
            }
            // `NotReady` and `Canceled` both mean ownership never reached
            // this future; there is nothing to release.
        }
    }
}
// Implemented for `T: ?Sized` so that every `MutexFut<T>` that `Mutex::lock`
// can return is actually pollable; `MutexGuard<T>` itself is always `Sized`.
impl<T: ?Sized> Future for MutexFut<T> {
    type Item = MutexGuard<T>;
    type Error = ();

    /// Attempts to complete the lock acquisition.
    ///
    /// * First poll (`New`): takes the lock immediately if it is free;
    ///   otherwise enqueues a oneshot sender as a waiter and returns
    ///   `NotReady`.
    /// * Later polls (`Pending`): resolves once the current owner hands the
    ///   lock over through the channel.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has returned a guard, or if the
    /// internal `sync::Mutex` is poisoned.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let (result, new_state) = match &mut self.state {
            &mut FutState::New => {
                let mut mtx_data = self.mutex.inner.mutex.lock()
                    .expect("sync::Mutex::lock");
                if mtx_data.owned {
                    // Somebody else holds the lock: queue up behind them.
                    let (tx, mut rx) = oneshot::channel::<()>();
                    mtx_data.waiters.push_back(tx);
                    // This poll is not only a sanity check: polling the
                    // receiver is what registers the current task to be
                    // notified when the sender fires.
                    assert!(rx.poll().unwrap().is_not_ready());
                    (Ok(Async::NotReady), FutState::Pending(rx))
                } else {
                    // Uncontended: claim ownership right away.
                    mtx_data.owned = true;
                    let guard = MutexGuard { mutex: self.mutex.clone() };
                    (Ok(Async::Ready(guard)), FutState::Acquired)
                }
            },
            &mut FutState::Pending(ref mut rx) => {
                match rx.poll() {
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    // The sender lives in the waiter queue of a mutex this
                    // future keeps alive, so it cannot be dropped unsent.
                    Err(_) => unreachable!(),
                    Ok(Async::Ready(_)) => {
                        // Ownership was transferred to us; `owned` is
                        // already true, so only the guard must be created.
                        let state = FutState::Acquired;
                        let result = Ok(Async::Ready(
                            MutexGuard { mutex: self.mutex.clone() }));
                        (result, state)
                    }
                }
            },
            &mut FutState::Acquired => panic!("Double-poll of ready Future")
        };
        self.state = new_state;
        result
    }
}
/// Bookkeeping for the lock itself, protected by a `sync::Mutex` in `Inner`.
#[derive(Debug)]
struct MutexData {
    /// `true` while some guard (or a future that has been handed the lock)
    /// owns the mutex.
    owned: bool,
    /// Senders for the futures waiting to acquire the lock, oldest first.
    waiters: VecDeque<oneshot::Sender<()>>,
}
/// Shared allocation behind every clone of a `Mutex`.
#[derive(Debug)]
struct Inner<T: ?Sized> {
    /// Synchronous lock around the ownership flag and waiter queue.
    mutex: sync::Mutex<MutexData>,
    /// The user's value; accessed through guards, `get_mut` and `try_unwrap`.
    data: UnsafeCell<T>,
}
/// A futures-aware mutex: locking yields a future instead of blocking.
///
/// The handle is reference counted; cloning it produces another handle to
/// the same lock and the same protected value.
#[derive(Debug)]
pub struct Mutex<T: ?Sized> {
    /// Reference-counted shared state (lock bookkeeping plus the data).
    inner: sync::Arc<Inner<T>>,
}
impl<T: ?Sized> Clone for Mutex<T> {
fn clone(&self) -> Mutex<T> {
Mutex { inner: self.inner.clone()}
}
}
impl<T> Mutex<T> {
    /// Creates a new, unlocked mutex that protects `t`.
    pub fn new(t: T) -> Mutex<T> {
        let inner = Inner {
            mutex: sync::Mutex::new(MutexData {
                owned: false,
                waiters: VecDeque::new(),
            }),
            data: UnsafeCell::new(t),
        };
        Mutex { inner: sync::Arc::new(inner) }
    }

    /// Consumes the handle and returns the protected value if this was the
    /// only handle to the mutex.
    ///
    /// # Errors
    ///
    /// If other handles still exist, the original `Mutex` is returned
    /// unchanged in `Err`.
    pub fn try_unwrap(self) -> Result<T, Mutex<T>> {
        sync::Arc::try_unwrap(self.inner)
            .map(|inner| {
                // The `allow` keeps the `unsafe` block warning-free on
                // compilers where `into_inner` does not need it.
                #[allow(unused_unsafe)]
                unsafe { inner.data.into_inner() }
            })
            .map_err(|arc| Mutex { inner: arc })
    }
}
impl<T: ?Sized> Mutex<T> {
    /// Returns a mutable reference to the protected value without locking.
    ///
    /// Succeeds only when this handle is the sole reference to the shared
    /// state (per `Arc::get_mut`); otherwise returns `None`.
    ///
    /// # Panics
    ///
    /// Panics if the internal `sync::Mutex` is poisoned.
    pub fn get_mut(&mut self) -> Option<&mut T> {
        if let Some(inner) = sync::Arc::get_mut(&mut self.inner) {
            let lock_data = inner.mutex.get_mut().unwrap();
            // `UnsafeCell::get` never returns null, so the `unwrap` cannot
            // fail; exclusivity is guaranteed by `Arc::get_mut` above.
            let data = unsafe { inner.data.get().as_mut() }.unwrap();
            // No guard can exist while we hold the only handle.
            debug_assert!(!lock_data.owned);
            Some(data)
        } else {
            None
        }
    }

    /// Returns a future that resolves to a guard once the lock is acquired.
    ///
    /// Nothing happens until the returned future is polled.
    pub fn lock(&self) -> MutexFut<T> {
        MutexFut::new(FutState::New, self.clone())
    }

    /// Attempts to acquire the lock immediately.
    ///
    /// # Errors
    ///
    /// Returns `Err(())` if the mutex is currently owned.
    ///
    /// # Panics
    ///
    /// Panics if the internal `sync::Mutex` is poisoned.
    pub fn try_lock(&self) -> Result<MutexGuard<T>, ()> {
        let mut mtx_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
        if mtx_data.owned {
            Err(())
        } else {
            mtx_data.owned = true;
            Ok(MutexGuard{mutex: self.clone()})
        }
    }

    /// Releases the lock, handing it to the oldest live waiter if there is
    /// one.
    ///
    /// A waiter whose future was dropped while pending has a closed receiver
    /// but its sender is still queued; sending to it fails. Such waiters are
    /// skipped rather than treated as fatal, so dropping a pending lock
    /// future can no longer cause a panic in a later unlock.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is not currently owned, or if the internal
    /// `sync::Mutex` is poisoned.
    fn unlock(&self) {
        let mut mtx_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
        assert!(mtx_data.owned);
        while let Some(tx) = mtx_data.waiters.pop_front() {
            if tx.send(()).is_ok() {
                // Ownership transferred; `owned` intentionally stays true.
                return;
            }
            // Send failed: that waiter's future is gone. Try the next one.
        }
        // Nobody is waiting: relinquish ownership.
        mtx_data.owned = false;
    }
}
// `Inner` contains an `UnsafeCell`, so these marker traits are asserted by
// hand. Only `T: Send` is required for both: every access to the data in this
// file goes through a `MutexGuard`, `get_mut` or `try_unwrap`.
// NOTE(review): `MutexGuard<T>` and `MutexFut<T>` embed a `Mutex<T>` and
// therefore pick up these impls automatically. A `&MutexGuard<T>` shared
// between threads yields `&T` on each of them, which looks like it needs
// `T: Sync` — confirm whether the guard should opt out or add that bound.
unsafe impl<T: ?Sized + Send> Send for Mutex<T> {}
unsafe impl<T: ?Sized + Send> Sync for Mutex<T> {}