// vim: tw=80
use futures::{Async, Future, Poll};
use futures::sync::oneshot;
use std::cell::UnsafeCell;
use std::clone::Clone;
use std::collections::VecDeque;
use std::ops::{Deref, DerefMut};
use std::sync;
use super::FutState;
/// Guard granting shared, read-only access to the data inside an `RwLock`,
/// much like `std::sync::RwLockReadGuard`.
///
/// Reach the data through the guard's `Deref` implementation.  Dropping the
/// guard releases the shared lock.
pub struct RwLockReadGuard<T: ?Sized> {
    /// Handle to the lock this guard was issued by
    rwlock: RwLock<T>,
}
impl<T: ?Sized> Deref for RwLockReadGuard<T> {
    type Target = T;

    /// Borrows the protected value immutably for the lifetime of the guard.
    fn deref(&self) -> &T {
        let data_ptr = self.rwlock.inner.data.get();
        // SAFETY (relies on the lock protocol in this file): a read guard is
        // only created once the lock has been granted in shared mode, and
        // writers are refused while readers are counted, so no `&mut T`
        // should exist while this guard is alive.
        unsafe { &*data_ptr }
    }
}
impl<T: ?Sized> Drop for RwLockReadGuard<T> {
    /// Gives this guard's shared lock back to the `RwLock`.
    fn drop(&mut self) {
        let lock = &self.rwlock;
        lock.unlock_reader();
    }
}
/// Guard granting exclusive, read-write access to the data inside an
/// `RwLock`, much like `std::sync::RwLockWriteGuard`.
///
/// Reach the data through the guard's `Deref` and `DerefMut` implementations.
/// Dropping the guard releases the exclusive lock.
pub struct RwLockWriteGuard<T: ?Sized> {
    /// Handle to the lock this guard was issued by
    rwlock: RwLock<T>,
}
impl<T: ?Sized> Deref for RwLockWriteGuard<T> {
    type Target = T;

    /// Borrows the protected value immutably for the lifetime of the guard.
    fn deref(&self) -> &T {
        let data_ptr = self.rwlock.inner.data.get();
        // SAFETY (relies on the lock protocol in this file): a write guard is
        // only created once the lock has been granted exclusively, so no
        // other guard should be referencing the data.
        unsafe { &*data_ptr }
    }
}
impl<T: ?Sized> DerefMut for RwLockWriteGuard<T> {
    /// Borrows the protected value mutably for the lifetime of the guard.
    fn deref_mut(&mut self) -> &mut T {
        let data_ptr = self.rwlock.inner.data.get();
        // SAFETY (relies on the lock protocol in this file): the guard holds
        // the lock exclusively, and `&mut self` keeps this guard from handing
        // out a second reference at the same time.
        unsafe { &mut *data_ptr }
    }
}
impl<T: ?Sized> Drop for RwLockWriteGuard<T> {
    /// Gives this guard's exclusive lock back to the `RwLock`.
    fn drop(&mut self) {
        let lock = &self.rwlock;
        lock.unlock_writer();
    }
}
/// A `Future` that resolves to an `RwLockReadGuard` once the `RwLock` has been
/// acquired in shared mode.
pub struct RwLockReadFut<T: ?Sized> {
    /// Progress of the acquisition: not yet attempted (`New`), queued behind
    /// the current owner (`Pending`), or already handed out (`Acquired`)
    state: FutState,
    /// The lock being acquired
    rwlock: RwLock<T>,
}
impl<T: ?Sized> RwLockReadFut<T> {
    /// Builds a read future for `rwlock` that starts out in `state`.
    fn new(state: FutState, rwlock: RwLock<T>) -> Self {
        RwLockReadFut {
            state: state,
            rwlock: rwlock,
        }
    }
}
impl<T: ?Sized> Drop for RwLockReadFut<T> {
    /// Releases the shared lock if this future was granted it but dropped
    /// before being polled to completion.
    fn drop(&mut self) {
        // Only a queued future can own a lock that no guard knows about.  In
        // the `New` state the RwLock hasn't been modified yet, and once the
        // state is `Acquired` the RwLockReadGuard takes care of releasing it.
        if let FutState::Pending(ref mut rx) = self.state {
            // Close the channel first so the outcome can no longer change,
            // then check whether ownership had already been sent to us.
            rx.close();
            // TODO: futures-0.2.0 introduces a try_recv method that is
            // better to use here than poll.  Use it after upgrading to
            // futures >= 0.2.0
            if let Ok(Async::Ready(())) = rx.poll() {
                // This future received ownership of the lock, but got
                // dropped before it was ever polled.  Release the lock.
                self.rwlock.unlock_reader();
            }
            // Otherwise the lock was never received (not ready, or the
            // sender was cancelled): dropping the future merely cancels the
            // pending acquisition.
        }
    }
}
// The impl is bounded on `T: ?Sized`, like `RwLockReadFut` itself, its guard
// and `RwLock::read`, so that every future `read` can return is pollable.
impl<T: ?Sized> Future for RwLockReadFut<T> {
    type Item = RwLockReadGuard<T>;
    type Error = ();

