use std::{io, time::Duration};
mod util;
use nats::jetstream;
use nats::jetstream::*;
pub use util::*;
#[test]
#[ignore]
fn jetstream_not_enabled() {
let s = util::run_basic_server();
let nc = nats::connect(&s.client_url()).unwrap();
let js = nats::jetstream::new(nc);
let err = js.account_info().unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
let err = err
.into_inner()
.expect("should be able to convert error into inner")
.downcast::<jetstream::Error>()
.expect("should be able to downcast into error");
assert_eq!(err.error_code(), jetstream::ErrorCode::NotEnabled);
}
#[test]
fn jetstream_account_not_enabled() {
let s = util::run_server("tests/configs/jetstream_account_not_enabled.conf");
let nc = nats::connect(&s.client_url()).unwrap();
let js = nats::jetstream::new(nc);
let err = js.account_info().unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
let err = err
.into_inner()
.expect("should be able to convert error into inner")
.downcast::<jetstream::Error>()
.expect("should be able to downcast into jetstream::Error");
assert_eq!(err.error_code(), jetstream::ErrorCode::NotEnabledForAccount);
}
#[test]
fn jetstream_publish() {
let (_s, nc, js) = run_basic_jetstream();
js.add_stream(StreamConfig {
name: "TEST".to_string(),
subjects: vec!["test".to_string(), "foo".to_string(), "bar".to_string()],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
let msg = b"Hello JS";
let ack = js.publish("foo", &msg).unwrap();
assert_eq!(ack.stream, "TEST");
assert_eq!(ack.sequence, 1);
assert_eq!(js.stream_info("TEST").unwrap().state.messages, 1);
let err = js
.publish_with_options(
"foo",
&msg,
&PublishOptions {
expected_stream: Some("ORDERS".to_string()),
..Default::default()
},
)
.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
let err = err
.into_inner()
.expect("should be able to convert error into inner")
.downcast::<jetstream::Error>()
.expect("should be able to downcast into error");
assert_eq!(err.error_code(), jetstream::ErrorCode::StreamNotMatch);
let err = js
.publish_with_options(
"foo",
&msg,
&PublishOptions {
expected_last_sequence: Some(10),
..Default::default()
},
)
.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
let err = err
.into_inner()
.expect("should be able to convert error into inner")
.downcast::<jetstream::Error>()
.expect("should be able to downcast into error");
assert_eq!(
err.error_code(),
jetstream::ErrorCode::StreamWrongLastSequence
);
assert_eq!(js.stream_info("TEST").unwrap().state.messages, 1);
let ack = js
.publish_with_options(
"foo",
&msg,
&PublishOptions {
id: Some("ZZZ".to_string()),
..Default::default()
},
)
.unwrap();
assert_eq!(ack.stream, "TEST");
assert_eq!(ack.sequence, 2);
assert_eq!(js.stream_info("TEST").unwrap().state.messages, 2);
let ack = js
.publish_with_options(
"foo",
&msg,
&PublishOptions {
id: Some("ZZZ".to_string()),
..Default::default()
},
)
.unwrap();
assert!(ack.duplicate);
assert_eq!(ack.stream, "TEST");
assert_eq!(ack.sequence, 2);
assert_eq!(js.stream_info("TEST").unwrap().state.messages, 2);
let err = js
.publish_with_options(
"foo",
msg,
&PublishOptions {
expected_last_msg_id: Some("AAA".to_string()),
..Default::default()
},
)
.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
let err = err
.into_inner()
.expect("should be able to convert error into inner")
.downcast::<jetstream::Error>()
.expect("should be able to downcast into error");
assert_eq!(err.error_code(), jetstream::ErrorCode::StreamWrongLastMsgId);
assert_eq!(js.stream_info("TEST").unwrap().state.messages, 2);
let err = js
.publish_with_options(
"foo",
msg,
&PublishOptions {
expected_last_sequence: Some(22),
..Default::default()
},
)
.unwrap_err();
assert_eq!(err.kind(), io::ErrorKind::Other);
let err = err
.into_inner()
.expect("should be able to convert error into inner")
.downcast::<jetstream::Error>()
.expect("should be able to downcast into error");
assert_eq!(
err.error_code(),
jetstream::ErrorCode::StreamWrongLastSequence
);
assert_eq!(js.stream_info("TEST").unwrap().state.messages, 2);
let ack = js
.publish_with_options(
"foo",
msg,
&PublishOptions {
expected_last_sequence: Some(2),
..Default::default()
},
)
.unwrap();
assert_eq!(ack.stream, "TEST");
assert_eq!(ack.sequence, 3);
assert_eq!(js.stream_info("TEST").unwrap().state.messages, 3);
let sub = nc.subscribe("test").unwrap();
js.publish_with_options(
"test",
msg,
&PublishOptions {
expected_last_subject_sequence: Some(1),
..Default::default()
},
)
.ok();
let msg = sub.next_timeout(Duration::from_secs(1)).unwrap();
assert_eq!(
msg.headers
.unwrap()
.get("Nats-Expected-Last-Subject-Sequence"),
Some(&"1".to_string())
);
}
#[test]
fn jetstream_subscribe() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::connect(&s.client_url()).unwrap();
let js = nats::jetstream::new(nc.clone());
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec![
"foo".to_string(),
"bar".to_string(),
"baz".to_string(),
"foo.*".to_string(),
],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
let sub = js.subscribe("foo").unwrap();
let payload = b"hello js";
for _ in 0..10 {
js.publish("foo", payload).unwrap();
}
for _ in 0..10 {
let msg = sub.next().unwrap();
msg.ack().unwrap();
assert_eq!(msg.data, payload);
}
nc.flush().unwrap();
let info = sub.consumer_info().unwrap();
assert_eq!(info.config.ack_policy, AckPolicy::Explicit);
assert_eq!(info.delivered.consumer_seq, 10);
assert_eq!(info.ack_floor.consumer_seq, 10);
js.publish("foo", payload).unwrap();
sub.drain().unwrap();
assert!(sub.next().is_some());
js.publish("foo", payload).unwrap();
assert!(sub.next().is_none());
}
#[test]
fn jetstream_subscribe_durable() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::connect(&s.client_url()).unwrap();
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec![
"foo".to_string(),
"bar".to_string(),
"baz".to_string(),
"foo.*".to_string(),
],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
let sub = js
.subscribe_with_options(
"foo",
&SubscribeOptions::new().durable_name("foobar".to_string()),
)
.unwrap();
let info = sub.consumer_info().unwrap();
assert_eq!(info.config.durable_name, Some("foobar".to_string()));
sub.drain().unwrap();
let sub = js
.subscribe_with_options(
"foo",
&SubscribeOptions::new().durable_name("foobar".to_string()),
)
.unwrap();
let old_info = info;
let new_info = sub.consumer_info().unwrap();
assert_ne!(
new_info.config.deliver_subject,
old_info.config.deliver_subject
);
sub.unsubscribe().unwrap();
js.subscribe_with_options(
"foo",
&SubscribeOptions::new().durable_name("foobar".to_string()),
)
.unwrap();
}
#[test]
fn jetstream_queue_subscribe() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::connect(&s.client_url()).unwrap();
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec![
"foo".to_string(),
"bar".to_string(),
"baz".to_string(),
"foo.*".to_string(),
],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
for _ in 0..10 {
js.publish("bar", b"hello js").unwrap();
}
let sub1 = js.queue_subscribe("bar", "v0").unwrap();
js.queue_subscribe("baz", "v0").unwrap_err();
js.queue_subscribe_with_options(
"bar",
"v1",
&SubscribeOptions::new().durable_name("v0".to_string()),
)
.unwrap_err();
let sub2 = js
.queue_subscribe_with_options(
"bar",
"v0",
&SubscribeOptions::new().durable_name("other_queue_durable".to_string()),
)
.unwrap();
let msg = sub1.next().unwrap();
msg.ack().unwrap();
assert_eq!(msg.data, b"hello js");
let msg = sub2.next_timeout(Duration::from_secs(1)).unwrap();
msg.ack().unwrap();
assert_eq!(msg.data, b"hello js");
sub1.unsubscribe().unwrap();
sub2.unsubscribe().unwrap();
}
#[test]
fn jetstream_queue_subscribe_no_mismatch_handle() {
let s = util::run_server("tests/configs/jetstream.conf");
let con = nats::connect(s.client_url()).unwrap();
let jsm = nats::jetstream::new(con);
jsm.add_stream(StreamConfig {
name: "jobs_stream".to_string(),
discard: DiscardPolicy::Old,
subjects: vec!["waiting_jobs".to_string()],
retention: RetentionPolicy::WorkQueue,
storage: StorageType::File,
..Default::default()
})
.unwrap();
jsm.add_consumer(
"jobs_stream",
ConsumerConfig {
deliver_group: Some("dg".to_string()),
durable_name: Some("durable".to_string()),
deliver_policy: DeliverPolicy::All,
ack_policy: AckPolicy::Explicit,
deliver_subject: Some("deliver_subject".to_string()),
..Default::default()
},
)
.unwrap();
let job_sub = jsm
.queue_subscribe_with_options(
"waiting_jobs",
"dg",
&SubscribeOptions::bind("jobs_stream".to_string(), "durable".to_string())
.deliver_subject("deliver_subject".to_string())
.ack_explicit()
.deliver_all()
.replay_instant(),
)
.unwrap();
jsm.publish("waiting_jobs", b"foo").unwrap();
jsm.publish("waiting_jobs", b"foo").unwrap();
let msg = job_sub.next().unwrap();
msg.ack().unwrap();
drop(job_sub);
let job_sub = jsm
.queue_subscribe_with_options(
"waiting_jobs",
"dg",
&SubscribeOptions::bind("jobs_stream".to_string(), "durable".to_string())
.deliver_subject("deliver_subject".to_string())
.ack_explicit()
.deliver_all()
.replay_instant(),
)
.unwrap();
jsm.publish("waiting_jobs", b"foo").unwrap();
jsm.publish("waiting_jobs", b"foo").unwrap();
let msg = job_sub
.next_timeout(Duration::from_millis(100))
.expect("should got a message");
msg.ack().unwrap();
}
#[test]
fn jetstream_flow_control() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::connect(&s.client_url()).unwrap();
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec![
"foo".to_string(),
"bar".to_string(),
"baz".to_string(),
"foo.*".to_string(),
],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
let sub = js
.subscribe_with_options(
"foo",
&SubscribeOptions::new()
.durable_name("foo".to_string())
.deliver_subject("fs".to_string())
.idle_heartbeat(Duration::from_millis(300))
.enable_flow_control(),
)
.unwrap();
let info = sub.consumer_info().unwrap();
assert!(info.config.flow_control);
let data = b"hello";
for _ in 0..250 {
js.publish("foo", data).unwrap();
}
std::thread::sleep(Duration::from_secs(1));
for _ in 0..250 {
let message = sub.next().unwrap();
assert_eq!(message.data, data);
}
}
#[test]
fn jetstream_ordered() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::connect(&s.client_url()).unwrap();
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec![
"foo".to_string(),
"bar".to_string(),
"baz".to_string(),
"foo.*".to_string(),
],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
let sub = js
.subscribe_with_options("foo", &SubscribeOptions::ordered().deliver_all())
.unwrap();
let info = sub.consumer_info().unwrap();
assert!(info.config.flow_control);
for i in 0..250 {
js.publish("foo", (i as i64).to_be_bytes()).unwrap();
}
for i in 0..250 {
let message = sub.next().unwrap();
assert_eq!(message.data, (i as i64).to_be_bytes());
}
}
#[test]
fn jetstream_pull_subscribe_fetch() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::Options::new()
.error_callback(|err| println!("error!: {}", err))
.connect(&s.client_url())
.unwrap();
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec!["foo".to_string()],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
js.add_consumer(
"TEST",
ConsumerConfig {
durable_name: Some("CONSUMER".to_string()),
..Default::default()
},
)
.unwrap();
for _ in 0..1000 {
js.publish("foo", b"lorem").unwrap();
}
let consumer = js
.pull_subscribe_with_options(
"foo",
&PullSubscribeOptions::new().durable_name("CONSUMER".to_string()),
)
.unwrap();
let batch = consumer.fetch(10).unwrap();
let mut i = 0;
for _ in batch {
i += 1;
}
assert_eq!(i, 10);
let batch = consumer.fetch(10).unwrap();
for _ in batch {
i += 1;
}
assert_eq!(i, 20);
}
#[test]
fn jetstream_pull_subscribe_timeout_fetch() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::Options::new()
.error_callback(|err| println!("error!: {}", err))
.connect(&s.client_url())
.unwrap();
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec!["foo".to_string()],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
js.add_consumer(
"TEST",
ConsumerConfig {
durable_name: Some("CONSUMER".to_string()),
..Default::default()
},
)
.unwrap();
for _ in 0..15 {
js.publish("foo", b"lorem").unwrap();
}
let consumer = js
.pull_subscribe_with_options(
"foo",
&PullSubscribeOptions::new().durable_name("CONSUMER".to_string()),
)
.unwrap();
let batch = consumer
.timeout_fetch(10, Duration::from_millis(100))
.unwrap();
for msg in batch {
msg.unwrap().ack().unwrap();
}
let batch = consumer
.timeout_fetch(10, Duration::from_millis(100))
.unwrap();
for (j, msg) in batch.enumerate() {
if j >= 5 {
msg.unwrap_err();
break;
}
msg.unwrap().ack().unwrap();
}
}
#[test]
fn jetstream_pull_subscribe_fetch_with_handler() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::Options::new()
.error_callback(|err| println!("error!: {}", err))
.connect(&s.client_url())
.unwrap();
let js = nats::jetstream::new(nc.clone());
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec!["foo".to_string()],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
js.add_consumer(
"TEST",
ConsumerConfig {
durable_name: Some("CONSUMER".to_string()),
..Default::default()
},
)
.unwrap();
for _ in 0..15 {
js.publish("foo", b"lorem").unwrap();
}
let consumer = js
.pull_subscribe_with_options(
"foo",
&PullSubscribeOptions::new().durable_name("CONSUMER".to_string()),
)
.unwrap();
let mut i = 0;
consumer
.fetch_with_handler(10, |_| {
i += 1;
Ok(())
})
.unwrap();
nc.flush().unwrap();
let info = js.consumer_info("TEST", "CONSUMER").unwrap();
assert_eq!(info.num_ack_pending, 0);
assert_eq!(info.num_pending, 5);
assert_eq!(10, i);
}
#[test]
fn jetstream_pull_subscribe_ephemeral() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::Options::new()
.error_callback(|err| println!("error!: {}", err))
.connect(&s.client_url())
.unwrap();
let js = nats::jetstream::new(nc);
js.add_stream(&StreamConfig {
name: "TEST".to_string(),
subjects: vec!["foo".to_string()],
..Default::default()
})
.unwrap();
js.stream_info("TEST").unwrap();
js.publish("foo", b"foo").unwrap();
let consumer = js.pull_subscribe("foo").unwrap();
consumer.request_batch(1).unwrap();
consumer.next();
}
#[test]
fn jetstream_pull_subscribe_bad_stream() {
let s = util::run_server("tests/configs/jetstream.conf");
let nc = nats::Options::new()
.error_callback(|err| println!("error!: {}", err))
.connect(&s.client_url())
.unwrap();
let js = nats::jetstream::new(nc);
js.pull_subscribe("WRONG")
.expect_err("expected not found stream for a given subject");
}