mod util;
use nats::jetstream::*;
use std::time::Duration;
#[test]
fn request_timeout() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::connect(&s.client_url()).expect("could not connect");
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec!["foo".to_string()],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
js.add_consumer(
"TEST",
ConsumerConfig {
durable_name: Some("CONSUMER".to_string()),
..Default::default()
},
)
.unwrap();
let consumer = js
.pull_subscribe_with_options(
"foo",
&PullSubscribeOptions::new().bind_stream("TEST".to_string()),
)
.unwrap();
std::thread::spawn(move || {
for __ in 0..2 {
js.publish("foo", b"data").unwrap();
std::thread::sleep(Duration::from_millis(300));
}
});
consumer
.request_batch(BatchOptions {
expires: Some(Duration::from_millis(200).as_nanos() as usize),
batch: 2,
no_wait: false,
})
.unwrap();
let msg = consumer.next_timeout(Duration::from_millis(1000)).unwrap();
msg.ack().unwrap();
let _ = consumer
.next_timeout(Duration::from_millis(1000))
.expect_err("should timeout");
}