use picopub::PicoPub;
#[cfg(feature = "tokio")]
use tokio::task::spawn;
#[cfg(feature = "tokio")]
use futures_util::StreamExt;
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_it_works() {
let pubsub = PicoPub::<&str, &str>::new();
let sub = pubsub.subscribe("test", None).await;
pubsub.publish("test", "works").await;
let msg = sub.recv().await;
assert_eq!(msg, "works".into());
}
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_it_works_across_threads() {
let ps = PicoPub::<&str, &str>::new();
let sub = ps.subscribe("orders", None).await;
let t = {
let sub = sub.clone();
spawn(async move {
let mut received = Vec::new();
for _ in 0..2 {
received.push(sub.recv().await);
}
received
})
};
ps.publish("orders", "order-1".into()).await;
ps.publish("orders", "order-2".into()).await;
let received = tokio::join!(t).0.unwrap();
assert_eq!(received.len(), 2);
assert_eq!(received[0], "order-1".into());
assert_eq!(received[1], "order-2".into());
}
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_stream_reader() {
let ps = PicoPub::<String, u32>::new();
let sub = ps.subscribe(String::from("counters"), Some(2)).await;
let nums = vec![1, 1];
let _ = tokio::join!({
let nums = nums.clone();
spawn(async move {
for n in nums.iter() {
ps.publish(String::from("counters"), n + 1).await;
}
})
});
let mut stream = sub.stream();
let n1 = stream.next().await.expect("early");
let n2 = stream.next().await.expect("early");
assert_eq!(n1, (nums[0] + 1).into());
assert_eq!(n2, (nums[1] + 1).into());
}
#[cfg(feature = "tokio")]
#[tokio::test]
async fn async_it_fails() {
let pubsub = PicoPub::<&str, &str>::new();
let sub = pubsub.subscribe("test", None).await;
pubsub.publish("test", "works").await;
let msg = sub.recv().await;
assert_ne!(msg, "foo".into());
}