Module async_nats::jetstream

source ·
Expand description

JetStream is the built-in persistence layer of NATS, providing Streams with at-least-once and exactly-once semantics.

To start, create a new Context, which is the entry point to the JetStream API.

Examples

use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

jetstream.publish("events".to_string(), "data".into()).await?;

let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
    durable_name: Some("consumer".to_string()),
    ..Default::default()
}).await?;

let mut messages = consumer.messages().await?.take(100);
while let Ok(Some(message)) = messages.try_next().await {
  println!("message receiver: {:?}", message);
  message.ack().await?;
}
Ok(())

use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

jetstream.publish("events".to_string(), "data".into()).await?;

let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
    durable_name: Some("consumer".to_string()),
    ..Default::default()
}).await?;

let mut batches = consumer.sequence(50)?.take(10);
while let Ok(Some(mut batch)) = batches.try_next().await {
    while let Some(Ok(message)) = batch.next().await {
        println!("message receiver: {:?}", message);
        message.ack().await?;
    }
}
Ok(())

Re-exports

Modules

Functions