use crate::{Sub, TryPopError};
use log::trace;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
impl<T: Clone> Sub<T> {
pub fn next(&mut self) -> Next<'_, T> {
Next { subscription: self }
}
pub(crate) fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
trace!("poll_next");
for attempt in 0..2 {
return match self.inner.try_pop() {
Ok(value) => {
trace!("got some");
Poll::Ready(Some(value))
}
Err(TryPopError::Empty) => {
if attempt == 0 && self.add_waker(cx.waker()) {
trace!("pending");
Poll::Pending
} else {
eprintln!("pending attempt {attempt}");
continue;
}
}
Err(TryPopError::Finished) => {
trace!("finished");
None.into()
}
};
}
None.into()
}
}
pub struct Next<'a, T> {
subscription: &'a mut Sub<T>,
}
impl<'a, T: Clone> Future for Next<'a, T> {
type Output = Option<T>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.subscription.poll_next(cx)
}
}
#[cfg(test)]
use log::debug;
#[test]
fn async_pub_sub() {
use futures_lite::future::poll_once;
futures_lite::future::block_on(async move {
let mut publisher = crate::Pub::default();
let mut subscriber1 = publisher.subscribe();
assert_eq!(poll_once(subscriber1.next()).await, None);
publisher.push(1);
let mut subscriber2 = publisher.subscribe();
assert_eq!(poll_once(subscriber2.next()).await, None);
publisher.push(2);
assert_eq!(subscriber1.next().await, Some(1));
assert_eq!(subscriber1.next().await, Some(2));
assert_eq!(poll_once(subscriber1.next()).await, None);
assert_eq!(subscriber2.next().await, Some(2));
assert_eq!(poll_once(subscriber2.next()).await, None);
drop(publisher);
assert_eq!(subscriber1.next().await, None);
assert_eq!(subscriber2.next().await, None);
})
}
#[test]
fn async_threads_pub_sub() {
use std::time::Duration;
let mut publisher = crate::Pub::default();
let mut subscriber1 = publisher.subscribe();
publisher.push(1);
let mut subscriber2 = publisher.subscribe();
let h1 = std::thread::Builder::new()
.name("t1".to_owned())
.spawn(|| {
futures_lite::future::block_on(async move {
let mut values = vec![];
while let Some(value) = subscriber1.next().await {
debug!("t1 got {value}");
values.push(value);
}
values
})
})
.expect("t1");
let h2 = std::thread::Builder::new()
.name("t2".to_owned())
.spawn(|| {
futures_lite::future::block_on(async move {
let mut values = vec![];
while let Some(value) = subscriber2.next().await {
debug!("t2 got {value}");
values.push(value);
}
values
})
})
.expect("t2");
std::thread::sleep(Duration::from_millis(10));
publisher.push(2);
publisher.push(3);
drop(publisher);
let r1 = h1.join().expect("thread 1");
let r2 = h2.join().expect("thread 2");
assert_eq!(r2, vec![2, 3]);
assert_eq!(r1, vec![1, 2, 3]);
}