Module async_nats::jetstream
source · Expand description
JetStream is a NATS built-in persistence layer providing Streams with at least once and exactly once semantics.
To start, create a new Context which is an entrypoint to JetStream
API.
Examples
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
jetstream.publish("events".to_string(), "data".into()).await?;
let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await?;
let mut messages = consumer.messages().await?.take(100);
while let Ok(Some(message)) = messages.try_next().await {
println!("message receiver: {:?}", message);
message.ack().await?;
}
Ok(())
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
jetstream.publish("events".to_string(), "data".into()).await?;
let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await?;
let mut batches = consumer.sequence(50)?.take(10);
while let Ok(Some(mut batch)) = batches.try_next().await {
while let Some(Ok(message)) = batch.next().await {
println!("message receiver: {:?}", message);
message.ack().await?;
}
}
Ok(())
Re-exports
Modules
- Push and Pull Consumer API.
- A wrapped
crate::Message
withJetStream
related methods. - Object Store module
- Publish
JetStream
messages. - A low level
JetStream
responses.