picopub 0.1.1

A sync/async pub-sub library with bounded queues and backpressure using Mutex and Condvar
Documentation
use picopub::PicoPub;

#[cfg(feature = "tokio")]
use tokio::task::spawn;

#[cfg(feature = "tokio")]
use futures_util::StreamExt;

/// Smoke test: one message published on a topic is delivered to a subscriber
/// that registered on that same topic beforehand.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_it_works() {
    let bus = PicoPub::<&str, &str>::new();
    // `None` leaves the second `subscribe` argument unset — presumably the
    // queue capacity (another test passes `Some(2)`); confirm against the API.
    let subscription = bus.subscribe("test", None).await;

    bus.publish("test", "works").await;

    let delivered = subscription.recv().await;
    // The comparison drives type inference for `.into()`, so the literal is
    // converted to whatever type `recv` yields.
    assert_eq!(delivered, "works".into());
}

/// A subscriber handle cloned into a spawned task receives both published
/// messages, in publication order.
///
/// NOTE(review): `#[tokio::test]` uses a current-thread runtime by default, so
/// the spawned task may not actually run on a different thread — confirm
/// whether a multi-thread flavor was intended for this test.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_it_works_across_threads() {
    let bus = PicoPub::<&str, &str>::new();
    let subscription = bus.subscribe("orders", None).await;

    // The task owns a clone; the original handle stays alive in this scope
    // until the end of the test.
    let consumer = {
        let subscription = subscription.clone();
        spawn(async move {
            let first = subscription.recv().await;
            let second = subscription.recv().await;
            vec![first, second]
        })
    };

    bus.publish("orders", "order-1".into()).await;
    bus.publish("orders", "order-2".into()).await;

    // Awaiting the handle yields `Err` if the task panicked; `unwrap` turns
    // that into a test failure.
    let received = consumer.await.unwrap();
    assert_eq!(received.len(), 2);
    assert_eq!(received[0], "order-1".into());
    assert_eq!(received[1], "order-2".into());
}

/// Messages published before the stream is created are still yielded by
/// `sub.stream()`, in publication order.
///
/// The publisher task is awaited to completion first, so both values are
/// already sent when the stream is read. This requires the subscription to
/// hold two pending items — presumably what `Some(2)` configures; confirm
/// against `subscribe`.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_stream_reader() {
    let ps = PicoPub::<String, u32>::new();
    let sub = ps.subscribe(String::from("counters"), Some(2)).await;
    let nums = vec![1, 1];

    // `ps` and a copy of `nums` move into the task; the original `nums` is
    // kept for computing the expected values below.
    let publisher = {
        let nums = nums.clone();
        spawn(async move {
            for n in &nums {
                ps.publish(String::from("counters"), n + 1).await;
            }
        })
    };
    // Surface a panic inside the publisher instead of discarding the join
    // result, so the failure is reported at its real origin.
    publisher.await.expect("publisher task panicked");

    let mut stream = sub.stream();

    let n1 = stream.next().await.expect("early");
    let n2 = stream.next().await.expect("early");

    assert_eq!(n1, (nums[0] + 1).into());
    assert_eq!(n2, (nums[1] + 1).into());
}

/// Negative check: the message delivered on "test" must not compare equal to
/// an unrelated value. Despite its name, this test is expected to pass.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_it_fails() {
    let bus = PicoPub::<&str, &str>::new();
    let subscription = bus.subscribe("test", None).await;

    bus.publish("test", "works").await;

    let delivered = subscription.recv().await;
    // "works" was published, so the received value must differ from "foo".
    assert_ne!(delivered, "foo".into());
}