potential 3.0.1

PubSub lib with sync and async API
Documentation
use crate::{Sub, TryPopError};
use log::trace;
use std::{
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

impl<T: Clone> Sub<T> {
    /// Returns a future that yields the next value from this subscription.
    ///
    /// The returned [`Next`] borrows the subscription mutably and resolves to
    /// `Some(value)` when a value could be popped, or `None` when
    /// [`Sub::poll_next`] reports the end of the stream.
    pub fn next(&mut self) -> Next<'_, T> {
        Next { subscription: self }
    }

    /// Polls the subscription for its next value.
    ///
    /// Returns:
    /// - `Poll::Ready(Some(value))` when a value could be popped,
    /// - `Poll::Ready(None)` when the queue reports `TryPopError::Finished`,
    ///   or when it is still empty on the second attempt,
    /// - `Poll::Pending` when the queue is empty on the first attempt and
    ///   `add_waker` returned `true`.
    pub(crate) fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        trace!("poll_next");
        // We try twice to be sure because of races with dropped publishers:
        // if `add_waker` declines the waker on the first attempt, the queue is
        // checked once more before giving up.
        // NOTE(review): presumably `add_waker` returns `false` once no
        // publisher is left to wake us -- confirm against its definition.
        for attempt in 0..2 {
            match self.inner.try_pop() {
                Ok(value) => {
                    trace!("got some");
                    return Poll::Ready(Some(value));
                }
                Err(TryPopError::Finished) => {
                    trace!("finished");
                    return Poll::Ready(None);
                }
                Err(TryPopError::Empty) => {
                    // The waker is only offered on the first attempt.
                    if attempt == 0 && self.add_waker(cx.waker()) {
                        trace!("pending");
                        return Poll::Pending;
                    }
                    // Diagnostics go through the logger like the rest of this
                    // function rather than being written straight to stderr.
                    trace!("pending attempt {attempt}");
                    // Fall through to retry (or leave the loop after the
                    // second attempt).
                }
            }
        }
        // Still empty after the retry: report the stream as finished.
        Poll::Ready(None)
    }
}

/// Future created by `Sub::next`.
///
/// Holds a mutable borrow of the subscription for the lifetime `'a`; polling
/// it forwards to `Sub::poll_next` and produces an `Option<T>`.
pub struct Next<'a, T> {
    /// The subscription that is polled when this future is polled.
    subscription: &'a mut Sub<T>,
}

/// Drives a [`Next`] by delegating each poll to the borrowed subscription.
impl<T: Clone> Future for Next<'_, T> {
    type Output = Option<T>;

    /// Forwards the poll to `Sub::poll_next` on the wrapped subscription.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
        // `Next` only stores a `&mut` reference, so it is `Unpin` and the pin
        // can be removed safely to reach the subscription.
        let this = self.get_mut();
        this.subscription.poll_next(cx)
    }
}

#[cfg(test)]
use log::debug;

/// Single-threaded check of the async API: values pushed after a subscriber
/// joined are delivered in order, and dropping the publisher ends the stream.
#[test]
fn async_pub_sub() {
    use futures_lite::future::{block_on, poll_once};

    block_on(async move {
        let mut source = crate::Pub::default();

        // Nothing has been pushed yet, so a single poll does not complete.
        let mut early = source.subscribe();
        assert_eq!(poll_once(early.next()).await, None);

        // A subscriber created after the first push has nothing to read yet.
        source.push(1);
        let mut late = source.subscribe();
        assert_eq!(poll_once(late.next()).await, None);

        source.push(2);

        // The early subscriber observes both values, then runs dry.
        assert_eq!(early.next().await, Some(1));
        assert_eq!(early.next().await, Some(2));
        assert_eq!(poll_once(early.next()).await, None);

        // The late subscriber only observes the value pushed after it joined.
        assert_eq!(late.next().await, Some(2));
        assert_eq!(poll_once(late.next()).await, None);

        // Once the publisher is gone both streams complete with `None`.
        drop(source);

        assert_eq!(early.next().await, None);
        assert_eq!(late.next().await, None);
    })
}

/// Multi-threaded check: two subscribers drain their streams on separate
/// threads while the main thread pushes values and then drops the publisher.
#[test]
fn async_threads_pub_sub() {
    use std::time::Duration;

    let mut source = crate::Pub::default();
    let mut early = source.subscribe();
    source.push(1);
    let mut late = source.subscribe();

    // First worker: collects everything the early subscriber receives.
    let worker1 = std::thread::Builder::new()
        .name(String::from("t1"))
        .spawn(move || {
            futures_lite::future::block_on(async move {
                let mut seen = Vec::new();
                while let Some(item) = early.next().await {
                    debug!("t1 got {item}");
                    seen.push(item);
                }
                seen
            })
        })
        .expect("t1");

    // Second worker: collects everything the late subscriber receives.
    let worker2 = std::thread::Builder::new()
        .name(String::from("t2"))
        .spawn(move || {
            futures_lite::future::block_on(async move {
                let mut seen = Vec::new();
                while let Some(item) = late.next().await {
                    debug!("t2 got {item}");
                    seen.push(item);
                }
                seen
            })
        })
        .expect("t2");

    // Give the workers a moment to start polling before publishing more.
    std::thread::sleep(Duration::from_millis(10));

    source.push(2);
    source.push(3);
    // Dropping the publisher lets both worker loops terminate.
    drop(source);

    let from_early = worker1.join().expect("thread 1");
    let from_late = worker2.join().expect("thread 2");

    assert_eq!(from_late, vec![2, 3]);
    assert_eq!(from_early, vec![1, 2, 3]);
}