    /// Attempts to acquire the `RwLock` in shared mode.
    ///
    /// Returns `Async::Ready` with a guard as soon as the lock is not
    /// exclusively owned.  Otherwise the current task is queued and
    /// `Async::NotReady` is returned until the lock is handed over.  This
    /// future never resolves to an error.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already yielded a guard, or if the
    /// internal mutex is poisoned.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let (result, new_state) = match self.state {
            FutState::New => {
                let mut lock_data = self.rwlock.inner.mutex.lock()
                    .expect("sync::Mutex::lock");
                if lock_data.exclusive {
                    // A writer owns the lock; queue up behind it.
                    let (tx, mut rx) = oneshot::channel::<()>();
                    lock_data.read_waiters.push_back(tx);
                    // Even though we know it isn't ready, we need to poll the
                    // receiver in order to register our task for notification.
                    assert!(rx.poll().unwrap().is_not_ready());
                    (Ok(Async::NotReady), FutState::Pending(rx))
                } else {
                    // Uncontended, or only other readers: join them.
                    lock_data.num_readers += 1;
                    let guard = RwLockReadGuard{rwlock: self.rwlock.clone()};
                    (Ok(Async::Ready(guard)), FutState::Acquired)
                }
            },
            FutState::Pending(ref mut rx) => {
                match rx.poll() {
                    // Still queued; stay in the Pending state.
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    // It's impossible for receiver.poll() to return an error.
                    // The only way that would happen is if the sender got
                    // dropped.  But that can't happen because the RwLock owns
                    // the sender, and the Fut retains a clone of the RwLock.
                    Err(_) => unreachable!(),
                    Ok(Async::Ready(_)) => {
                        // Whoever released the lock already counted us as a
                        // reader before notifying, so only the guard is left
                        // to create.
                        let state = FutState::Acquired;
                        let result = Ok(Async::Ready(
                            RwLockReadGuard{rwlock: self.rwlock.clone()}));
                        (result, state)
                    }   // LCOV_EXCL_LINE   kcov false negative
                }
            },
            FutState::Acquired => panic!("Double-poll of ready Future")
        };
        self.state = new_state;
        result
    }
}
/// A `Future` that resolves to an `RwLockWriteGuard` once the `RwLock` has
/// been acquired in exclusive mode.
pub struct RwLockWriteFut<T: ?Sized> {
    /// Progress of the acquisition: not yet attempted (`New`), queued behind
    /// the current owners (`Pending`), or already handed out (`Acquired`)
    state: FutState,
    /// The lock being acquired
    rwlock: RwLock<T>,
}
impl<T: ?Sized> RwLockWriteFut<T> {
    /// Builds a write future for `rwlock` that starts out in `state`.
    fn new(state: FutState, rwlock: RwLock<T>) -> Self {
        RwLockWriteFut {
            state: state,
            rwlock: rwlock,
        }
    }
}
impl<T: ?Sized> Drop for RwLockWriteFut<T> {
    /// Releases the exclusive lock if this future was granted it but dropped
    /// before being polled to completion.
    fn drop(&mut self) {
        // Only a queued future can own a lock that no guard knows about.  In
        // the `New` state the RwLock hasn't been modified yet, and once the
        // state is `Acquired` the RwLockWriteGuard takes care of releasing it.
        if let FutState::Pending(ref mut rx) = self.state {
            // Close the channel first so the outcome can no longer change,
            // then check whether ownership had already been sent to us.
            rx.close();
            // TODO: futures-0.2.0 introduces a try_recv method that is
            // better to use here than poll.  Use it after upgrading to
            // futures >= 0.2.0
            if let Ok(Async::Ready(())) = rx.poll() {
                // This future received ownership of the lock, but got
                // dropped before it was ever polled.  Release the lock.
                self.rwlock.unlock_writer();
            }
            // Otherwise the lock was never received (not ready, or the
            // sender was cancelled): dropping the future merely cancels the
            // pending acquisition.
        }
    }
}
// The impl is bounded on `T: ?Sized`, like `RwLockWriteFut` itself, its guard
// and `RwLock::write`, so that every future `write` can return is pollable.
impl<T: ?Sized> Future for RwLockWriteFut<T> {
    type Item = RwLockWriteGuard<T>;
    type Error = ();

