picopub 0.1.1

A sync/async pub-sub library with bounded queues and backpressure using Mutex and Condvar
Documentation
use picopub::PicoPub;

#[cfg(feature = "blocking")]
use std::thread;

#[cfg(feature = "blocking")]
#[test]
fn it_works() {
    let pubsub = PicoPub::<&str, &str>::new();
    let sub = pubsub.subscribe("test", None);
    pubsub.publish("test", "works");
    let msg = sub.recv().unwrap();
    assert_eq!(msg, "works".into());
}

#[cfg(feature = "blocking")]
#[test]
fn it_works_across_threads() {
    let ps = PicoPub::<&str, &str>::default();
    let sub = ps.subscribe("orders", None);

    let t = {
        let sub = sub.clone();
        thread::spawn(move || {
            let mut received = Vec::new();
            for _ in 0..2 {
                received.push(sub.recv().unwrap());
            }
            received
        })
    };
    ps.publish("orders", "order-1".into());
    ps.publish("orders", "order-2".into());

    let received = t.join().unwrap();
    assert_eq!(received.len(), 2);
    assert_eq!(received[0], "order-1".into());
    assert_eq!(received[1], "order-2".into());
}

#[cfg(feature = "blocking")]
#[test]
fn it_fails() {
    let pubsub = PicoPub::<&str, &str>::new();
    let sub = pubsub.subscribe("test", None);
    pubsub.publish("test", "works");
    let msg = sub.recv().unwrap();
    assert_ne!(msg, "foo".into());
}

#[cfg(feature = "blocking")]
#[test]
fn blocking_backpressure_blocks_publisher() {
    use std::sync::{
        Arc,
        atomic::{AtomicBool, Ordering},
    };
    use std::time::Duration;

    //let sub = Arc::new(blocking::Subscriber::new(1));
    let pb = Arc::new(PicoPub::<&str, usize>::new());
    let sub = pb.subscribe("test", Some(1));
    let started_blocking = Arc::new(AtomicBool::new(false));
    let finished = Arc::new(AtomicBool::new(false));

    // First publish fills the queue
    pb.publish("test", 1);

    // Second publish should block
    let pb_clone = pb.clone();
    let started = started_blocking.clone();
    let done = finished.clone();

    let publisher = thread::spawn(move || {
        started.store(true, Ordering::SeqCst);
        pb_clone.publish("test", 2); // must block here
        done.store(true, Ordering::SeqCst);
    });

    // Wait until publisher is definitely trying to push
    while !started_blocking.load(Ordering::SeqCst) {}

    // Give scheduler a chance
    thread::sleep(Duration::from_millis(50));

    // Must still be blocked
    assert!(!finished.load(Ordering::SeqCst));

    // Now consume, freeing space
    let v = sub.recv().unwrap();
    assert_eq!(*v, 1);

    // Publisher must now finish
    publisher.join().unwrap();
    assert!(finished.load(Ordering::SeqCst));
}