use std::{
cell::RefCell,
pin::Pin,
rc::Rc,
task::{Context, Poll, Waker},
};
use futures_core::{FusedStream, Stream};
/// Per-receiver state shared between a `Sender` and one `Receiver`.
#[derive(Debug)]
struct ReceiverInner<E> {
/// Pending events: `Sender::send` extends this value and `Receiver::poll_next`
/// pulls items out of it with `Iterator::next`.
event: E,
/// Waker stored by `poll_next` when it returns `Poll::Pending`; `send` takes it
/// and wakes it after adding an event.
waker: Option<Waker>,
}
#[derive(Debug)]
pub struct Sender<E> {
receivers: RefCell<Vec<Rc<RefCell<ReceiverInner<E>>>>>,
}
impl<E> Default for Sender<E> {
    /// Builds a sender that has no receivers attached yet.
    fn default() -> Self {
        let receivers = RefCell::new(Vec::new());
        Self { receivers }
    }
}
impl<E> Sender<E> {
    /// Creates a sender with no receivers; equivalent to `Sender::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
impl<E: Default> Sender<E> {
    /// Creates a receiver and registers it with this sender.
    ///
    /// The receiver starts with `E::default()` as its event buffer and no
    /// stored waker. Its state is shared: one handle is pushed onto the
    /// sender's list, the other is owned by the returned `Receiver`.
    pub fn new_receiver(&self) -> Receiver<E> {
        let state = ReceiverInner {
            event: E::default(),
            waker: None,
        };
        let inner = Rc::new(RefCell::new(state));
        self.receivers.borrow_mut().push(Rc::clone(&inner));
        Receiver {
            inner,
            terminated: false,
        }
    }
}
/// Receiving half of the channel; it implements `Stream`, yielding the items
/// produced by its event buffer.
#[derive(Debug)]
pub struct Receiver<E> {
/// State shared with the `Sender` that created this receiver.
inner: Rc<RefCell<ReceiverInner<E>>>,
/// Set by `poll_next` just before it returns `Poll::Ready(None)`; reported by
/// `FusedStream::is_terminated`.
terminated: bool,
}
impl<E> Sender<E> {
    /// Broadcasts `event` to every live receiver.
    ///
    /// A clone of `event` is appended to each receiver's buffer through
    /// `Extend`, and the receiver's stored waker (if any) is woken so the
    /// task polling it runs again.
    ///
    /// Receivers that have been dropped are removed from the list first, so
    /// their buffers stop accumulating events nobody will ever read.
    pub fn send<I: Clone>(&self, event: I)
    where
        E: Extend<I>,
    {
        // A strong count of one means only this sender still holds the state,
        // i.e. the matching `Receiver` is gone.
        self.receivers
            .borrow_mut()
            .retain(|receiver| Rc::strong_count(receiver) > 1);

        for receiver in self.receivers.borrow().iter() {
            let mut inner = receiver.borrow_mut();
            inner.event.extend(Some(event.clone()));
            let waker = inner.waker.take();
            // Release the receiver's state before waking: a waker that polls
            // the receiver synchronously would otherwise hit an active
            // mutable borrow and panic.
            drop(inner);
            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }
}
/// Lets a `Sender` be used as a `super::EventSource` whose items are `E::Item`.
// NOTE(review): the `EventSource` contract is declared in the parent module and
// is not visible here; confirm any additional requirements against that trait.
impl<E: Iterator + Default + 'static> super::EventSource<E::Item> for Sender<E> {
// Opaque stream type; `subscribe` below returns the result of `new_receiver`,
// i.e. a `Receiver<E>`.
type Source = impl Stream<Item = E::Item> + FusedStream + Unpin;
/// Creates and registers a fresh receiver via `new_receiver`.
fn subscribe(&self) -> Self::Source {
self.new_receiver()
}
}
impl<E: Iterator> Stream for Receiver<E> {
    type Item = E::Item;

    /// Yields the next buffered event, parks the task, or ends the stream.
    ///
    /// * If the event buffer produces an item, it is returned immediately.
    /// * Otherwise, if this receiver holds the only strong reference to the
    ///   shared state (the sender no longer holds its handle), the stream is
    ///   marked terminated and `None` is returned.
    /// * Otherwise the current waker is stored and `Poll::Pending` is returned.
    ///
    /// Any previously stored waker is always cleared first; if it would not
    /// wake the current task, it is woken on the way out.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();
        // Sample the reference count before borrowing the shared state.
        let sender_alive = Rc::strong_count(&this.inner) > 1;
        let mut state = this.inner.borrow_mut();

        // A waker that already targets this task is simply discarded; one
        // that belongs to a different task gets notified.
        let stale = state
            .waker
            .take()
            .filter(|previous| !previous.will_wake(cx.waker()));
        if let Some(previous) = stale {
            previous.wake();
        }

        match state.event.next() {
            Some(item) => Poll::Ready(Some(item)),
            None if sender_alive => {
                state.waker = Some(cx.waker().clone());
                Poll::Pending
            }
            None => {
                this.terminated = true;
                Poll::Ready(None)
            }
        }
    }
}
/// Exposes the receiver's termination flag through `FusedStream`.
impl<E: Iterator> FusedStream for Receiver<E> {
/// Returns the `terminated` flag, which `poll_next` sets just before it
/// returns `Poll::Ready(None)`.
fn is_terminated(&self) -> bool {
self.terminated
}
}