    /// Attempts to acquire the `RwLock` in exclusive mode.
    ///
    /// Returns `Async::Ready` with a guard as soon as the lock has neither a
    /// writer nor any readers.  Otherwise the current task is queued and
    /// `Async::NotReady` is returned until the lock is handed over.  This
    /// future never resolves to an error.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has already yielded a guard, or if the
    /// internal mutex is poisoned.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let (result, new_state) = match self.state {
            FutState::New => {
                let mut lock_data = self.rwlock.inner.mutex.lock()
                    .expect("sync::Mutex::lock");
                if lock_data.exclusive || lock_data.num_readers > 0 {
                    // Somebody else owns the lock; queue up behind them.
                    let (tx, mut rx) = oneshot::channel::<()>();
                    lock_data.write_waiters.push_back(tx);
                    // Even though we know it isn't ready, we need to poll the
                    // receiver in order to register our task for notification.
                    assert!(rx.poll().unwrap().is_not_ready());
                    (Ok(Async::NotReady), FutState::Pending(rx))
                } else {
                    // Uncontended: take exclusive ownership right away.
                    lock_data.exclusive = true;
                    let guard = RwLockWriteGuard{rwlock: self.rwlock.clone()};
                    (Ok(Async::Ready(guard)), FutState::Acquired)
                }
            },
            FutState::Pending(ref mut rx) => {
                match rx.poll() {
                    // Still queued; stay in the Pending state.
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    // It's impossible for receiver.poll() to return an error.
                    // The only way that would happen is if the sender got
                    // dropped.  But that can't happen because the RwLock owns
                    // the sender, and the Fut retains a clone of the RwLock.
                    Err(_) => unreachable!(),
                    Ok(Async::Ready(_)) => {
                        // Whoever released the lock left it marked exclusive
                        // on our behalf, so only the guard is left to create.
                        let state = FutState::Acquired;
                        let result = Ok(Async::Ready(
                            RwLockWriteGuard{rwlock: self.rwlock.clone()}));
                        (result, state)
                    }   // LCOV_EXCL_LINE   kcov false negative
                }
            },
            FutState::Acquired => panic!("Double-poll of ready Future")
        };
        self.state = new_state;
        result
    }
}
// LCOV_EXCL_START
/// Ownership bookkeeping for an `RwLock`: who holds it now and who is queued.
#[derive(Debug)]
struct RwLockData {
    /// True iff the `RwLock` is currently exclusively owned
    exclusive: bool,

