#![deny(warnings)]
#![deny(missing_docs)]
#![deny(missing_debug_implementations)]
extern crate futures;
use std::cell::RefCell;
use std::fmt;
use std::rc::Rc;
use futures::{Future, Poll, Async};
use futures::task::{self, Task};
/// Creates a new one-shot channel and returns its `(Sender, Receiver)` halves.
///
/// Both halves point at the same reference-counted, `RefCell`-guarded state,
/// which starts out empty: no value stored, not complete, and no task parked
/// on either side.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let state = Inner {
        value: None,
        complete: false,
        tx_task: None,
        rx_task: None,
    };
    let shared = Rc::new(RefCell::new(state));
    let sender = Sender {
        inner: Rc::clone(&shared),
    };
    let receiver = Receiver { inner: shared };
    (sender, receiver)
}
/// The sending half of a one-shot channel created by `channel`.
///
/// The state is held in an `Rc<RefCell<..>>`, so this type is meant for use on
/// a single thread. Dropping the sender without calling `complete` marks the
/// channel as complete with no value, which the receiver observes as
/// `Canceled`.
pub struct Sender<T> {
// State shared with the paired `Receiver`.
inner: Rc<RefCell<Inner<T>>>,
}
impl<T> Sender<T> {
    /// Stores `val` in the channel and consumes the sender.
    ///
    /// The sender is dropped when this method returns; its `Drop` impl marks
    /// the channel complete and wakes a receiver task that is parked on it.
    pub fn complete(self, val: T) {
        // The temporary `RefMut` is released at the end of this statement, so
        // no borrow is outstanding when `self` is dropped and `Drop` borrows
        // the state again.
        self.inner.borrow_mut().value = Some(val);
    }

    /// Returns `true` once the shared state has been marked complete.
    ///
    /// While the sender is still alive the only other party that sets the
    /// flag is the `Receiver`'s `Drop` impl, so `true` means the receiver is
    /// gone.
    pub fn is_canceled(&self) -> bool {
        let state = self.inner.borrow();
        state.complete
    }

    /// Wraps the sender in a `Waiting` future.
    ///
    /// That future hands the sender back once a receiver task is parked on the
    /// channel, or fails with `Canceled` if the channel has been completed by
    /// the receiver going away.
    pub fn waiting(self) -> Waiting<T> {
        let pending = Some(self);
        Waiting { tx: pending }
    }
}
impl<T> Drop for Sender<T> {
    /// Marks the channel complete and wakes a parked receiver task, if any.
    fn drop(&mut self) {
        // Do all state changes inside a scope so the `RefCell` borrow is
        // released before any task is notified.
        let parked_receiver = {
            let mut state = self.inner.borrow_mut();
            state.complete = true;
            // The sender is going away, so its own parked task is discarded.
            state.tx_task = None;
            state.rx_task.take()
        };
        match parked_receiver {
            Some(task) => task.notify(),
            None => {}
        }
    }
}
impl<T> fmt::Debug for Sender<T> {
    /// Writes the fixed label `Sender`; the channel's contents are not shown,
    /// so no `T: Debug` bound is needed.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // `pad` (rather than `write_str`) applies width/alignment flags.
        formatter.pad("Sender")
    }
}
/// The receiving half of a one-shot channel created by `channel`.
///
/// It implements `Future`: it resolves to the value passed to
/// `Sender::complete`, or fails with `Canceled` when the channel was completed
/// without a value being stored.
pub struct Receiver<T> {
// State shared with the paired `Sender`.
inner: Rc<RefCell<Inner<T>>>,
}
impl<T> Receiver<T> {
    /// Returns `true` if the channel is complete and holds no value, i.e. a
    /// receive attempt would fail with `Canceled`.
    pub fn is_canceled(&self) -> bool {
        let borrow = self.inner.borrow();
        borrow.complete && borrow.value.is_none()
    }

    /// Attempts to take the value without parking the current task.
    ///
    /// Returns `Ok(Some(value))` if a value is stored, `Ok(None)` if the
    /// channel is still open and empty, and `Err(Canceled)` if the channel is
    /// complete without a value. When the result is `Ok(None)`, a sender task
    /// parked through `Waiting` is still woken.
    pub fn try_recv(&mut self) -> Result<Option<T>, Canceled> {
        self.recv_inner(false)
    }

