use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll, Waker};
/// State shared by every handle (sender, receiver, inactive receiver) of one channel.
#[derive(Debug)]
struct Inner<T> {
    /// Values that have been sent and not yet discarded; receivers index into it.
    buf: Vec<T>,
    /// Wakers registered by receivers whose poll returned `Pending`; emptied and
    /// woken when a value is sent.
    wakers: Vec<Waker>,
    /// Number of live `Receiver` handles (incremented on creation, decremented on drop).
    listener: usize,
}
/// An active subscription to the channel.
///
/// Each receiver keeps its own read position into the shared buffer.
#[derive(Debug)]
pub struct Receiver<T> {
    /// Channel state shared with the senders and the other receivers.
    inner: Rc<RefCell<Inner<T>>>,
    /// Position in the shared buffer of the next value this receiver yields.
    index: usize,
}
impl<T> Drop for Receiver<T> {
    /// Unregisters this receiver from the shared listener count.
    ///
    /// NOTE(review): the shared buffer and any waker this receiver registered
    /// are left untouched here — confirm that is intended.
    fn drop(&mut self) {
        let mut shared = self.inner.borrow_mut();
        shared.listener -= 1;
    }
}
impl<T> Receiver<T> {
    /// Registers one more listener on `inner` and returns a receiver positioned
    /// at the start of the shared buffer.
    fn new(inner: Rc<RefCell<Inner<T>>>) -> Receiver<T> {
        {
            // Scope the borrow so it is released before `inner` is moved.
            let mut shared = inner.borrow_mut();
            shared.listener += 1;
        }
        Self { inner, index: 0 }
    }

    /// Returns a future that mutably borrows this receiver and, when polled,
    /// yields the next buffered value for it.
    pub fn recv(&mut self) -> RecvFuture<'_, T> {
        let rx = self;
        RecvFuture { rx }
    }
}
/// Future returned by [`Receiver::recv`].
///
/// Holds the receiver mutably for its whole lifetime, so only one `recv`
/// can be in flight per receiver at a time.
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct RecvFuture<'a, T> {
    /// The receiver whose read position is advanced when a value is yielded.
    rx: &'a mut Receiver<T>,
}
impl<'a, T: Clone> Future for RecvFuture<'a, T> {
    type Output = T;

    /// Yields a clone of the next buffered value for this receiver, or parks.
    ///
    /// * If a value exists at the receiver's index, it is cloned, the index is
    ///   advanced, and `Poll::Ready` is returned.
    /// * Otherwise the receiver's index is reset to 0, the task's waker is
    ///   registered, and — when the number of registered wakers equals the
    ///   number of live receivers — the shared buffer is cleared. `Poll::Pending`
    ///   is returned.
    ///
    /// An index that is past the end of the buffer (possible when the buffer was
    /// cleared while this receiver was mid-read) is treated as "caught up"
    /// instead of panicking on an out-of-bounds access.
    ///
    /// NOTE(review): the index is reset to 0 even when the buffer is *not*
    /// cleared, so a value sent before every receiver has parked makes this
    /// receiver read the buffer from the start again. Also, every `Pending`
    /// poll pushes another waker, so repeated polls by one receiver can reach
    /// the "all listeners waiting" count on their own. Fixing either needs
    /// extra per-channel state; confirm the intended protocol.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let rx = &mut self.get_mut().rx;
        let mut inner = rx.inner.borrow_mut();
        // Checked lookup: `None` covers both "caught up" and "stale index".
        let next = inner.buf.get(rx.index).cloned();
        match next {
            Some(value) => {
                rx.index += 1;
                Poll::Ready(value)
            }
            None => {
                rx.index = 0;
                inner.wakers.push(cx.waker().clone());
                if inner.listener == inner.wakers.len() {
                    inner.buf.clear();
                }
                Poll::Pending
            }
        }
    }
}
/// A handle to the channel that is not itself a listener.
///
/// Use [`InactiveReceiver::resubscribe`] to obtain an active [`Receiver`].
#[derive(Debug, Clone)]
pub struct InactiveReceiver<T> {
    /// Channel state shared with the senders and receivers.
    inner: Rc<RefCell<Inner<T>>>,
}
impl<T> InactiveReceiver<T> {
    /// Creates a new active [`Receiver`] on the same channel.
    ///
    /// The new receiver is counted as a listener and starts reading at the
    /// beginning of the shared buffer.
    pub fn resubscribe(&self) -> Receiver<T> {
        let shared = Rc::clone(&self.inner);
        Receiver::new(shared)
    }
}
/// The sending half of the channel; cloning it yields another sender on the
/// same shared state.
#[derive(Debug, Clone)]
pub struct Sender<T> {
    /// Channel state shared with the receivers.
    inner: Rc<RefCell<Inner<T>>>,
}
impl<T> Sender<T> {
    /// Appends `value` to the shared buffer and wakes every parked receiver.
    ///
    /// The value is silently dropped when there is no live receiver.
    fn set(&self, value: T) {
        // Take the wakers out and release the `RefCell` borrow *before* waking:
        // a waker may synchronously re-enter the channel (poll a receiver, or
        // drop one), which would otherwise panic on a double borrow.
        let parked = {
            let mut inner = self.inner.borrow_mut();
            if inner.listener == 0 {
                return;
            }
            inner.buf.push(value);
            std::mem::take(&mut inner.wakers)
        };
        for waker in parked {
            waker.wake();
        }
    }

    /// Broadcasts `value` to the current receivers.
    ///
    /// Does nothing if no receiver is currently subscribed.
    pub fn send(&self, value: T) {
        self.set(value);
    }
}
/// Creates a new channel and returns its sender together with an inactive
/// receiver handle.
///
/// The channel starts with an empty buffer, no registered wakers and zero
/// listeners; call `resubscribe` on the returned handle to start receiving.
pub fn channel<T>() -> (Sender<T>, InactiveReceiver<T>) {
    let state = Inner {
        buf: Vec::new(),
        wakers: Vec::new(),
        listener: 0,
    };
    let shared = Rc::new(RefCell::new(state));
    (
        Sender {
            inner: Rc::clone(&shared),
        },
        InactiveReceiver { inner: shared },
    )
}