#[cfg(test)]
mod batch_publish_tests {
use std::time::Duration;
use async_nats::jetstream::{self, stream};
use jetstream_extra::batch_publish::BatchPublishExt;
/// Creates a fresh stream called `stream_name` and returns its handle.
///
/// The stream listens on subjects matching `<stream_name>.*` and is created
/// with `allow_atomic_publish` and `allow_message_ttl` both set to `true`.
/// A stream with the same name is deleted beforehand; the outcome of that
/// deletion is intentionally ignored.
///
/// # Panics
///
/// Panics if the stream cannot be created.
async fn setup_test_stream(
    jetstream: &async_nats::jetstream::Context,
    stream_name: &str,
) -> stream::Stream {
    // Best-effort cleanup: the result is discarded on purpose.
    let _ = jetstream.delete_stream(stream_name).await;

    jetstream
        .create_stream(stream::Config {
            name: stream_name.to_owned(),
            subjects: vec![format!("{stream_name}.*")],
            allow_atomic_publish: true,
            allow_message_ttl: true,
            ..Default::default()
        })
        .await
        .unwrap()
}
/// Publishes two messages followed by a commit message as a single batch, then
/// checks that both the commit ack and the stream state report three messages.
#[tokio::test]
async fn test_basic_batch_publish() {
    // `server` stays bound for the whole test so the handle is not dropped early.
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = async_nats::connect(server.client_url()).await.unwrap();
    let context = async_nats::jetstream::new(client);
    let mut test_stream = setup_test_stream(&context, "test").await;

    let mut publisher = context.batch_publish().build();
    publisher.add("test.data", "Hello".into()).await.unwrap();
    publisher.add("test.subject", "World".into()).await.unwrap();

    // The commit call carries a message of its own, hence three in total.
    let commit_ack = publisher
        .commit("test.subject", "World".into())
        .await
        .unwrap();
    assert_eq!(commit_ack.batch_size, 3);
    assert_eq!(commit_ack.stream, "test");

    let stored = test_stream.info().await.unwrap().state.messages;
    assert_eq!(stored, 3);
}
/// Commits a batch containing a plain message, a message built with a
/// two-second TTL, and the commit message itself.
///
/// Right after the commit the stream must hold three messages; after a
/// three-second sleep it must hold two.
#[tokio::test]
async fn test_batch_publish_options() {
    let message_ttl = Duration::from_secs(2);
    let expiry_wait = Duration::from_secs(3);

    // `server` stays bound for the whole test so the handle is not dropped early.
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = async_nats::connect(server.client_url()).await.unwrap();
    let context = async_nats::jetstream::new(client);
    let mut test_stream = setup_test_stream(&context, "test").await;

    let mut publisher = context.batch_publish().build();
    let expiring = jetstream::message::PublishMessage::build()
        .ttl(message_ttl)
        .outbound_message("test.ttl");

    publisher.add("test.normal", "data".into()).await.unwrap();
    publisher.add_message(expiring).await.unwrap();
    let commit_ack = publisher
        .commit("test.commit", "data".into())
        .await
        .unwrap();
    assert_eq!(commit_ack.batch_size, 3);

    let stored_before = test_stream.info().await.unwrap().state.messages;
    assert_eq!(stored_before, 3);

    // Wait longer than the TTL; presumably only the TTL message is gone afterwards.
    tokio::time::sleep(expiry_wait).await;

    let stored_after = test_stream.info().await.unwrap().state.messages;
    assert_eq!(stored_after, 2);
}
/// Checks that a failing batch commit returns an error and stores nothing.
///
/// The first message in the batch is built with `expected_last_sequence(20)`
/// against a freshly created stream. NOTE(review): presumably this unmet
/// expectation is what makes the commit fail — confirm against the server
/// semantics.
#[tokio::test]
async fn test_error() {
    // `server` stays bound for the whole test so the handle is not dropped early.
    let server = nats_server::run_server("tests/configs/jetstream.conf");
    let client = async_nats::connect(server.client_url()).await.unwrap();
    let context = async_nats::jetstream::new(client);
    let mut test_stream = setup_test_stream(&context, "test").await;

    let mut publisher = context.batch_publish().ack_every(1).build();
    let sequenced = jetstream::message::PublishMessage::build()
        .expected_last_sequence(20)
        .outbound_message("test.sequence");

    publisher.add_message(sequenced).await.unwrap();
    publisher.add("test.normal", "data".into()).await.unwrap();

    // The commit must be rejected; the error value itself is not inspected.
    let commit_outcome = publisher.commit("test.commit", "data".into()).await;
    commit_outcome.unwrap_err();

    let stored = test_stream.info().await.unwrap().state.messages;
    assert_eq!(stored, 0);
}
#[tokio::test]
async fn test_batch_discard() {
let server = nats_server::run_server("tests/configs/jetstream.conf");
let client = async_nats::connect(server.client_url()).await.unwrap();
let jetstream = async_nats::jetstream::new(client);
let mut stream = setup_test_stream(&jetstream, "test_discard").await;
let mut batch = jetstream.batch_publish().build();
batch
.add("test_discard.1", "message1".into())
.await
.unwrap();
batch
.add("test_discard.2", "message2".into())
.await
.unwrap();
batch
.add("test_discard.3", "message3".into())
.await
.unwrap();
let batch_size = batch.size();
assert_eq!(batch_size, 3);
batch.discard();
let info = stream.info().await.unwrap();
assert_eq!(
info.state.messages, 0,
"Stream should have no committed messages immediately after discard"
);
tokio::time::sleep(std::time::Duration::from_secs(11)).await;
let info = stream.info().await.unwrap();
assert_eq!(
info.state.messages, 0,
"Stream should have no messages after server timeout"
);
}
}