    /// Shared implementation of `try_recv` and `Future::poll`.
    ///
    /// When `should_park` is `true` and no result is available yet, the
    /// current task is recorded so that the sender side can wake it later.
    fn recv_inner(&mut self, should_park: bool) -> Result<Option<T>, Canceled> {
        // Keep the `RefCell` borrow confined to this scope. The sender task is
        // only *taken* here and notified after the borrow is released, the
        // same way both `Drop` impls do it, so a notification that touches the
        // channel again cannot hit an outstanding mutable borrow.
        let tx_task = {
            let mut borrow = self.inner.borrow_mut();
            if let Some(val) = borrow.value.take() {
                return Ok(Some(val));
            }
            if borrow.complete {
                return Err(Canceled);
            }
            if should_park {
                borrow.rx_task = Some(task::current());
            }
            borrow.tx_task.take()
        };
        if let Some(task) = tx_task {
            task.notify();
        }
        Ok(None)
    }
}
impl<T> Future for Receiver<T> {
    type Item = T;
    type Error = Canceled;

    /// Polls for the value, parking the current task when nothing is
    /// available yet.
    ///
    /// Resolves to the stored value, or fails with `Canceled` when the channel
    /// is complete and empty.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        match self.recv_inner(true) {
            Ok(Some(value)) => Ok(Async::Ready(value)),
            Ok(None) => Ok(Async::NotReady),
            Err(canceled) => Err(canceled),
        }
    }
}
impl<T> Drop for Receiver<T> {
    /// Marks the channel complete and wakes a parked sender task, if any, so
    /// that it can observe the cancellation.
    fn drop(&mut self) {
        // Do all state changes inside a scope so the `RefCell` borrow is
        // released before any task is notified.
        let parked_sender = {
            let mut state = self.inner.borrow_mut();
            state.complete = true;
            // The receiver is going away, so its own parked task is discarded.
            state.rx_task = None;
            state.tx_task.take()
        };
        match parked_sender {
            Some(task) => task.notify(),
            None => {}
        }
    }
}
impl<T> fmt::Debug for Receiver<T> {
    /// Writes the fixed label `Receiver`; the channel's contents are not
    /// shown, so no `T: Debug` bound is needed.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // `pad` (rather than `write_str`) applies width/alignment flags.
        formatter.pad("Receiver")
    }
}
/// Future returned by `Sender::waiting`.
///
/// It resolves to the wrapped `Sender` once a receiver task is parked on the
/// channel, and fails with `Canceled` if the channel has been marked complete.
pub struct Waiting<T> {
// The wrapped sender; `poll` moves it out (leaving `None`) when it resolves.
tx: Option<Sender<T>>,
}
impl<T> Future for Waiting<T> {
    type Item = Sender<T>;
    type Error = Canceled;

    /// Checks whether the receiver is parked on the channel.
    ///
    /// * Fails with `Canceled` if the channel has been marked complete.
    /// * Resolves to the sender if a receiver task is currently parked.
    /// * Otherwise records the current task as the sender-side task and
    ///   reports `NotReady`.
    ///
    /// # Panics
    ///
    /// Panics if polled again after it has resolved, because the sender has
    /// already been moved out.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        // Read both flags, then let go of the borrow before acting on them.
        let (canceled, receiver_parked) = {
            let state = self.tx.as_ref().unwrap().inner.borrow();
            (state.complete, state.rx_task.is_some())
        };
        if canceled {
            return Err(Canceled);
        }
        if receiver_parked {
            return Ok(Async::Ready(self.tx.take().unwrap()));
        }
        let sender = self.tx.as_ref().unwrap();
        sender.inner.borrow_mut().tx_task = Some(task::current());
        Ok(Async::NotReady)
    }
}
impl<T> fmt::Debug for Waiting<T> {
    /// Writes the fixed label `Waiting`; the wrapped sender is not shown, so
    /// no `T: Debug` bound is needed.
    fn fmt(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        // `pad` (rather than `write_str`) applies width/alignment flags.
        formatter.pad("Waiting")
    }
}
/// Error signalling that the channel was completed without the expected
/// outcome.
///
/// A `Receiver` yields it when the channel is complete but holds no value, and
/// a `Waiting` future yields it when the channel has been marked complete.
///
/// The type implements `Display` and `std::error::Error`, so it can be printed
/// with `{}` and converted into `Box<Error>`-style error types.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Canceled;

impl fmt::Display for Canceled {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.pad("channel canceled")
    }
}

