use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use {Future, Poll, Async};
use slot::{Slot, Token};
use lock::Lock;
use task::{self, Task};
/// The receiving half of a oneshot channel, created by `oneshot`.
///
/// As a future it resolves to the value handed to `Complete::complete`, or
/// fails with `Canceled` when the paired `Complete` is dropped without
/// completing.
#[must_use = "futures do nothing unless polled"]
pub struct Oneshot<T> {
    /// State shared with the paired `Complete` handle.
    inner: Arc<Inner<T>>,
    /// Token for the `on_full` callback registered by the last `poll` that
    /// returned `NotReady`; kept so the callback can be cancelled on the next
    /// `poll` or on drop.
    cancel_token: Option<Token>,
}
/// The sending half of a oneshot channel, created by `oneshot`.
///
/// Consumed by `complete` to deliver a value. Dropping it without completing
/// sends a `None` marker, which the paired `Oneshot` reports as `Canceled`.
pub struct Complete<T> {
    /// State shared with the paired `Oneshot` handle.
    inner: Arc<Inner<T>>,
    /// Set by `complete`; tells the destructor that a value was already sent
    /// and no cancellation marker is needed.
    completed: bool,
}
/// State shared between the two halves of a oneshot channel.
struct Inner<T> {
    /// Hand-off cell for the result: `Some(value)` on completion, `None` when
    /// the `Complete` half went away without producing a value.
    slot: Slot<Option<T>>,
    /// Raised (and never lowered) by the `Oneshot` destructor.
    oneshot_gone: AtomicBool,
    /// Task parked by `Complete::poll_cancel`, unparked by the `Oneshot`
    /// destructor.
    notify_cancel: Lock<Option<Task>>,
}
/// Creates a new oneshot channel, returning its two connected halves.
///
/// The `Complete` half delivers at most one value; the `Oneshot` half is a
/// future that resolves to that value. Both halves share a single
/// reference-counted `Inner`, which starts with an empty slot, the
/// "oneshot gone" flag cleared, and no task waiting for cancellation.
pub fn oneshot<T>() -> (Complete<T>, Oneshot<T>) {
    let shared = Arc::new(Inner {
        slot: Slot::new(None),
        oneshot_gone: AtomicBool::new(false),
        notify_cancel: Lock::new(None),
    });

    // The receiver takes a cloned handle; the sender takes the original.
    let receiver = Oneshot { inner: shared.clone(), cancel_token: None };
    let sender = Complete { inner: shared, completed: false };

    (sender, receiver)
}
impl<T> Complete<T> {
    /// Completes the channel with the value `t`, consuming this handle.
    ///
    /// The handle is flagged as completed before the value is sent, so the
    /// destructor that runs when `self` goes out of scope does not follow up
    /// with a `None` cancellation marker.
    pub fn complete(mut self, t: T) {
        let payload = Some(t);
        self.completed = true;
        self.send(payload)
    }

    /// Polls whether the paired `Oneshot` has been dropped.
    ///
    /// Returns `Ready` once the receiver is known to be gone (or is observed
    /// in the middle of going away). Otherwise the current task is parked in
    /// `notify_cancel`, to be unparked by the `Oneshot` destructor, and
    /// `NotReady` is returned. This method never returns an error.
    pub fn poll_cancel(&mut self) -> Poll<(), ()> {
        let shared = &self.inner;

        // Fast path: the receiver's destructor has already raised the flag.
        if shared.oneshot_gone.load(Ordering::SeqCst) {
            return Ok(Async::Ready(()));
        }

        // Park before taking the lock, then publish the handle. The only
        // other user of `notify_cancel` in this file is the `Oneshot`
        // destructor, so a failed `try_lock` is treated as "the receiver is
        // being dropped right now".
        let parked = task::park();
        if let Some(mut waiter) = shared.notify_cancel.try_lock() {
            *waiter = Some(parked);
        } else {
            return Ok(Async::Ready(()));
        }

        // Check the flag again: the receiver may have been dropped while the
        // handle was being stored, in which case its destructor may not have
        // seen the handle and nobody would wake this task.
        let gone = shared.oneshot_gone.load(Ordering::SeqCst);
        Ok(if gone { Async::Ready(()) } else { Async::NotReady })
    }

