#![deny(
unused_extern_crates,
missing_debug_implementations,
missing_docs,
unreachable_pub
)]
#![cfg_attr(test, deny(warnings))]
use futures::{task, try_ready, Async, AsyncSink, Future, Poll, Sink, StartSend, Stream};
use std::cell::UnsafeCell;
use std::sync::Arc;
use std::{fmt, mem};
use tokio_sync::semaphore;
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub struct Closed;
#[derive(Debug)]
pub struct Aption<T> {
inner: Arc<Inner<T>>,
permit: semaphore::Permit,
}
impl<T> Clone for Aption<T> {
fn clone(&self) -> Self {
Aption {
inner: self.inner.clone(),
permit: semaphore::Permit::new(),
}
}
}
#[allow(missing_docs)]
pub fn new<T>() -> Aption<T> {
let m = Arc::new(Inner {
semaphore: semaphore::Semaphore::new(1),
value: UnsafeCell::new(CellValue::None),
put_task: task::AtomicTask::new(),
take_task: task::AtomicTask::new(),
});
Aption {
inner: m.clone(),
permit: semaphore::Permit::new(),
}
}
enum CellValue<T> {
Some(T),
None,
Fin(Option<T>),
}
impl<T> CellValue<T> {
fn is_none(&self) -> bool {
if let CellValue::None = *self {
true
} else {
false
}
}
fn take(&mut self) -> Option<T> {
match mem::replace(self, CellValue::None) {
CellValue::None => None,
CellValue::Some(t) => Some(t),
CellValue::Fin(f) => {
mem::replace(self, CellValue::Fin(None));
f
}
}
}
}
struct Inner<T> {
semaphore: semaphore::Semaphore,
value: UnsafeCell<CellValue<T>>,
put_task: task::AtomicTask,
take_task: task::AtomicTask,
}
unsafe impl<T: Send> Sync for Inner<T> {}
unsafe impl<T: Send> Send for Inner<T> {}
impl<T> fmt::Debug for Inner<T> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "AptionInner")
}
}
struct TakeFuture<T>(Option<Aption<T>>);
impl<T> Future for TakeFuture<T> {
type Item = (Aption<T>, T);
type Error = Closed;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let t = try_ready!(self
.0
.as_mut()
.expect("called poll after future resolved")
.poll_take());
Ok(Async::Ready((self.0.take().unwrap(), t)))
}
}
struct PutFuture<T>(Option<Aption<T>>, Option<T>);
impl<T> Future for PutFuture<T> {
type Item = Aption<T>;
type Error = T;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let t = self.1.take().expect("called poll after future resolved");
match self
.0
.as_mut()
.expect("called poll after future resolved")
.poll_put(t)
{
Ok(AsyncSink::Ready) => Ok(Async::Ready(self.0.take().unwrap())),
Ok(AsyncSink::NotReady(t)) => {
self.1 = Some(t);
Ok(Async::NotReady)
}
Err(t) => Err(t),
}
}
}
impl<T> Aption<T> {
pub fn take(self) -> impl Future<Item = (Self, T), Error = Closed> {
TakeFuture(Some(self))
}
pub fn put(self, t: T) -> impl Future<Item = Self, Error = T> {
PutFuture(Some(self), Some(t))
}
}
impl<T> Aption<T> {
pub fn poll_take(&mut self) -> Poll<T, Closed> {
try_ready!(self
.permit
.poll_acquire(&self.inner.semaphore)
.map_err(|_| unreachable!("semaphore dropped while we have an Arc to it")));
let value = unsafe { &mut *self.inner.value.get() };
let v = value.take();
if v.is_none() {
if let CellValue::Fin(None) = *value {
self.permit.release(&self.inner.semaphore);
return Err(Closed);
}
self.inner.take_task.register();
}
self.permit.release(&self.inner.semaphore);
if let Some(t) = v {
self.inner.put_task.notify();
Ok(Async::Ready(t))
} else {
Ok(Async::NotReady)
}
}
pub fn poll_put(&mut self, t: T) -> Result<AsyncSink<T>, T> {
match self.permit.poll_acquire(&self.inner.semaphore) {
Ok(Async::Ready(())) => {}
Ok(Async::NotReady) => {
return Ok(AsyncSink::NotReady(t));
}
Err(_) => {
unreachable!("semaphore dropped while we have an Arc to it");
}
}
let value = unsafe { &mut *self.inner.value.get() };
if let CellValue::Fin(_) = *value {
self.permit.release(&self.inner.semaphore);
return Err(t);
}
if value.is_none() {
*value = CellValue::Some(t);
self.permit.release(&self.inner.semaphore);
self.inner.take_task.notify();
Ok(AsyncSink::Ready)
} else {
self.inner.put_task.register();
self.permit.release(&self.inner.semaphore);
Ok(AsyncSink::NotReady(t))
}
}
pub fn poll_close(&mut self) -> Poll<(), ()> {
try_ready!(self
.permit
.poll_acquire(&self.inner.semaphore)
.map_err(|_| unreachable!("semaphore dropped while we have an Arc to it")));
let value = unsafe { &mut *self.inner.value.get() };
let v = value.take();
*value = CellValue::Fin(v);
let ret = if let CellValue::Fin(None) = *value {
Async::Ready(())
} else {
self.inner.put_task.register();
Async::NotReady
};
self.permit.release(&self.inner.semaphore);
self.inner.take_task.notify();
Ok(ret)
}
}
impl<T> Sink for Aption<T> {
type SinkItem = T;
type SinkError = T;
fn start_send(&mut self, item: Self::SinkItem) -> StartSend<Self::SinkItem, Self::SinkError> {
self.poll_put(item)
}
fn poll_complete(&mut self) -> Poll<(), Self::SinkError> {
self.poll_close()
.map_err(|_| unreachable!("failed to close because already closed elsewhere"))
}
}
impl<T> Stream for Aption<T> {
type Item = T;
type Error = ();
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
match self.poll_take() {
Ok(Async::Ready(v)) => Ok(Async::Ready(Some(v))),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(Closed) => {
Ok(Async::Ready(None))
}
}
}
}
#[cfg(test)]
mod test {
use super::*;
use tokio_mock_task::MockTask;
#[test]
fn basic() {
let mut mt = MockTask::new();
let mut a = new::<usize>();
assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
assert!(!mt.is_notified());
assert_eq!(mt.enter(|| a.poll_put(42)), Ok(AsyncSink::Ready));
assert!(mt.is_notified()); assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(42)));
assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
assert!(!mt.is_notified());
assert_eq!(mt.enter(|| a.poll_put(43)), Ok(AsyncSink::Ready));
assert!(mt.is_notified()); assert_eq!(mt.enter(|| a.poll_put(44)), Ok(AsyncSink::NotReady(44)));
assert!(!mt.is_notified());
assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(43)));
assert!(mt.is_notified()); assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
assert!(!mt.is_notified());
assert_eq!(mt.enter(|| a.poll_put(44)), Ok(AsyncSink::Ready));
assert!(mt.is_notified());
assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::NotReady));
assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::Ready(44)));
assert!(mt.is_notified()); assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::Ready(())));
assert!(!mt.is_notified());
assert_eq!(mt.enter(|| a.poll_take()), Err(Closed));
}
#[test]
fn sink_stream() {
use tokio::prelude::*;
let a = new::<usize>();
let (mut tx, rx) = tokio_sync::mpsc::unbounded_channel();
tokio::run(future::lazy(move || {
tokio::spawn(
rx.forward(a.clone().sink_map_err(|_| unreachable!()))
.map(|_| ())
.map_err(|_| unreachable!()),
);
tx.try_send(1).unwrap();
tx.try_send(2).unwrap();
tx.try_send(3).unwrap();
tx.try_send(4).unwrap();
tx.try_send(5).unwrap();
drop(tx);
a.collect()
.inspect(|v| {
assert_eq!(v, &[1, 2, 3, 4, 5]);
})
.map(|_| ())
}));
}
#[test]
fn futures() {
use tokio::prelude::*;
let a = new::<usize>();
tokio::run(future::lazy(move || {
a.put(42)
.map_err(|_| unreachable!())
.and_then(|a| a.take())
.map_err(|_| unreachable!())
.and_then(|(a, v)| {
assert_eq!(v, 42);
a.put(43)
})
.map_err(|_| unreachable!())
.and_then(|a| a.take())
.map_err(|_| unreachable!())
.inspect(|(_, v)| {
assert_eq!(*v, 43);
})
.map(|_| ())
}));
}
#[test]
fn notified_on_empty_drop() {
let mut mt = MockTask::new();
let mut a = new::<usize>();
assert_eq!(mt.enter(|| a.poll_take()), Ok(Async::NotReady));
assert!(!mt.is_notified());
assert_eq!(mt.enter(|| a.poll_close()), Ok(Async::Ready(())));
assert!(mt.is_notified());
assert_eq!(mt.enter(|| a.poll_take()), Err(Closed));
}
}