impl ::std::error::Error for Canceled {
    // Spelled out so the impl also builds on toolchains where `description`
    // has no default body; newer code should rely on `Display` instead.
    fn description(&self) -> &str {
        "channel canceled"
    }
}
/// State shared between the `Sender` and `Receiver` of one channel.
struct Inner<T> {
// Value stored by `Sender::complete` until the receiver takes it.
value: Option<T>,
// Set to `true` by the `Drop` impl of either half.
complete: bool,
// Task recorded by `Waiting::poll`; woken when the receiver polls, calls
// `try_recv`, or is dropped.
tx_task: Option<Task>,
// Task recorded when the receiver is polled without a result; woken when the
// sender is dropped (which also happens at the end of `Sender::complete`).
rx_task: Option<Task>,
}
// Unit tests for the channel. Several of them depend on exactly when a half
// is dropped, so the shape of the `let` patterns below matters.
#[cfg(test)]
mod tests {
use futures::Future;
use super::channel;
// A value completed before the receiver is waited on is delivered.
#[test]
fn test_smoke() {
let (tx, rx) = channel();
tx.complete(33);
assert_eq!(rx.wait().unwrap(), 33);
}
// The `_` pattern does not bind the sender, so it is dropped at the end of
// the `let` statement and the receiver resolves to `Canceled`.
#[test]
fn test_canceled() {
let (_, rx) = channel::<()>();
assert_eq!(rx.wait().unwrap_err(), super::Canceled);
}
// `is_canceled` on both halves: true once the peer is gone, false while both
// are alive, and false on the receiver after a value has been stored.
#[test]
fn test_is_canceled() {
// Receiver is not bound, hence dropped immediately.
let (tx, _) = channel::<()>();
assert!(tx.is_canceled());
// Sender is not bound, hence dropped immediately.
let (_, rx) = channel::<()>();
assert!(rx.is_canceled());
let (tx, rx) = channel::<()>();
assert!(!tx.is_canceled());
assert!(!rx.is_canceled());
// Completing drops the sender, but a stored value means "not canceled".
tx.complete(());
assert!(!rx.is_canceled());
}
// The sender is completed from a lazy future joined with the receiver.
// NOTE(review): presumably `join` polls the receiver first so that it parks
// before the closure runs — depends on `join`'s polling order.
#[test]
fn test_tx_complete_rx_unparked() {
let (tx, rx) = channel();
let res = rx.join(::futures::lazy(move || {
tx.complete(55);
Ok(11)
}));
assert_eq!(res.wait().unwrap(), (55, 11));
}
// Same setup, but the sender is only dropped inside the closure, so the
// receiver must resolve to `Canceled`.
#[test]
fn test_tx_dropped_rx_unparked() {
let (tx, rx) = channel::<i32>();
let res = rx.join(::futures::lazy(move || {
let _tx = tx;
Ok(11)
}));
assert_eq!(res.wait().unwrap_err(), super::Canceled);
}
// Polling the receiver inside the closure records its task and wakes the
// sender-side task; `waiting()` then yields the sender, which completes the
// channel with 5.
#[test]
fn test_waiting_unparked() {
let (tx, rx) = channel::<i32>();
let res = tx.waiting().join(::futures::lazy(move || {
let mut rx = rx;
let _ = rx.poll(); Ok(rx)
})).and_then(|(tx, rx)| {
tx.complete(5);
rx
});
assert_eq!(res.wait().unwrap(), 5);
}
// Dropping the receiver inside the closure makes `waiting()` fail with
// `Canceled`.
#[test]
fn test_waiting_canceled() {
let (tx, rx) = channel::<i32>();
let res = tx.waiting().join(::futures::lazy(move || {
let _rx = rx;
Ok(())
}));
assert_eq!(res.wait().unwrap_err(), super::Canceled);
}
// `try_recv` reports `None` before completion and the value afterwards.
#[test]
fn try_recv() {
let (tx, mut rx) = channel::<i32>();
assert!(rx.try_recv().unwrap().is_none());
::futures::lazy(move || {
tx.complete(5);
Ok::<(), ()>(())
}).wait().unwrap();
assert_eq!(rx.try_recv().unwrap(), Some(5));
}
// `try_recv` reports an error once the sender was dropped without a value.
#[test]
fn try_recv_canceled() {
let (tx, mut rx) = channel::<i32>();
assert!(rx.try_recv().unwrap().is_none());
::futures::lazy(move || {
let _tx = tx;
Ok::<(), ()>(())
}).wait().unwrap();
assert!(rx.try_recv().is_err());
}
}