jetstream-extra 0.3.0

Set of utilities and extensions for the JetStream NATS of the async-nats crate
Documentation

jetstream-extra

License Apache 2 Crates.io Documentation Build Status

Set of utilities and extensions for the JetStream NATS of the async-nats crate.

Features

  • Batch Publishing - Atomic batch publishing ensuring all-or-nothing message storage
  • Fast Ingest Batch Publishing - High-throughput, non-atomic batch publishing with server-driven flow control (requires nats-server 2.14+)
  • Batch Fetching - Efficient multi-message retrieval using DIRECT.GET API

Batch Publishing

Atomic batch publishing implementation for JetStream streams, ensuring that either all messages in a batch are stored or none are.

Complete example

Connect to NATS server with JetStream, and extend the jetstream context with the batch publishing capabilities.

use async_nats::jetstream;
// Extend the JetStream context with batch publishing.
use jetstream_extra::batch_publish::BatchPublishExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("demo.nats.io").await?;
    let jetstream = jetstream::new(client);

    // Create or get a stream with atomic publishing enabled
    let _stream = jetstream.get_or_create_stream(jetstream::stream::Config {
        name: "events".to_string(),
        subjects: vec!["events.*".to_string()],
        allow_atomic_publish: true,
        ..Default::default()
    }).await?;

    // Build and use a batch publisher
    let mut batch = jetstream.batch_publish().build();

    // Add messages to the batch
    batch.add("events.order", "order-123".into()).await?;
    batch.add("events.payment", "payment-456".into()).await?;
    batch.add("events.inventory", "item-789".into()).await?;

    // Commit the batch atomically
    let ack = batch.commit("events.notification", "notify-complete".into()).await?;

    println!("Batch published with sequence: {}", ack.sequence);

    Ok(())
}

Fast Ingest Batch Publishing

High-throughput, non-atomic batch publishing using JetStream's fast-ingest feature (ADR-50, requires nats-server 2.14 or later). Unlike atomic batch publishing, messages are persisted as they arrive and the server uses a flow-control channel to coordinate throughput across concurrent publishers.

Use fast ingest when:

  • You need to ship millions of messages per batch and don't need all-or-nothing semantics.
  • Throughput matters more than atomicity.
  • You want the server to dynamically tune ack frequency based on load.

The stream must have allow_batched: true. The publisher owns a dedicated inbox subscription for the duration of the batch and drives ack handling inline — no background task, no locks.

Complete example

use async_nats::jetstream;
use jetstream_extra::batch_publish_fast::{FastPublishExt, GapMode};
use std::time::Duration;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("nats://127.0.0.1:4222").await?;
    let jetstream = jetstream::new(client);

    // Stream must have allow_batched: true (not yet exposed in async-nats
    // 0.45.0 StreamConfig — create via raw JetStream API until upstream
    // adds the field).

    let mut batch = jetstream
        .fast_publish()
        .flow(100)                                 // ack every 100 messages (ceiling)
        .max_outstanding_acks(2)                   // up to 200 messages in flight
        .gap_mode(GapMode::Fail)                   // abort on any gap (default)
        .ack_timeout(Duration::from_secs(10))
        .on_error(|e| eprintln!("fast publish event: {e}"))
        .build()?;

    // Stream 10,000 messages. The stall gate transparently waits for flow
    // acks and sends pings to recover from any lost acks.
    for i in 0..10_000 {
        batch.add("metrics.cpu", format!("sample {i}").into()).await?;
    }

    // End-of-batch commit — the commit message itself is NOT stored.
    // Use `commit(...)` instead if you want a final message persisted.
    let ack = batch.close().await?;
    println!("committed {} messages as batch {}", ack.batch_size, ack.batch_id);

    Ok(())
}

See examples/fast_publisher.rs for a runnable example.

Batch Fetching

Efficient batch fetching of messages from JetStream streams using the DIRECT.GET API, supporting:

  • Fetching multiple messages in a single request
  • Subject filtering with wildcards
  • Sequence and time-based ranges
  • Multi-subject last message queries

Fetch a batch of messages

use async_nats::jetstream;
use jetstream_extra::batch_fetch::BatchFetchExt;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("demo.nats.io").await?;
    let context = jetstream::new(client);

    // Fetch 100 messages starting from sequence 1
    let mut messages = context
        .get_batch("my_stream", 100)
        .send()
        .await?;

    while let Some(msg) = messages.next().await {
        let msg = msg?;
        println!("Message at seq {}: {:?}", msg.sequence, msg.subject);
    }

    Ok(())
}

Get last messages for multiple subjects

use async_nats::jetstream;
use jetstream_extra::batch_fetch::BatchFetchExt;
use futures::StreamExt;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = async_nats::connect("demo.nats.io").await?;
    let context = jetstream::new(client);

    // Get the last message for each sensor
    let subjects = vec![
        "sensors.temp".to_string(),
        "sensors.humidity".to_string(),
        "sensors.pressure".to_string(),
    ];

    let mut messages = context
        .get_last_messages_for("sensor_stream")
        .subjects(subjects)
        .send()
        .await?;

    while let Some(msg) = messages.next().await {
        let msg = msg?;
        println!("Last value for {}: {:?}", msg.subject, msg.payload);
    }

    Ok(())
}