use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::{Rc, Weak};
use std::task::{Context, Poll};
pub use futures::channel::oneshot::Canceled;
use crate::task::LocalWaker;
pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
let inner = Rc::new(RefCell::new(Inner {
value: None,
tx_task: LocalWaker::new(),
rx_task: LocalWaker::new(),
}));
let tx = Sender {
inner: Rc::downgrade(&inner),
};
let rx = Receiver {
state: State::Open(inner),
};
(tx, rx)
}
#[derive(Debug)]
pub struct Sender<T> {
inner: Weak<RefCell<Inner<T>>>,
}
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
pub struct Receiver<T> {
state: State<T>,
}
impl<T> Unpin for Receiver<T> {}
impl<T> Unpin for Sender<T> {}
#[derive(Debug)]
enum State<T> {
Open(Rc<RefCell<Inner<T>>>),
Closed(Option<T>),
}
#[derive(Debug)]
struct Inner<T> {
value: Option<T>,
tx_task: LocalWaker,
rx_task: LocalWaker,
}
impl<T> Sender<T> {
pub fn send(self, val: T) -> Result<(), T> {
if let Some(inner) = self.inner.upgrade() {
inner.borrow_mut().value = Some(val);
Ok(())
} else {
Err(val)
}
}
pub fn poll_canceled(&mut self, cx: &mut Context) -> Poll<()> {
match self.inner.upgrade() {
Some(inner) => {
inner.borrow_mut().tx_task.register(cx.waker());
Poll::Pending
}
None => Poll::Ready(()),
}
}
pub fn is_canceled(&self) -> bool {
self.inner.upgrade().is_none()
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
let inner = match self.inner.upgrade() {
Some(inner) => inner,
None => return,
};
inner.borrow().rx_task.wake();
}
}
impl<T> Receiver<T> {
pub fn close(&mut self) {
match self.state {
State::Open(ref inner) => {
let mut inner = inner.borrow_mut();
inner.tx_task.wake();
let value = inner.value.take();
drop(inner);
self.state = State::Closed(value);
}
State::Closed(_) => {}
};
}
}
impl<T> Future for Receiver<T> {
type Output = Result<T, Canceled>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let inner = match this.state {
State::Open(ref mut inner) => inner,
State::Closed(ref mut item) => match item.take() {
Some(item) => return Poll::Ready(Ok(item)),
None => return Poll::Ready(Err(Canceled)),
},
};
if let Some(val) = inner.borrow_mut().value.take() {
return Poll::Ready(Ok(val));
}
if Rc::get_mut(inner).is_some() {
Poll::Ready(Err(Canceled))
} else {
inner.borrow().rx_task.register(cx.waker());
Poll::Pending
}
}
}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
self.close();
}
}