Module nats::jetstream [−][src]
JetStream
stream management and consumers.
Experimental JetStream
support enabled via the jetstream
feature.
Examples
Create a new stream with default options:
let nc = nats::connect("my_server::4222")?; // create_stream converts a str into a // default `StreamConfig`. nc.create_stream("my_stream")?;
Create a new stream with specific options set:
use nats::jetstream::{StreamConfig, StorageType}; let nc = nats::connect("my_server::4222")?; nc.create_stream(StreamConfig { name: "my_memory_stream".to_string(), max_bytes: 5 * 1024 * 1024 * 1024, storage: StorageType::Memory, ..Default::default() })?;
Create and use a new default consumer (defaults to Pull-based, see the docs for ConsumerConfig
for how this influences behavior)
let nc = nats::connect("my_server::4222")?; nc.create_stream("my_stream")?; let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", "my_consumer")?;
Create and use a new push-based consumer with batched acknowledgements
use nats::jetstream::{AckPolicy, ConsumerConfig}; let nc = nats::connect("my_server::4222")?; nc.create_stream("my_stream")?; let consumer: nats::jetstream::Consumer = nc.create_consumer("my_stream", ConsumerConfig { durable_name: Some("my_consumer".to_string()), deliver_subject: Some("my_push_consumer_subject".to_string()), ack_policy: AckPolicy::All, ..Default::default() })?;
Consumers can also be created on-the-fly using Consumer::create_or_open
, and later used with
Consumer::existing
if you do not wish to auto-create them.
use nats::jetstream::{AckPolicy, Consumer, ConsumerConfig}; let nc = nats::connect("my_server::4222")?; let consumer_res = Consumer::existing(nc.clone(), "my_stream", "non-existent_consumer"); // trying to use this consumer will fail because it hasn't been created yet assert!(consumer_res.is_err()); // this will create the consumer if it does not exist already let consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?;
Consumers may be used for processing messages individually, with timeouts, or in batches:
use nats::jetstream::{AckPolicy, Consumer, ConsumerConfig}; let nc = nats::connect("my_server::4222")?; // this will create the consumer if it does not exist already. // consumer must be mut because the `process*` methods perform // message deduplication using an interval tree, which is // also publicly accessible via the `Consumer`'s `dedupe_window` // field. let mut consumer = Consumer::create_or_open(nc, "my_stream", "existing_or_created_consumer")?; // The `Consumer::process` method executes a closure // on both push- and pull-based consumers, and if // the closure returns `Ok` then the message is acked. // If no message is available, it will wait forever // for one to arrive. let msg_data_len: usize = consumer.process(|msg| { println!("got message {:?}", msg); Ok(msg.data.len()) })?; // Similar to `Consumer::process` except wait until the // consumer's `timeout` field for the message to arrive. // This can and should be set manually, as it has a low // default of 5ms. let msg_data_len: usize = consumer.process_timeout(|msg| { println!("got message {:?}", msg); Ok(msg.data.len()) })?; // For consumers operating with `AckPolicy::All`, batch // processing can provide nice throughput optimizations. // `Consumer::process_batch` will wait indefinitely for // the first message in a batch, then process // more messages until the configured timeout is expired. // It will batch acks if running with `AckPolicy::All`. // If there is an error with acking, the last item in the // returned `Vec` will be the io error. Terminates early // without acking if the closure returns an `Err`, which // is included in the final element of the `Vec`. If a // Timeout happens before the batch size is reached, then // there will be no errors included in the response `Vec`. let batch_size = 128; let results: Vec<std::io::Result<usize>> = consumer.process_batch(batch_size, |msg| { println!("got message {:?}", msg); Ok(msg.data.len()) }); let flipped: std::io::Result<Vec<usize>> = results.into_iter().collect(); let sizes: Vec<usize> = flipped?; // For lower-level control for use cases that are not // well-served by the high-level process* methods, // there are a number of lower level primitives that // can be used, such as `Consumer::pull` for pull-based // consumers and `Message::ack` for manually acking things: let msg = consumer.pull()?; // --- process message --- // tell the server the message has been processed msg.ack()?;
Structs
AccountInfo | contains info about the |
AccountLimits | Various limits imposed on a particular account. |
ApiStats | reports on API calls to |
ClusterInfo | Information about the consumer’s associated |
Consumer |
|
ConsumerConfig | Configuration for consumers. From a high level, the
|
ConsumerInfo | Information about a consumer |
DateTime | A UTC time |
IntervalTree | Records ranges of acknowledged IDs for low-memory deduplication. |
JetStreamMessageInfo | Information about a received message |
NextRequest | for getting next messages for pull based consumers. |
PagedIterator | An iterator over paged |
PurgeResponse | The response generated by trying ot purge a stream. |
SequencePair | Information about a consumer and the stream it is consuming |
StreamConfig |
|
StreamInfo | Shows config and current state for this stream. |
StreamState | information about the given stream. |
Enums
AckKind | The kinds of response used for acknowledging a processed message. |
AckPolicy | Determines whether messages will be acknowledged individually, in batches, or never. |
DeliverPolicy |
|
DiscardPolicy |
|
ReplayPolicy |
|
RetentionPolicy |
|
StorageType | determines how messages are stored for retention. |