use kevy_client::{Connection, PubsubEvent, Subscriber};
use std::sync::{Arc, Barrier};
use std::thread;
use std::time::Duration;
/// Cross-thread publish/receive over one shared named `mem://` URL.
///
/// The subscriber is opened on the test thread, while the publishing
/// connection is opened and used on a spawned thread. The test expects the
/// publish call to report `1` and the subscriber to then receive the
/// published message.
#[test]
fn mailrs_pattern_cross_thread_publish_recv() {
    const URL: &str = "mem://mailrs-cross-thread";

    // Subscribe first; the first event must be the subscribe acknowledgement.
    let mut subscriber = Subscriber::open(URL, &[b"mail.event"]).unwrap();
    assert!(matches!(
        subscriber.recv().unwrap(),
        PubsubEvent::Subscribe { count: 1, .. }
    ));

    // Two-party rendezvous: the publisher only publishes once both threads
    // have reached the barrier.
    // NOTE(review): if `Connection::open` panics in the spawned thread, the
    // main thread blocks forever in `wait()` instead of failing — confirm
    // whether that is acceptable for this test.
    let rendezvous = Arc::new(Barrier::new(2));
    let publisher = {
        let rendezvous = Arc::clone(&rendezvous);
        thread::spawn(move || {
            let mut conn = Connection::open(URL).unwrap();
            rendezvous.wait();
            conn.publish(b"mail.event", b"recipient=foo@bar.example")
                .unwrap()
        })
    };
    rendezvous.wait();

    // The value returned by `publish` is handed back through the join handle.
    let delivered = publisher.join().unwrap();
    assert_eq!(delivered, 1);

    // Bound the wait so a missing message fails the test rather than hanging.
    subscriber
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();
    let expected = PubsubEvent::Message {
        channel: b"mail.event".to_vec(),
        payload: b"recipient=foo@bar.example".to_vec(),
    };
    assert_eq!(subscriber.recv().unwrap(), expected);
}
/// A publish on one named `mem://` URL must not reach a subscriber that was
/// opened on a different named URL, even when the channel name is the same.
#[test]
fn distinct_named_urls_have_independent_buses() {
    let mut subscriber_a = Subscriber::open("mem://bus-A", &[b"chan"]).unwrap();
    // Discard the first event (presumably the subscribe acknowledgement).
    let _ = subscriber_a.recv().unwrap();

    // Publishing on the other URL is expected to report zero.
    let mut publisher_b = Connection::open("mem://bus-B").unwrap();
    let delivered = publisher_b.publish(b"chan", b"x").unwrap();
    assert_eq!(delivered, 0);

    // With a short timeout, the next receive must fail with `TimedOut`,
    // i.e. nothing was queued for subscriber A.
    subscriber_a
        .set_read_timeout(Some(Duration::from_millis(50)))
        .unwrap();
    let kind = subscriber_a.recv().unwrap_err().kind();
    assert_eq!(kind, std::io::ErrorKind::TimedOut);
}
/// A pattern subscription (`mail.*`) on a named `mem://` URL receives
/// publishes to a matching channel made through another connection to the
/// same URL, and is not counted for a non-matching channel.
#[test]
fn psubscribe_glob_reaches_via_same_url() {
    const URL: &str = "mem://glob-bus";

    let mut sub = Subscriber::connect(URL).unwrap();
    sub.psubscribe(&[b"mail.*"]).unwrap();
    // Discard the first event (presumably the psubscribe acknowledgement).
    let _ = sub.recv().unwrap();

    let mut pubconn = Connection::open(URL).unwrap();
    // Matching channel: publish is expected to report 1.
    assert_eq!(pubconn.publish(b"mail.inbound", b"x").unwrap(), 1);
    // Non-matching channel: publish is expected to report 0.
    assert_eq!(pubconn.publish(b"weather", b"sunny").unwrap(), 0);

    // Bound the wait, as the sibling tests do, so that a delivery regression
    // fails the test with an error instead of blocking it indefinitely.
    sub.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
    let ev = sub.recv().unwrap();
    assert_eq!(
        ev,
        PubsubEvent::Pmessage {
            pattern: b"mail.*".to_vec(),
            channel: b"mail.inbound".to_vec(),
            payload: b"x".to_vec(),
        }
    );
}
/// One publish on a named `mem://` URL is delivered to every subscriber of
/// that channel: with two subscribers the publish call is expected to report
/// `2`, and each subscriber then receives the same message.
#[test]
fn fan_out_to_multiple_subscribers() {
    const URL: &str = "mem://fanout-bus";

    let mut first = Subscriber::open(URL, &[b"chan"]).unwrap();
    let mut second = Subscriber::open(URL, &[b"chan"]).unwrap();
    // Discard the first event on each subscriber (presumably the subscribe
    // acknowledgement).
    let _ = first.recv().unwrap();
    let _ = second.recv().unwrap();

    let mut publisher = Connection::open(URL).unwrap();
    let delivered = publisher.publish(b"chan", b"hello").unwrap();
    assert_eq!(delivered, 2);

    // Both subscribers must observe exactly this event.
    let expected = PubsubEvent::Message {
        channel: b"chan".to_vec(),
        payload: b"hello".to_vec(),
    };
    for subscriber in [&mut first, &mut second] {
        // Bound the wait so a missing delivery fails rather than hangs.
        subscriber
            .set_read_timeout(Some(Duration::from_secs(1)))
            .unwrap();
        assert_eq!(subscriber.recv().unwrap(), expected);
    }
}
/// Opening a subscriber on the bare `mem://` URL (no name after the scheme)
/// must fail with `ErrorKind::Unsupported`.
#[test]
fn anonymous_mem_url_rejected_at_subscriber_open() {
    let kind = Subscriber::open("mem://", &[b"chan"])
        .unwrap_err()
        .kind();
    assert_eq!(kind, std::io::ErrorKind::Unsupported);
}
/// A key written through a connection to a named `mem://` URL must no longer
/// be readable once that connection has been dropped and a fresh connection
/// to the same URL is opened.
#[test]
fn named_bus_recycles_after_all_handles_drop() {
    const URL: &str = "mem://recycle-bus";

    // Write a key, confirm it reads back, then release the only handle.
    let mut first = Connection::open(URL).unwrap();
    first.set(b"hot", b"yes").unwrap();
    assert_eq!(first.get(b"hot").unwrap(), Some(b"yes".to_vec()));
    drop(first);

    // A new connection to the same URL is expected to see no such key.
    let mut second = Connection::open(URL).unwrap();
    assert_eq!(second.get(b"hot").unwrap(), None);
}
/// `recv_message` is called without first consuming any earlier event from
/// the subscriber, and is expected to return the channel and payload of the
/// published message.
#[test]
fn recv_message_skips_ack_and_returns_payload() {
    const URL: &str = "mem://recv-message-skip-ack";

    // Note: no `recv()` call here — the first event is deliberately left
    // unread before publishing.
    let mut subscriber = Subscriber::open(URL, &[b"chan"]).unwrap();

    let mut publisher = Connection::open(URL).unwrap();
    assert_eq!(publisher.publish(b"chan", b"hello").unwrap(), 1);

    // Bound the wait so a missing message fails the test rather than hanging.
    subscriber
        .set_read_timeout(Some(Duration::from_secs(2)))
        .unwrap();
    let (channel, payload) = subscriber.recv_message().unwrap();
    assert_eq!(channel, b"chan");
    assert_eq!(payload, b"hello");
}