JetStream is the persistence layer built into NATS. It provides Streams with at-least-once and exactly-once delivery semantics.

To start, create a new Context, which is the entry point to the JetStream API.

Examples

use futures::StreamExt;
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    // Connect to the NATS server and create a JetStream context.
    let client = async_nats::connect("localhost:4222").await?;
    let jetstream = async_nats::jetstream::new(client);

    // Look up the "events" stream, creating it if it does not exist yet.
    let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    }).await?;

    // Publish a message to the "events" subject.
    jetstream.publish("events".to_string(), "data".into()).await?;

    // Look up the durable pull consumer, creating it if needed.
    let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
        durable_name: Some("consumer".to_string()),
        ..Default::default()
    }).await?;

    // Process up to 100 messages, acknowledging each one.
    let mut messages = consumer.messages().await?.take(100);
    while let Ok(Some(message)) = messages.try_next().await {
        println!("message received: {:?}", message);
        message.ack().await?;
    }
    Ok(())
}

// Variant: pull messages in batches with `sequence` instead of `messages`.
use futures::StreamExt;
use futures::TryStreamExt;

#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("localhost:4222").await?;
    let jetstream = async_nats::jetstream::new(client);

    let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
        name: "events".to_string(),
        max_messages: 10_000,
        ..Default::default()
    }).await?;

    jetstream.publish("events".to_string(), "data".into()).await?;

    let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
        durable_name: Some("consumer".to_string()),
        ..Default::default()
    }).await?;

    // Request up to 10 batches of 50 messages each.
    let mut batches = consumer.sequence(50)?.take(10);
    while let Ok(Some(mut batch)) = batches.try_next().await {
        // Drain the current batch, acknowledging each message.
        while let Some(Ok(message)) = batch.next().await {
            println!("message received: {:?}", message);
            message.ack().await?;
        }
    }
    Ok(())
}

Re-exports

pub use context::Context;
pub use message::AckKind;
pub use message::Message;

Modules

consumer
Push and Pull Consumer API.

context
Manage operations on a Context: create, delete, and update Streams.

message
A wrapped crate::Message with JetStream-related methods.

publish
Publish JetStream messages.

response
Low-level JetStream responses.

stream
Manage operations on a Stream: create, delete, and update Consumers.

Functions

new
Creates a new JetStream Context that provides the JetStream API for managing and using Streams, Consumers, and the key-value and object stores.

with_domain
Creates a new JetStream Context with the given JetStream domain.

with_prefix
Creates a new JetStream Context with the given JetStream API prefix. The default prefix is $JS.API.
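
To make the difference between a domain and a prefix more concrete, here is a minimal sketch of how an API subject prefix might be derived. It uses only the standard library and does not call async_nats. The helpers `api_prefix` and `api_subject` are hypothetical names introduced for illustration, and the `$JS.<domain>.API` form is an assumption based on the usual JetStream subject naming, not code taken from this crate.

```rust
/// Hypothetical helper: builds the JetStream API prefix.
/// Assumes a domain maps to `$JS.<domain>.API`; with no domain,
/// the default prefix `$JS.API` is used.
fn api_prefix(domain: Option<&str>) -> String {
    match domain {
        Some(d) => format!("$JS.{}.API", d),
        None => "$JS.API".to_string(),
    }
}

/// Hypothetical helper: joins a prefix and an API operation
/// into the full subject a request would be sent to.
fn api_subject(prefix: &str, operation: &str) -> String {
    format!("{}.{}", prefix, operation)
}

fn main() {
    // Default prefix, as used by a Context with no domain or custom prefix.
    let default_prefix = api_prefix(None);
    // Prefix for an example domain named "hub" (an illustrative value).
    let hub_prefix = api_prefix(Some("hub"));

    println!("{}", api_subject(&default_prefix, "STREAM.INFO.events"));
    println!("{}", api_subject(&hub_prefix, "STREAM.INFO.events"));
}
```

Under these assumptions, a domain is a shorthand that selects a prefix of a fixed shape, while a custom prefix lets the caller supply the whole string, for example when API subjects are remapped across accounts.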