    /// Places `t` into the shared slot.
    ///
    /// When the slot rejects the value, the value is recovered from the error
    /// and passed to `on_empty` together with a callback that retries the
    /// production. NOTE(review): `on_empty` presumably runs the callback once
    /// the slot has been emptied (inferred from its name and the panic
    /// message) — confirm against `slot`.
    fn send(&mut self, t: Option<T>) {
        let rejected = match self.inner.slot.try_produce(t) {
            Ok(_) => return,
            Err(e) => e.into_inner(),
        };

        self.inner.slot.on_empty(Some(rejected), |slot, pending| {
            slot.try_produce(pending.unwrap())
                .ok()
                .expect("advertised as empty but wasn't");
        });
    }
}
impl<T> Drop for Complete<T> {
    /// Sends the `None` cancellation marker unless a value was already
    /// delivered through `complete`.
    fn drop(&mut self) {
        if self.completed {
            return;
        }
        // No value was ever sent: the receiver turns this `None` into
        // `Err(Canceled)`.
        self.send(None);
    }
}
/// Error returned from a `Oneshot<T>` when the corresponding `Complete<T>` is
/// dropped without completing.
///
/// Implements `Display` and `std::error::Error` so it can be printed with
/// `{}` and used with generic error handling (for example boxed as a std
/// error).
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;

impl ::std::fmt::Display for Canceled {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.write_str("oneshot canceled")
    }
}

impl ::std::error::Error for Canceled {
    // Provided explicitly so the impl also builds on toolchains where
    // `description` has no default body.
    fn description(&self) -> &str {
        "oneshot canceled"
    }
}
impl<T> Future for Oneshot<T> {
    type Item = T;
    type Error = Canceled;

    /// Attempts to take the result out of the shared slot.
    ///
    /// * `Ok(Async::Ready(value))` — the sender completed with `value`.
    /// * `Err(Canceled)` — the slot held `None`, which the sender's
    ///   destructor produces when it is dropped without completing.
    /// * `Ok(Async::NotReady)` — nothing could be consumed yet; the current
    ///   task is parked and an `on_full` callback is registered to unpark it.
    fn poll(&mut self) -> Poll<T, Canceled> {
        // Cancel the callback left over from an earlier poll so that at most
        // one registration from this future is outstanding.
        if let Some(stale) = self.cancel_token.take() {
            self.inner.slot.cancel(stale);
        }

        let outcome = match self.inner.slot.try_consume() {
            Ok(outcome) => outcome,
            Err(_) => {
                // Nothing to consume: park the task and register the wake-up.
                // NOTE(review): `on_full` presumably fires once a value has
                // been produced into the slot — confirm against `slot`.
                let waiter = task::park();
                let token = self.inner.slot.on_full(move |_| {
                    waiter.unpark();
                });
                self.cancel_token = Some(token);
                return Ok(Async::NotReady);
            }
        };

        match outcome {
            Some(value) => Ok(Async::Ready(value)),
            None => Err(Canceled),
        }
    }
}
impl<T> Drop for Oneshot<T> {
    /// Cancels any outstanding slot registration, flags the receiver as gone
    /// and wakes a sender task parked by `Complete::poll_cancel`, if any.
    fn drop(&mut self) {
        // Cancel our registration (if any).
        if let Some(stale) = self.cancel_token.take() {
            self.inner.slot.cancel(stale);
        }

        // Raise the flag before touching the lock. If the lock cannot be
        // taken, `poll_cancel` is holding it, and it re-reads this flag after
        // releasing the lock.
        self.inner.oneshot_gone.store(true, Ordering::SeqCst);

        // Take the parked task out while holding the lock. The guard is
        // released when the closure returns, i.e. before the task is unparked.
        let parked = self
            .inner
            .notify_cancel
            .try_lock()
            .and_then(|mut guard| guard.take());

        if let Some(waiter) = parked {
            waiter.unpark();
        }
    }
}