    /// The number of tasks that currently have shared ownership of the RwLock
    num_readers: u32,

    /// FIFO queue of waiting readers.  Each entry is the sending half of the
    /// channel on which a queued read future waits to be notified.
    read_waiters: VecDeque<oneshot::Sender<()>>,

    /// FIFO queue of waiting writers.  Each entry is the sending half of the
    /// channel on which a queued write future waits to be notified.
    write_waiters: VecDeque<oneshot::Sender<()>>,
}
// LCOV_EXCL_STOP
// LCOV_EXCL_START
/// The state shared by every clone of an `RwLock`.
#[derive(Debug)]
struct Inner<T: ?Sized> {
    /// Protects the ownership bookkeeping only, not the user's data
    mutex: sync::Mutex<RwLockData>,

    /// The user's data, reached through the guards' `Deref` implementations
    data: UnsafeCell<T>,
}
// LCOV_EXCL_STOP
/// A Futures-aware RwLock.
///
/// `std::sync::RwLock` cannot be used in an asynchronous environment like
/// Tokio, because an acquisition can block an entire reactor.  This class can
/// be used instead.  It functions much like `std::sync::RwLock`, with these
/// differences:
///
/// * it has a builtin `Arc`, making it accessible from multiple threads,
/// * it is safe to `clone`; every clone refers to the same lock and data,
/// * it does not detect lock poisoning.
// LCOV_EXCL_START
#[derive(Debug)]
pub struct RwLock<T: ?Sized> {
    /// Reference-counted state shared by all clones of this lock
    inner: sync::Arc<Inner<T>>,
}
// LCOV_EXCL_STOP
impl<T: ?Sized> Clone for RwLock<T> {
    /// Returns another handle to the same lock; the protected data is not
    /// copied.
    fn clone(&self) -> RwLock<T> {
        let shared = self.inner.clone();
        RwLock { inner: shared }
    }
}
impl<T> RwLock<T> {
    /// Create a new `RwLock` in the unlocked state.
    pub fn new(t: T) -> RwLock<T> {
        // Nobody owns the lock and nobody is waiting for it.
        let unlocked = RwLockData {
            exclusive: false,
            num_readers: 0,
            read_waiters: VecDeque::new(),
            write_waiters: VecDeque::new(),
        };  // LCOV_EXCL_LINE   kcov false negative
        let mutex = sync::Mutex::new(unlocked);
        let data = UnsafeCell::new(t);
        let inner = Inner {
            mutex: mutex,
            data: data,
        };  // LCOV_EXCL_LINE   kcov false negative
        RwLock { inner: sync::Arc::new(inner) }
    }

