use picopub::PicoPub;
#[cfg(feature = "blocking")]
use std::thread;
#[cfg(feature = "blocking")]
#[test]
fn it_works() {
let pubsub = PicoPub::<&str, &str>::new();
let sub = pubsub.subscribe("test", None);
pubsub.publish("test", "works");
let msg = sub.recv().unwrap();
assert_eq!(msg, "works".into());
}
#[cfg(feature = "blocking")]
#[test]
fn it_works_across_threads() {
let ps = PicoPub::<&str, &str>::default();
let sub = ps.subscribe("orders", None);
let t = {
let sub = sub.clone();
thread::spawn(move || {
let mut received = Vec::new();
for _ in 0..2 {
received.push(sub.recv().unwrap());
}
received
})
};
ps.publish("orders", "order-1".into());
ps.publish("orders", "order-2".into());
let received = t.join().unwrap();
assert_eq!(received.len(), 2);
assert_eq!(received[0], "order-1".into());
assert_eq!(received[1], "order-2".into());
}
#[cfg(feature = "blocking")]
#[test]
fn it_fails() {
let pubsub = PicoPub::<&str, &str>::new();
let sub = pubsub.subscribe("test", None);
pubsub.publish("test", "works");
let msg = sub.recv().unwrap();
assert_ne!(msg, "foo".into());
}
#[cfg(feature = "blocking")]
#[test]
fn blocking_backpressure_blocks_publisher() {
use std::sync::{
Arc,
atomic::{AtomicBool, Ordering},
};
use std::time::Duration;
let pb = Arc::new(PicoPub::<&str, usize>::new());
let sub = pb.subscribe("test", Some(1));
let started_blocking = Arc::new(AtomicBool::new(false));
let finished = Arc::new(AtomicBool::new(false));
pb.publish("test", 1);
let pb_clone = pb.clone();
let started = started_blocking.clone();
let done = finished.clone();
let publisher = thread::spawn(move || {
started.store(true, Ordering::SeqCst);
pb_clone.publish("test", 2); done.store(true, Ordering::SeqCst);
});
while !started_blocking.load(Ordering::SeqCst) {}
thread::sleep(Duration::from_millis(50));
assert!(!finished.load(Ordering::SeqCst));
let v = sub.recv().unwrap();
assert_eq!(*v, 1);
publisher.join().unwrap();
assert!(finished.load(Ordering::SeqCst));
}