Module nats::jetstream[][src]

Expand description

JetStream stream management and consumers. Experimental JetStream support enabled via the jetstream feature.

Examples

Create a new stream with default options:

let nc = nats::connect("my_server::4222")?;

// create_stream converts a str into a
// default `StreamConfig`.
nc.create_stream("my_stream")?;

Create a new stream with specific options set:

use nats::jetstream::{StreamConfig, StorageType};

let nc = nats::connect("my_server::4222")?;

nc.create_stream(StreamConfig {
    name: "my_memory_stream".to_string(),
    max_bytes: 5 * 1024 * 1024 * 1024,
    storage: StorageType::Memory,
    ..Default::default()
})?;

Create and use a new default consumer (defaults to Pull-based, see the docs for ConsumerConfig for how this influences behavior)

let nc = nats::connect("my_server::4222")?;

nc.create_stream("my_stream")?;

let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", "my_consumer")?;

Create and use a new push-based consumer with batched acknowledgements

use nats::jetstream::{AckPolicy, ConsumerConfig};

let nc = nats::connect("my_server::4222")?;

nc.create_stream("my_stream")?;

let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", ConsumerConfig {
    durable_name: Some("my_consumer".to_string()),
    deliver_subject: Some("my_push_consumer_subject".to_string()),
    ack_policy: AckPolicy::All,
    ..Default::default()
})?;

Consumers can also be created on-the-fly using Consumer::create_or_open, and later used with Consumer::existing if you do not wish to auto-create them.

use nats::jetstream::{AckPolicy, Consumer, ConsumerConfig};

let nc = nats::connect("my_server::4222")?;

let consumer_res = Consumer::existing(nc.clone(), "my_stream", "non-existent_consumer");

// trying to use this consumer will fail because it hasn't been created yet
assert!(consumer_res.is_err());

// this will create the consumer if it does not exist already
let consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?;

Consumers may be used for processing messages individually, with timeouts, or in batches:

use nats::jetstream::{AckPolicy, Consumer, ConsumerConfig};

let nc = nats::connect("my_server::4222")?;

// this will create the consumer if it does not exist already.
// consumer must be mut because the `process*` methods perform
// message deduplication using an interval tree, which is
// also publicly accessible via the `Consumer`'s `dedupe_window`
// field.
let mut consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?;

// The `Consumer::process` method executes a closure
// on both push- and pull-based consumers, and if
// the closure returns `Ok` then the message is acked.
// If no message is available, it will wait forever
// for one to arrive.
let msg_data_len: usize = consumer.process(|msg| {
    println!("got message {:?}", msg);
    Ok(msg.data.len())
})?;

// Similar to `Consumer::process` except wait until the
// consumer's `timeout` field for the message to arrive.
// This can and should be set manually, as it has a low
// default of 5ms.
let msg_data_len: usize = consumer.process_timeout(|msg| {
    println!("got message {:?}", msg);
    Ok(msg.data.len())
})?;

// For consumers operating with `AckPolicy::All`, batch
// processing can provide nice throughput optimizations.
// `Consumer::process_batch` will wait indefinitely for
// the first message in a batch, then process
// more messages until the configured timeout is expired.
// It will batch acks if running with `AckPolicy::All`.
// If there is an error with acking, the last item in the
// returned `Vec` will be the io error. Terminates early
// without acking if the closure returns an `Err`, which
// is included in the final element of the `Vec`. If a
// Timeout happens before the batch size is reached, then
// there will be no errors included in the response `Vec`.
let batch_size = 128;
let results: Vec<std::io::Result<usize>> = consumer.process_batch(batch_size, |msg| {
    println!("got message {:?}", msg);
    Ok(msg.data.len())
});
let flipped: std::io::Result<Vec<usize>> = results.into_iter().collect();
let sizes: Vec<usize> = flipped?;

// For lower-level control for use cases that are not
// well-served by the high-level process* methods,
// there are a number of lower level primitives that
// can be used, such as `Consumer::pull` for pull-based
// consumers and `Message::ack` for manually acking things:
let msg = consumer.pull()?;

// --- process message ---

// tell the server the message has been processed
msg.ack()?;

Structs

AccountInfo

contains info about the JetStream usage from the current account.

AccountLimits

Various limits imposed on a particular account.

ApiStats

reports on API calls to JetStream for this account.

ClusterInfo

Information about the consumer’s associated JetStream cluster

Consumer

JetStream reliable consumption functionality.

ConsumerConfig

Configuration for consumers. From a high level, the durable_name and deliver_subject fields have a particularly strong influence on the consumer’s overall behavior.

ConsumerInfo

Information about a consumer

DateTime

A UTC time

IntervalTree

Records ranges of acknowledged IDs for low-memory deduplication.

JetStreamMessageInfo

Information about a received message

NextRequest

for getting next messages for pull based consumers.

PagedIterator

An iterator over paged JetStream API operations.

PurgeResponse

The response generated by trying ot purge a stream.

SequencePair

Information about a consumer and the stream it is consuming

StreamConfig

StreamConfig determines the properties for a stream. There are sensible defaults for most. If no subjects are given the name will be used as the only subject.

StreamInfo

Shows config and current state for this stream.

StreamState

information about the given stream.

Enums

AckKind

The kinds of response used for acknowledging a processed message.

AckPolicy

Determines whether messages will be acknowledged individually, in batches, or never.

DeliverPolicy

DeliverPolicy determines how the consumer should select the first message to deliver.

DiscardPolicy

DiscardPolicy determines how we proceed when limits of messages or bytes are hit. The default, Old will remove older messages. New will fail to store the new message.

ReplayPolicy

ReplayPolicy controls whether messages are sent to a consumer as quickly as possible or at the rate that they were originally received at.

RetentionPolicy

RetentionPolicy determines how messages in a set are retained.

StorageType

determines how messages are stored for retention.