use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::SeqCst;
use std::error::Error;
use std::fmt;
use {Future, Poll, Async};
use future::{lazy, Lazy, Executor, IntoFuture};
use lock::Lock;
use task::{self, Task};
#[must_use = "futures do nothing unless polled"]
#[derive(Debug)]
pub struct Receiver<T> {
inner: Arc<Inner<T>>,
}
#[derive(Debug)]
pub struct Sender<T> {
inner: Arc<Inner<T>>,
}
#[derive(Debug)]
struct Inner<T> {
complete: AtomicBool,
data: Lock<Option<T>>,
rx_task: Lock<Option<Task>>,
tx_task: Lock<Option<Task>>,
}
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Arc::new(Inner::new());
let receiver = Receiver {
inner: inner.clone(),
};
let sender = Sender {
inner: inner,
};
(sender, receiver)
}
impl<T> Inner<T> {
fn new() -> Inner<T> {
Inner {
complete: AtomicBool::new(false),
data: Lock::new(None),
rx_task: Lock::new(None),
tx_task: Lock::new(None),
}
}
fn send(&self, t: T) -> Result<(), T> {
if self.complete.load(SeqCst) {
return Err(t)
}
if let Some(mut slot) = self.data.try_lock() {
assert!(slot.is_none());
*slot = Some(t);
drop(slot);
if self.complete.load(SeqCst) {
if let Some(mut slot) = self.data.try_lock() {
if let Some(t) = slot.take() {
return Err(t);
}
}
}
Ok(())
} else {
Err(t)
}
}
fn poll_cancel(&self) -> Poll<(), ()> {
if self.complete.load(SeqCst) {
return Ok(Async::Ready(()))
}
let handle = task::current();
match self.tx_task.try_lock() {
Some(mut p) => *p = Some(handle),
None => return Ok(Async::Ready(())),
}
if self.complete.load(SeqCst) {
Ok(Async::Ready(()))
} else {
Ok(Async::NotReady)
}
}
fn is_canceled(&self) -> bool {
self.complete.load(SeqCst)
}
fn drop_tx(&self) {
self.complete.store(true, SeqCst);
if let Some(mut slot) = self.rx_task.try_lock() {
if let Some(task) = slot.take() {
drop(slot);
task.notify();
}
}
}
fn close_rx(&self) {
self.complete.store(true, SeqCst);
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
drop(handle);
task.notify()
}
}
}
fn recv(&self) -> Poll<T, Canceled> {
let mut done = false;
if self.complete.load(SeqCst) {
done = true;
} else {
let task = task::current();
match self.rx_task.try_lock() {
Some(mut slot) => *slot = Some(task),
None => done = true,
}
}
if done || self.complete.load(SeqCst) {
if let Some(mut slot) = self.data.try_lock() {
if let Some(data) = slot.take() {
return Ok(data.into());
}
}
Err(Canceled)
} else {
Ok(Async::NotReady)
}
}
fn drop_rx(&self) {
self.complete.store(true, SeqCst);
if let Some(mut slot) = self.rx_task.try_lock() {
let task = slot.take();
drop(slot);
drop(task);
}
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
drop(handle);
task.notify()
}
}
}
}
impl<T> Sender<T> {
#[deprecated(note = "renamed to `send`", since = "0.1.11")]
#[doc(hidden)]
#[cfg(feature = "with-deprecated")]
pub fn complete(self, t: T) {
drop(self.send(t));
}
pub fn send(self, t: T) -> Result<(), T> {
self.inner.send(t)
}
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
self.inner.poll_cancel()
}
pub fn is_canceled(&self) -> bool {
self.inner.is_canceled()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
self.inner.drop_tx()
}
}
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;
impl fmt::Display for Canceled {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
write!(fmt, "oneshot canceled")
}
}
impl Error for Canceled {
fn description(&self) -> &str {
"oneshot canceled"
}
}
impl<T> Receiver<T> {
pub fn close(&mut self) {
self.inner.close_rx()
}
}
impl<T> Future for Receiver<T> {
type Item = T;
type Error = Canceled;
fn poll(&mut self) -> Poll<T, Canceled> {
self.inner.recv()
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.inner.drop_rx()
}
}
pub struct SpawnHandle<T, E> {
rx: Arc<ExecuteInner<Result<T, E>>>,
}
struct ExecuteInner<T> {
inner: Inner<T>,
keep_running: AtomicBool,
}
pub struct Execute<F: Future> {
future: F,
tx: Arc<ExecuteInner<Result<F::Item, F::Error>>>,
}
pub fn spawn<F, E>(future: F, executor: &E) -> SpawnHandle<F::Item, F::Error>
where F: Future,
E: Executor<Execute<F>>,
{
let data = Arc::new(ExecuteInner {
inner: Inner::new(),
keep_running: AtomicBool::new(false),
});
executor.execute(Execute {
future: future,
tx: data.clone(),
}).expect("failed to spawn future");
SpawnHandle { rx: data }
}
pub fn spawn_fn<F, R, E>(f: F, executor: &E) -> SpawnHandle<R::Item, R::Error>
where F: FnOnce() -> R,
R: IntoFuture,
E: Executor<Execute<Lazy<F, R>>>,
{
spawn(lazy(f), executor)
}
impl<T, E> SpawnHandle<T, E> {
pub fn forget(self) {
self.rx.keep_running.store(true, SeqCst);
}
}
impl<T, E> Future for SpawnHandle<T, E> {
type Item = T;
type Error = E;
fn poll(&mut self) -> Poll<T, E> {
match self.rx.inner.recv() {
Ok(Async::Ready(Ok(t))) => Ok(t.into()),
Ok(Async::Ready(Err(e))) => Err(e),
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(_) => panic!("future was canceled before completion"),
}
}
}
impl<T: fmt::Debug, E: fmt::Debug> fmt::Debug for SpawnHandle<T, E> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("SpawnHandle")
.finish()
}
}
impl<T, E> Drop for SpawnHandle<T, E> {
fn drop(&mut self) {
self.rx.inner.drop_rx();
}
}
impl<F: Future> Future for Execute<F> {
type Item = ();
type Error = ();
fn poll(&mut self) -> Poll<(), ()> {
if self.tx.inner.poll_cancel().unwrap().is_ready() {
if !self.tx.keep_running.load(SeqCst) {
return Ok(().into())
}
}
let result = match self.future.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(t)) => Ok(t),
Err(e) => Err(e),
};
drop(self.tx.inner.send(result));
Ok(().into())
}
}
impl<F: Future + fmt::Debug> fmt::Debug for Execute<F> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Execute")
.field("future", &self.future)
.finish()
}
}
impl<F: Future> Drop for Execute<F> {
fn drop(&mut self) {
self.tx.inner.drop_tx();
}
}