Module async_nats::jetstream
source · [−]Expand description
JetStream is the persistence layer built into NATS, providing Streams with at-least-once and exactly-once semantics.
To start, create a new Context, which is the entry point to the JetStream API.
Examples
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
jetstream.publish("events".to_string(), "data".into()).await?;
let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await?;
let mut messages = consumer.messages().await?.take(100);
while let Ok(Some(message)) = messages.try_next().await {
println!("message receiver: {:?}", message);
message.ack().await?;
}
Ok(())
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
jetstream.publish("events".to_string(), "data".into()).await?;
let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await?;
let mut batches = consumer.sequence(50)?.take(10);
while let Ok(Some(mut batch)) = batches.try_next().await {
while let Some(Ok(message)) = batch.next().await {
println!("message receiver: {:?}", message);
message.ack().await?;
}
}
Ok(())
Re-exports
Modules
A wrapped crate::Message with JetStream-related methods.
Publish JetStream
messages.
Low-level JetStream responses.