use std::cell::RefCell;
use std::rc::{Rc, Weak};
use {Future, Poll, Async};
use task::{self, Task};
/// Creates a single-threaded oneshot channel and returns its two halves.
///
/// The shared state starts out with no value and no parked tasks. The
/// `Receiver` owns the only long-lived strong reference to that state, while
/// the `Sender` is handed a weak reference to it.
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
    let shared = Rc::new(RefCell::new(Inner {
        value: None,
        tx_task: None,
        rx_task: None,
    }));

    // Downgrade before `shared` is moved into the receiver below.
    let sender = Sender { inner: Rc::downgrade(&shared) };
    let receiver = Receiver { state: State::Open(shared) };

    (sender, receiver)
}
/// The sending half of a channel created by `channel`.
///
/// A value is delivered with `send`, which consumes the sender. Dropping the
/// sender wakes any task the receiver has parked on the shared state.
#[derive(Debug)]
pub struct Sender<T> {
// Weak handle to the state shared with the `Receiver`. Upgrading fails once
// the receiver has released its strong reference (it does so in `close`).
inner: Weak<RefCell<Inner<T>>>,
}
/// The receiving half of a channel created by `channel`.
///
/// This is a future that resolves to the sent value, or to `Canceled` when no
/// value is available and the sender is gone or the channel has been closed.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
// Either the live shared state or, after `close`, whatever value was
// salvaged from it.
state: State<T>,
}
/// Internal state of a `Receiver`.
#[derive(Debug)]
enum State<T> {
// The channel is still open; holds the strong reference to the shared state.
Open(Rc<RefCell<Inner<T>>>),
// The receiver was closed; holds the value (if any) that was already in the
// shared state at that moment, until `poll` takes it.
Closed(Option<T>),
}
/// Error produced by a `Receiver` when it can no longer yield a value.
///
/// `Receiver::poll` returns this when the channel holds no value and either
/// the sender has gone away or the receiver was already closed.
///
/// The type implements `Display` and `std::error::Error` so it can be printed
/// with `{}` and used with generic error handling.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
pub struct Canceled;

// Fully qualified `::std` paths keep these impls independent of the file's
// `use` declarations.
impl ::std::fmt::Display for Canceled {
    fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result {
        f.write_str("oneshot canceled")
    }
}

impl ::std::error::Error for Canceled {
    // Provided explicitly for toolchains where `description` has no default
    // implementation; it mirrors the `Display` text.
    fn description(&self) -> &str {
        "oneshot canceled"
    }
}
/// State shared between the `Sender` and the `Receiver` of one channel.
#[derive(Debug)]
struct Inner<T> {
// The sent value, stored by `Sender::send` and taken out by the receiver.
value: Option<T>,
// Task recorded by `Sender::poll_cancel`; unparked when the receiver closes.
tx_task: Option<Task>,
// Task recorded by `Receiver::poll`; unparked when the sender is dropped.
rx_task: Option<Task>,
}
impl<T> Sender<T> {
    /// Stores `val` in the channel, consuming the sender.
    ///
    /// Returns `Ok(())` when the shared state is still alive and the value was
    /// stored. Returns `Err(val)`, handing the value back, when the shared
    /// state can no longer be reached because the receiver released it.
    ///
    /// The sender is dropped when this call returns, and its `Drop`
    /// implementation is what wakes a parked receiver task.
    pub fn send(self, val: T) -> Result<(), T> {
        match self.inner.upgrade() {
            Some(shared) => {
                shared.borrow_mut().value = Some(val);
                Ok(())
            }
            None => Err(val),
        }
    }

    /// Polls for the receiving side having gone away.
    ///
    /// Returns a ready `()` once the shared state can no longer be reached.
    /// Otherwise the handle returned by `task::park()` is recorded as the
    /// sender's task (replacing any earlier one) and `NotReady` is returned.
    /// `Receiver::close` unparks that task.
    pub fn poll_cancel(&mut self) -> Poll<(), ()> {
        if let Some(shared) = self.inner.upgrade() {
            // Obtain the task handle first, then borrow the shared state.
            let parked = task::park();
            shared.borrow_mut().tx_task = Some(parked);
            return Ok(Async::NotReady);
        }
        Ok(().into())
    }
}
impl<T> Drop for Sender<T> {
    /// Discards the sender's own parked task and wakes the receiver's task,
    /// if one is recorded, so it can observe the value or the cancellation.
    ///
    /// Does nothing when the shared state is already gone.
    fn drop(&mut self) {
        if let Some(shared) = self.inner.upgrade() {
            // Pull the tasks out inside a short scope so the `RefCell` borrow
            // is released before the receiver's task is unparked.
            let waiter = {
                let mut state = shared.borrow_mut();
                drop(state.tx_task.take());
                state.rx_task.take()
            };

            if let Some(task) = waiter {
                task.unpark();
            }
        }
    }
}
impl<T> Receiver<T> {
    /// Closes the channel from the receiving side.
    ///
    /// Any value already stored in the shared state is moved into the
    /// receiver so a later `poll` can still return it. The receiver's own
    /// parked task is discarded, and the sender's parked task (if any) is
    /// unparked after the shared state has been released. Calling this on an
    /// already closed receiver is a no-op.
    pub fn close(&mut self) {
        let (pending, sender_task) = if let State::Open(ref shared) = self.state {
            let mut guard = shared.borrow_mut();
            drop(guard.rx_task.take());
            let pending = guard.value.take();
            let sender_task = guard.tx_task.take();
            (pending, sender_task)
        } else {
            return;
        };

        // Replacing the state drops the receiver's strong reference.
        self.state = State::Closed(pending);

        if let Some(task) = sender_task {
            task.unpark();
        }
    }
}
impl<T> Future for Receiver<T> {
    type Item = T;
    type Error = Canceled;

    /// Attempts to resolve the receiver.
    ///
    /// * Closed receiver: yields the salvaged value once, then `Canceled`.
    /// * Open receiver with a stored value: yields that value.
    /// * Open receiver, no value, no other handle to the shared state left:
    ///   fails with `Canceled`.
    /// * Otherwise: records the handle returned by `task::park()` as the
    ///   receiver's task and returns `NotReady`.
    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
        let shared = match self.state {
            State::Closed(ref mut leftover) => {
                return leftover.take().map(Async::Ready).ok_or(Canceled);
            }
            State::Open(ref mut shared) => shared,
        };

        // A stored value wins over everything else.
        let ready = shared.borrow_mut().value.take();
        if let Some(val) = ready {
            return Ok(Async::Ready(val));
        }

        // `Rc::get_mut` only succeeds when no other `Rc` or `Weak` points at
        // the shared state, i.e. the sender's weak reference is gone. No value
        // was found above, so nothing will ever arrive.
        let sender_gone = Rc::get_mut(shared).is_some();
        if sender_gone {
            return Err(Canceled);
        }

        // Obtain the task handle first, then borrow the shared state.
        let parked = task::park();
        shared.borrow_mut().rx_task = Some(parked);
        Ok(Async::NotReady)
    }
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.close();
}
}