    /// Consumes the `RwLock` and returns the wrapped data.  If the `RwLock`
    /// still has multiple references (not necessarily locked), the `RwLock`
    /// is handed back in the `Err` variant instead.
    pub fn try_unwrap(self) -> Result<T, RwLock<T>> {
        match sync::Arc::try_unwrap(self.inner) {
            // Other clones are still alive; give the caller its handle back.
            Err(still_shared) => Err(RwLock { inner: still_shared }),
            Ok(inner) => {
                let data = {
                    // `unsafe` is no longer needed as of somewhere around
                    // 1.25.0.
                    // https://github.com/rust-lang/rust/issues/35067
                    #[allow(unused_unsafe)]
                    unsafe { inner.data.into_inner() }
                };
                Ok(data)
            }
        }
    }
}
impl<T: ?Sized> RwLock<T> {
/// Returns a mutable reference to the underlying data, provided that no
/// other clones of the `RwLock` exist.
///
/// Since this call borrows the `RwLock` mutably, no actual locking takes
/// place -- the mutable borrow statically guarantees no locks exist.
/// However, if the `RwLock` has already been cloned, then `None` will be
/// returned instead.
///
/// # Examples
///
/// ```
/// # extern crate futures_locks;
/// # use futures_locks::*;
/// # fn main() {
/// let mut lock = RwLock::<u32>::new(0);
/// *lock.get_mut().unwrap() += 5;
/// assert_eq!(lock.try_unwrap().unwrap(), 5);
/// # }
/// ```
pub fn get_mut(&mut self) -> Option<&mut T> {
    match sync::Arc::get_mut(&mut self.inner) {
        // Some other clone shares the data; exclusive access can't be proven.
        None => None,
        Some(inner) => {
            let lock_data = inner.mutex.get_mut().unwrap();
            let data = unsafe { inner.data.get().as_mut() }.unwrap();
            // With a sole handle borrowed mutably, nobody should be holding
            // the lock in either mode.
            debug_assert!(!lock_data.exclusive);
            debug_assert_eq!(lock_data.num_readers, 0);
            Some(data)
        }
    }
}
/// Acquire the `RwLock` nonexclusively, read-only, blocking the task in the
/// meantime.
///
/// Nothing is acquired until the returned `Future` is polled.  When it is
/// ready, this task will have read-only access to the protected data
/// through the yielded guard.
///
/// # Examples
/// ```
/// # extern crate futures;
/// # extern crate futures_locks;
/// # use futures_locks::*;
/// # use futures::executor::{Spawn, spawn};
/// # use futures::Future;
/// # fn main() {
/// let rwlock = RwLock::<u32>::new(42);
/// let fut = rwlock.read().map(|mut guard| { *guard });
/// assert_eq!(spawn(fut).wait_future(), Ok(42));
/// # }
///
/// ```
pub fn read(&self) -> RwLockReadFut<T> {
    let handle = self.clone();
    RwLockReadFut::new(FutState::New, handle)
}
/// Acquire the `RwLock` exclusively, read-write, blocking the task in the
/// meantime.
///
/// Nothing is acquired until the returned `Future` is polled.  When it is
/// ready, this task will have read-write access to the protected data
/// through the yielded guard.
///
/// # Examples
/// ```
/// # extern crate futures;
/// # extern crate futures_locks;
/// # use futures_locks::*;
/// # use futures::executor::{Spawn, spawn};
/// # use futures::Future;
/// # fn main() {
/// let rwlock = RwLock::<u32>::new(42);
/// let fut = rwlock.write().map(|mut guard| { *guard = 5;});
/// spawn(fut).wait_future().expect("spawn");
/// assert_eq!(rwlock.try_unwrap().unwrap(), 5);
/// # }
///
/// ```
pub fn write(&self) -> RwLockWriteFut<T> {
    let handle = self.clone();
    RwLockWriteFut::new(FutState::New, handle)
}
/// Attempts to acquire the `RwLock` nonexclusively.
///
/// If the operation would block (the lock is exclusively owned), returns
/// `Err` instead.  Otherwise, returns a guard (not a `Future`).
///
/// # Examples
/// ```
/// # extern crate futures_locks;
/// # use futures_locks::*;
/// # fn main() {
/// let mut lock = RwLock::<u32>::new(5);
/// let r = match lock.try_read() {
///     Ok(guard) => *guard,
///     Err(()) => panic!("Better luck next time!")
/// };
/// assert_eq!(5, r);
/// # }
/// ```
pub fn try_read(&self) -> Result<RwLockReadGuard<T>, ()> {
    let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
    // A writer owns the lock: report failure rather than queueing.
    if lock_data.exclusive {
        return Err(());
    }
    lock_data.num_readers += 1;
    Ok(RwLockReadGuard { rwlock: self.clone() })
}
/// Attempts to acquire the `RwLock` exclusively.
///
/// If the operation would block (the lock has a writer or any readers),
/// returns `Err` instead.  Otherwise, returns a guard (not a `Future`).
///
/// # Examples
/// ```
/// # extern crate futures_locks;
/// # use futures_locks::*;
/// # fn main() {
/// let mut lock = RwLock::<u32>::new(5);
/// match lock.try_write() {
///     Ok(mut guard) => *guard += 5,
///     Err(()) => panic!("Better luck next time!")
/// }
/// assert_eq!(10, lock.try_unwrap().unwrap());
/// # }
/// ```
pub fn try_write(&self) -> Result<RwLockWriteGuard<T>, ()> {
    let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
    // Anybody else owning the lock: report failure rather than queueing.
    let contended = lock_data.exclusive || lock_data.num_readers > 0;
    if contended {
        return Err(());
    }
    lock_data.exclusive = true;
    Ok(RwLockWriteGuard { rwlock: self.clone() })
}
/// Release a shared lock of an `RwLock`.
///
/// When the last reader leaves, ownership passes directly to the first
/// queued writer that is still waiting.  A queued writer whose future was
/// dropped in the meantime has closed its receiver, so sending to it fails;
/// such waiters are discarded instead of being treated as a fatal error.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned or if the lock isn't currently
/// held in shared mode.
fn unlock_reader(&self) {
    let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
    assert!(lock_data.num_readers > 0);
    assert!(!lock_data.exclusive);
    // Readers only queue while a writer owns the lock, and a departing writer
    // drains the whole reader queue, so it must be empty here.
    assert_eq!(lock_data.read_waiters.len(), 0);
    lock_data.num_readers -= 1;
    if lock_data.num_readers > 0 {
        // Other readers remain; writers must keep waiting.
        return;
    }
    while let Some(tx) = lock_data.write_waiters.pop_front() {
        // `send` only fails when the waiting future has been dropped; skip it
        // and try the next writer in line.
        if tx.send(()).is_ok() {
            // Mark the lock as owned on the writer's behalf before the mutex
            // is released, so nobody can sneak in ahead of it.
            lock_data.exclusive = true;
            return;
        }
    }
}
/// Release an exclusive lock of an `RwLock`.
///
/// Ownership passes directly to the first queued writer that is still
/// waiting.  If there is none, the lock becomes non-exclusive and every
/// queued reader is woken.  A waiter whose future was dropped in the
/// meantime has closed its receiver, so sending to it fails; such waiters are
/// discarded and are not counted as owners.
///
/// # Panics
///
/// Panics if the internal mutex is poisoned or if the lock isn't currently
/// held in exclusive mode.
fn unlock_writer(&self) {
    let mut lock_data = self.inner.mutex.lock().expect("sync::Mutex::lock");
    assert!(lock_data.num_readers == 0);
    assert!(lock_data.exclusive);

    // Writers take precedence.  `exclusive` stays set because ownership moves
    // straight from this writer to the next one.
    while let Some(tx) = lock_data.write_waiters.pop_front() {
        // `send` only fails when the waiting future has been dropped; skip it
        // and try the next writer in line.
        if tx.send(()).is_ok() {
            return;
        }
    }

    // No live writer is waiting: admit every reader that is still listening.
    lock_data.exclusive = false;
    let mut woken = 0u32;
    for tx in lock_data.read_waiters.drain(..) {
        // Only count readers that actually received the lock; a dropped
        // future will never release a share it was never given.
        if tx.send(()).is_ok() {
            woken += 1;
        }
    }
    lock_data.num_readers += woken;
}
}
// An `RwLock` handle may be moved to another thread whenever the protected
// value itself may be.
// NOTE(review): every clone refers to the same `Arc`-backed value, so clones
// living on different threads can hold read guards simultaneously; consider
// whether `Send` should demand `T: Sync` as well.
unsafe impl<T: ?Sized + Send> Send for RwLock<T> {}
// Sharing an `RwLock` lets several threads hold read guards, and therefore
// `&T`, at the same time, so `T` must be `Sync` in addition to `Send`.  These
// are the bounds `std::sync::RwLock` uses for its own `Sync` impl.
unsafe impl<T: ?Sized + Send + Sync> Sync for RwLock<T> {}