pub struct Stream { /* private fields */ }
Expand description

Handle to operations that can be performed on a Stream.

Implementations§

Retrieves info about Stream from the server, updates the cached info inside Stream and returns it.

Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream
    .get_stream("events").await?;

let info = stream.info().await?;

Returns cached Info for the Stream. Cache is either from initial creation/retrieval of the Stream or last call to Stream::info.

Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .get_stream("events").await?;

let info = stream.cached_info();

Gets next message for a Stream.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read after write is possible, as that given replica might not yet catch up with the leader.

Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    subjects: vec!["events.>".to_string()],
    allow_direct: true,
    ..Default::default()
}).await?;

jetstream.publish("events.data".into(), "data".into()).await?;
let pub_ack = jetstream.publish("events.data".into(), "data".into()).await?;

let message =  stream
    .direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence)).await?;

Gets first message from Stream.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read after write is possible, as that given replica might not yet catch up with the leader.

Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    subjects: vec!["events.>".to_string()],
    allow_direct: true,
    ..Default::default()
}).await?;

let pub_ack = jetstream.publish("events.data".into(), "data".into()).await?;

let message =  stream.direct_get_first_for_subject("events.data").await?;

Gets message from Stream with given sequence id.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read after write is possible, as that given replica might not yet catch up with the leader.

Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    subjects: vec!["events.>".to_string()],
    allow_direct: true,
    ..Default::default()
}).await?;

let pub_ack = jetstream.publish("events.data".into(), "data".into()).await?;

let message =  stream.direct_get(pub_ack.await?.sequence).await?;

Gets last message for a given subject.

Requires a Stream with allow_direct set to true. This is different from Stream::get_raw_message, as it can fetch Message from any replica member. This means read after write is possible, as that given replica might not yet catch up with the leader.

Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    subjects: vec!["events.>".to_string()],
    allow_direct: true,
    ..Default::default()
}).await?;

jetstream.publish("events.data".into(), "data".into()).await?;

let message =  stream.direct_get_last_for_subject("events.data").await?;

Get a raw message from the stream.

Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
println!("Retrieved raw message {:?}", raw_message);

Get the last raw message from the stream by subject.

Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;

let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_last_raw_message_by_subject("events").await?;
println!("Retrieved raw message {:?}", raw_message);

Delete a message from the stream.

Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

let publish_ack = context.publish("events".to_string(), "data".into()).await?;
stream.delete_message(publish_ack.await?.sequence).await?;

Purge Stream messages.

Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
stream.purge().await?;
👎Deprecated since 0.25.0: Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead.

Purge Stream messages for a matching subject.

Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
stream.purge_subject("data").await?;

Create a new Durable or Ephemeral Consumer (if durable_name was not provided) and returns the info from the server about created Consumer

Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream.create_consumer(consumer::pull::Config {
    durable_name: Some("pull".to_string()),
    ..Default::default()
}).await?;

Retrieve Info about Consumer from the server.

Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;

Get Consumer from the the server. Consumer iterators can be used to retrieve Messages for a given Consumer.

Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;

Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.

Note: This does not validate if the Consumer on the server is compatible with the configuration passed in except Push/Pull compatibility.

Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer = stream.get_or_create_consumer("pull", consumer::pull::Config {
    durable_name: Some("pull".to_string()),
    ..Default::default()
}).await?;

Delete a Consumer from the server.

Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

jetstream.get_stream("events").await?
    .delete_consumer("pull").await?;

Trait Implementations§

Returns a copy of the value. Read more
Performs copy-assignment from source. Read more
Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

Should always be Self
The resulting type after obtaining ownership.
Creates owned data from borrowed data, usually by cloning. Read more
Uses borrowed data to replace owned data, usually by cloning. Read more
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more