mod common;
use common::*;
use fibre::error::{RecvError, SendError, TryRecvError};
use fibre::spmc::topic as spmc_topic;
use std::collections::HashSet;
use std::sync::{Arc, Barrier};
use std::thread;
#[test]
fn sync_topic_single_subscriber_receives() {
let (tx, rx) = spmc_topic::channel::<&str, String>(16);
rx.subscribe("topic1");
tx.send("topic1", "hello".to_string()).unwrap();
tx.send("topic2", "world".to_string()).unwrap();
assert_eq!(rx.recv().unwrap(), ("topic1", "hello".to_string()));
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
#[test]
fn sync_topic_multiple_subscribers_same_topic() {
let (tx, rx1) = spmc_topic::channel(16);
let rx2 = rx1.clone();
let rx3 = rx1.clone();
rx1.subscribe("news");
rx2.subscribe("news");
rx3.subscribe("news");
let barrier = Arc::new(Barrier::new(4));
let mut handles = vec![];
let t1_barrier = barrier.clone();
handles.push(thread::spawn(move || {
t1_barrier.wait();
assert_eq!(rx1.recv().unwrap().1, "breaking news");
}));
let t2_barrier = barrier.clone();
handles.push(thread::spawn(move || {
t2_barrier.wait();
assert_eq!(rx2.recv().unwrap().1, "breaking news");
}));
let t3_barrier = barrier.clone();
handles.push(thread::spawn(move || {
t3_barrier.wait();
assert_eq!(rx3.recv().unwrap().1, "breaking news");
}));
barrier.wait();
tx.send("news", "breaking news".to_string()).unwrap();
for h in handles {
h.join().unwrap();
}
}
#[test]
fn sync_topic_unsubscribe_works() {
let (tx, rx) = spmc_topic::channel(16);
rx.subscribe("topic1");
tx.send("topic1", "first".to_string()).unwrap();
assert_eq!(rx.recv().unwrap().1, "first");
rx.unsubscribe(&"topic1");
tx.send("topic1", "second".to_string()).unwrap();
thread::sleep(SHORT_TIMEOUT);
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
#[test]
fn sync_topic_slow_consumer_drops_messages() {
let (tx, rx) = spmc_topic::channel(1);
rx.subscribe("important");
tx.send("important", "msg1".to_string()).unwrap();
tx.send("important", "msg2".to_string()).unwrap();
tx.send("important", "msg3".to_string()).unwrap();
assert_eq!(rx.recv().unwrap().1, "msg1");
assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
}
#[test]
fn sync_topic_sender_drop_disconnects_receivers() {
let (tx, rx) = spmc_topic::channel(16);
rx.subscribe("a");
tx.send("a", 1).unwrap();
drop(tx);
assert_eq!(rx.recv().unwrap().1, 1);
assert_eq!(rx.recv(), Err(RecvError::Disconnected));
}
#[test]
fn sync_topic_all_receivers_drop_closes_sender() {
let (tx, rx1) = spmc_topic::channel::<&str, i32>(16);
let rx2 = rx1.clone();
rx1.subscribe("a");
rx2.subscribe("b");
assert!(tx.send("a", 1).is_ok());
drop(rx1);
drop(rx2);
thread::sleep(SHORT_TIMEOUT);
assert_eq!(tx.send("a", 2), Err(SendError::Closed));
assert!(tx.is_closed());
}
#[test]
fn sync_topic_clone_receiver_inherits_subscriptions() {
let (tx, rx1) = spmc_topic::channel(16);
rx1.subscribe("a");
rx1.subscribe("b");
let rx2 = rx1.clone();
rx1.unsubscribe(&"b");
tx.send("a", 10).unwrap();
tx.send("b", 20).unwrap();
assert_eq!(rx1.recv().unwrap(), ("a", 10));
assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
let mut results = HashSet::new();
results.insert(rx2.recv().unwrap());
results.insert(rx2.recv().unwrap());
let expected: HashSet<(&str, i32)> = [("a", 10), ("b", 20)].iter().cloned().collect();
assert_eq!(results, expected);
}