mod util;
use std::{
sync::{atomic::AtomicUsize, Arc},
thread,
time::Duration,
};
pub use util::*;
#[test]
fn slow_consumers() {
let dropped_messages = Arc::new(AtomicUsize::new(0));
let s = util::run_basic_server();
let nc = nats::Options::with_user_pass("derek", "s3cr3t!")
.error_callback({
let dropped_messages = dropped_messages.clone();
move |err| {
if err.to_string()
== *"slow consumer detected for subscription on subject data. dropping messages"
{
dropped_messages.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
}
}
})
.connect(s.client_url())
.expect("could not connect");
let sub = nc.subscribe("data").unwrap();
sub.set_message_limits(100);
for _ in 0..140 {
nc.publish("data", b"test message").unwrap();
}
thread::sleep(Duration::from_millis(200));
let mut i = 0;
while i < 100 {
sub.next();
i += 1;
}
assert_eq!(
sub.dropped_messages().unwrap(),
dropped_messages.load(std::sync::atomic::Ordering::SeqCst)
);
assert_eq!(sub.dropped_messages().unwrap(), 40);
}