use super::*;
use crate::{Config, Store};
/// Opens a fresh [`Store`] for a test, built from the default [`Config`]
/// with `with_ttl_reaper_manual()` applied.
///
/// Panics if `Store::open` returns an error.
fn store() -> Store {
    let config = Config::default().with_ttl_reaper_manual();
    Store::open(config).unwrap()
}
/// Publishing on a channel that has no subscribers reports zero receivers.
#[test]
fn publish_to_no_subscribers_returns_zero() {
    let db = store();
    let delivered = db.publish(b"chan", b"hi");
    assert_eq!(delivered, 0);
}
/// A new subscription first yields its `Subscribe` acknowledgement and then
/// the message published on that channel.
#[test]
fn subscribe_ack_then_message_delivered() {
    let db = store();
    let listener = db.subscribe(&[b"news"]);

    // The first frame on the subscription is the subscribe ack.
    let ack = listener.recv().unwrap();
    let expected_ack = PubsubFrame::Subscribe {
        channel: b"news".to_vec(),
        count: 1,
    };
    assert_eq!(ack, expected_ack);

    // Exactly one subscriber is reached by the publish.
    let delivered = db.publish(b"news", b"hello");
    assert_eq!(delivered, 1);

    let message = listener.recv().unwrap();
    let expected_message = PubsubFrame::Message {
        channel: b"news".to_vec(),
        payload: b"hello".to_vec(),
    };
    assert_eq!(message, expected_message);
}
/// A publish issued through one clone of a store reaches a subscriber that
/// was registered through another clone of the same store.
#[test]
fn store_clone_publishes_reach_other_clones_subscribers() {
    let original = store();
    let cloned = original.clone();
    let listener = original.subscribe(&[b"x"]);

    // Discard the subscribe ack so the next frame is the published message.
    let _ = listener.recv().unwrap();

    let delivered = cloned.publish(b"x", b"v");
    assert_eq!(delivered, 1);

    let received = listener.recv().unwrap();
    let expected = PubsubFrame::Message {
        channel: b"x".to_vec(),
        payload: b"v".to_vec(),
    };
    assert_eq!(received, expected);
}
/// A pattern subscription on `news.*` receives a `Pmessage` for `news.tech`
/// and nothing for the non-matching channel `weather`.
#[test]
fn psubscribe_glob_match_delivers_pmessage() {
    let db = store();
    let listener = db.psubscribe(&[b"news.*"]);

    // Discard the first frame (the ack for the pattern subscription).
    let _ = listener.recv().unwrap();

    let matched = db.publish(b"news.tech", b"breaking");
    assert_eq!(matched, 1);

    let received = listener.recv().unwrap();
    let expected = PubsubFrame::Pmessage {
        pattern: b"news.*".to_vec(),
        channel: b"news.tech".to_vec(),
        payload: b"breaking".to_vec(),
    };
    assert_eq!(received, expected);

    // A channel outside the pattern reaches nobody and queues nothing.
    let unmatched = db.publish(b"weather", b"sunny");
    assert_eq!(unmatched, 0);
    let leftover = listener.try_recv().unwrap();
    assert!(leftover.is_none());
}
/// Subscribing twice to the same channel yields two acks that both report
/// `count: 1`, and a single publish is delivered only once.
#[test]
fn duplicate_subscribe_does_not_duplicate_delivery() {
    let db = store();
    let mut listener = db.subscribe(&[b"x"]);
    listener.subscribe(&[b"x"]);

    // Receive both acks first, then check each of them.
    let acks = [listener.recv().unwrap(), listener.recv().unwrap()];
    for ack in &acks {
        assert!(matches!(ack, PubsubFrame::Subscribe { count: 1, .. }));
    }

    let delivered = db.publish(b"x", b"v");
    assert_eq!(delivered, 1);

    // Exactly one frame is queued for the publish; nothing follows it.
    let _ = listener.recv().unwrap();
    let leftover = listener.try_recv().unwrap();
    assert!(leftover.is_none());
}
/// After unsubscribing from its only channel the subscription gets an
/// `Unsubscribe` ack with `count: 0`, and later publishes reach nobody.
#[test]
fn unsubscribe_removes_then_no_more_messages() {
    let db = store();
    let mut listener = db.subscribe(&[b"x"]);

    // Discard the subscribe ack.
    let _ = listener.recv().unwrap();

    listener.unsubscribe(&[b"x"]);
    let ack = listener.recv().unwrap();
    assert!(matches!(
        ack,
        PubsubFrame::Unsubscribe {
            channel: Some(_),
            count: 0
        }
    ));

    let delivered = db.publish(b"x", b"v");
    assert_eq!(delivered, 0);
}
/// Calling `unsubscribe` with an empty channel list removes every held
/// channel: one `Unsubscribe` ack arrives per channel and later publishes on
/// those channels reach nobody.
#[test]
fn unsubscribe_all_with_empty_args_drains_every_channel() {
    let db = store();
    let mut listener = db.subscribe(&[b"a", b"b"]);

    // Discard the two subscribe acks.
    for _ in 0..2 {
        let _ = listener.recv().unwrap();
    }

    listener.unsubscribe(&[]);
    for _ in 0..2 {
        let ack = listener.recv().unwrap();
        assert!(matches!(
            ack,
            PubsubFrame::Unsubscribe {
                channel: Some(_),
                ..
            }
        ));
    }

    for channel in [b"a", b"b"] {
        assert_eq!(db.publish(channel, b"x"), 0);
    }
}
/// Unsubscribing from everything while holding no channels still produces
/// one `Unsubscribe` ack, with `channel: None` and `count: 0`.
#[test]
fn unsubscribe_when_no_subs_held_emits_nil_channel_ack() {
    let db = store();
    // Subscribing to an empty channel list leaves the subscription holding nothing.
    let mut listener = db.subscribe(&[]);
    listener.unsubscribe(&[]);

    let ack = listener.recv().unwrap();
    assert!(matches!(
        ack,
        PubsubFrame::Unsubscribe {
            channel: None,
            count: 0
        }
    ));
}
/// Dropping a subscription unregisters it: a publish reaches one receiver
/// before the drop and zero afterwards.
#[test]
fn drop_subscriber_unregisters() {
    let db = store();
    let listener = db.subscribe(&[b"x"]);

    // Discard the subscribe ack.
    let _ = listener.recv().unwrap();

    let before_drop = db.publish(b"x", b"v");
    assert_eq!(before_drop, 1);
    // Consume the delivered message.
    let _ = listener.recv().unwrap();

    drop(listener);

    let after_drop = db.publish(b"x", b"v");
    assert_eq!(after_drop, 0);
}
/// `recv_timeout` on a subscription with nothing queued fails with an error
/// whose kind is `io::ErrorKind::TimedOut`.
#[test]
fn recv_timeout_returns_timeout_when_empty() {
    let db = store();
    let listener = db.subscribe(&[b"x"]);

    // The subscribe ack is already pending, so this first wait succeeds.
    let _ = listener.recv_timeout(Duration::from_millis(100)).unwrap();

    // Nothing else is queued, so the second wait must give up.
    let outcome = listener.recv_timeout(Duration::from_millis(50));
    let err = outcome.unwrap_err();
    assert_eq!(err.kind(), io::ErrorKind::TimedOut);
}
/// Compile-time check that `Subscription` implements both `Send` and `Sync`.
#[test]
fn subscription_is_send_and_sync() {
    // Instantiating this helper only type-checks when `T: Send + Sync`.
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }

    assert_send_sync::<Subscription>();
}
/// Two threads sharing one subscription through an `Arc` together receive
/// every published frame exactly once: 100 publishes, 100 frames in total.
#[test]
fn arc_subscription_drains_concurrent_recvs_round_robin() {
    let db = store();
    let shared = std::sync::Arc::new(db.subscribe(&[b"flood"]));
    let _ack = shared.recv().unwrap();

    // Spawns a consumer thread that counts received frames, stopping at 100
    // frames or on the first `recv_timeout` error.
    let spawn_consumer = || {
        let handle = shared.clone();
        std::thread::spawn(move || {
            let mut received = 0u32;
            while received < 100 && handle.recv_timeout(Duration::from_secs(2)).is_ok() {
                received += 1;
            }
            received
        })
    };
    let consumer1 = spawn_consumer();
    let consumer2 = spawn_consumer();

    for i in 0..100u32 {
        let payload = format!("msg-{i:04}");
        let n = db.publish(b"flood", payload.as_bytes());
        assert_eq!(n, 1, "subscriber count was wrong at publish {i}");
    }

    // Presumably gives the consumers time to drain the queue before the
    // handles below are released. TODO confirm.
    std::thread::sleep(Duration::from_millis(100));
    drop(db);
    drop(shared);

    let c1 = consumer1.join().unwrap();
    let c2 = consumer2.join().unwrap();
    assert_eq!(c1 + c2, 100, "got c1={c1}, c2={c2}, expected sum=100");
}
/// `try_recv` must return `Ok(None)` promptly while another thread is inside
/// a blocking `recv_timeout` on the same subscription.
///
/// A panic in the blocker thread fails the test instead of being discarded.
#[test]
fn try_recv_returns_none_under_concurrent_blocking_recv() {
    let s = store();
    let sub = std::sync::Arc::new(s.subscribe(&[b"slow"]));
    let _ack = sub.recv().unwrap();

    let blocker = {
        let sub = sub.clone();
        std::thread::spawn(move || {
            // Only the act of blocking matters here; the outcome is ignored.
            let _ = sub.recv_timeout(Duration::from_secs(2));
        })
    };

    // Give the blocker time to enter `recv_timeout` before probing.
    std::thread::sleep(Duration::from_millis(50));

    let start = std::time::Instant::now();
    let res = sub.try_recv().unwrap();
    let elapsed = start.elapsed();
    assert!(res.is_none(), "expected Ok(None) under contention, got {res:?}");
    assert!(
        elapsed < Duration::from_millis(50),
        "try_recv took {elapsed:?} — should not block on receiver mutex"
    );

    // Publishing gives the blocked `recv_timeout` a frame to return with.
    s.publish(b"slow", b"x");
    // Propagate a blocker panic so it cannot pass silently.
    blocker.join().expect("blocker thread panicked");
}