pub struct Stream { /* private fields */ }
Handle to operations that can be performed on a Stream.

Implementations

Retrieves info about Stream from the server, updates the cached info inside Stream and returns it.

Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let mut stream = jetstream
    .get_stream("events").await?;

let info = stream.info().await?;

Returns cached Info for the Stream. The cache holds the Info from the initial creation or retrieval of the Stream, or from the last call to Stream::info.

Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream
    .get_stream("events").await?;

let info = stream.cached_info();
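
The difference between info and cached_info can be illustrated with a small in-memory model. This is only a sketch of the caching behaviour described above, not the async_nats implementation: FakeServer, StreamHandle, and the messages field are hypothetical names invented for this illustration, and no network calls are involved.

```rust
// Toy model of the caching behaviour described above.
// NOT the async_nats implementation: every name here is hypothetical.

#[derive(Debug, Clone, PartialEq)]
struct Info {
    messages: u64,
}

// Stands in for the server-side state of a stream.
struct FakeServer {
    messages: u64,
}

impl FakeServer {
    fn stream_info(&self) -> Info {
        Info { messages: self.messages }
    }
}

// Stands in for the Stream handle, which keeps a cached copy of Info.
struct StreamHandle {
    cached: Info,
}

impl StreamHandle {
    // The cache is first filled when the handle is created or retrieved.
    fn get(server: &FakeServer) -> Self {
        StreamHandle { cached: server.stream_info() }
    }

    // Like Stream::info: asks the server, updates the cache, returns it.
    fn info(&mut self, server: &FakeServer) -> &Info {
        self.cached = server.stream_info();
        &self.cached
    }

    // Like Stream::cached_info: no round trip, so the value may be stale.
    fn cached_info(&self) -> &Info {
        &self.cached
    }
}
```

In this model, a value read through cached_info stays unchanged while the server state moves on, and only a call to info brings it up to date.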

Get a raw message from the stream.

Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.sequence).await?;
println!("Retrieved raw message {:?}", raw_message);

Get the last raw message from the stream by subject.

Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_last_raw_message_by_subject("events".into()).await?;
println!("Retrieved raw message {:?}", raw_message);

Delete a message from the stream.

Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);

let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
    name: "events".to_string(),
    max_messages: 10_000,
    ..Default::default()
}).await?;

let publish_ack = context.publish("events".to_string(), "data".into()).await?;
stream.delete_message(publish_ack.sequence).await?;
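
The three message operations above (lookup by sequence, lookup of the last message on a subject, and deletion by sequence) can be sketched with an in-memory log. This is a simplified model rather than the library's behaviour: MessageLog and RawMessage are hypothetical types, subjects are compared as exact strings, and sequences are assumed to start at 1 and grow by one per published message.

```rust
// Toy in-memory log, NOT the async_nats implementation.
// MessageLog and RawMessage are hypothetical names for illustration.

#[derive(Debug, Clone, PartialEq)]
struct RawMessage {
    sequence: u64,
    subject: String,
    payload: String,
}

#[derive(Default)]
struct MessageLog {
    messages: Vec<RawMessage>,
    next_sequence: u64,
}

impl MessageLog {
    // Stores a message and returns its sequence, playing the role of
    // the sequence field of a publish acknowledgement.
    fn publish(&mut self, subject: &str, payload: &str) -> u64 {
        self.next_sequence += 1;
        let sequence = self.next_sequence;
        self.messages.push(RawMessage {
            sequence,
            subject: subject.to_string(),
            payload: payload.to_string(),
        });
        sequence
    }

    // Lookup by sequence number.
    fn get_raw_message(&self, sequence: u64) -> Option<&RawMessage> {
        self.messages.iter().find(|m| m.sequence == sequence)
    }

    // Most recent message whose subject matches exactly (assumption:
    // no wildcard matching in this sketch).
    fn get_last_raw_message_by_subject(&self, subject: &str) -> Option<&RawMessage> {
        self.messages.iter().rev().find(|m| m.subject == subject)
    }

    // Removes the message with the given sequence; true if one was removed.
    fn delete_message(&mut self, sequence: u64) -> bool {
        let before = self.messages.len();
        self.messages.retain(|m| m.sequence != sequence);
        self.messages.len() != before
    }
}
```

Deleting the newest message on a subject makes the previous one the "last" message for that subject, which is the interaction the model is meant to show.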

Purge Stream messages.

Examples
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("demo.nats.io").await?;
    let jetstream = async_nats::jetstream::new(client);

    let stream = jetstream.get_stream("events").await?;
    stream.purge().await?;

    Ok(())
}

Purge Stream messages for a matching subject.

Examples
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
    let client = async_nats::connect("demo.nats.io").await?;
    let jetstream = async_nats::jetstream::new(client);

    let stream = jetstream.get_stream("events").await?;
    stream.purge_subject("data").await?;

    Ok(())
}
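
To make the contrast between the two purge operations concrete, here is a minimal in-memory sketch. It is not how the server or async_nats implements purging: InMemoryStream and StoredMessage are hypothetical names, and matching a subject by exact string comparison is a simplifying assumption of this model.

```rust
// Toy model of purge versus purge-by-subject.
// NOT the async_nats implementation: all names are hypothetical.

#[derive(Debug, Clone, PartialEq)]
struct StoredMessage {
    subject: String,
    payload: String,
}

#[derive(Default)]
struct InMemoryStream {
    messages: Vec<StoredMessage>,
}

impl InMemoryStream {
    fn publish(&mut self, subject: &str, payload: &str) {
        self.messages.push(StoredMessage {
            subject: subject.to_string(),
            payload: payload.to_string(),
        });
    }

    // Removes every message and returns how many were removed.
    fn purge(&mut self) -> usize {
        let purged = self.messages.len();
        self.messages.clear();
        purged
    }

    // Removes only messages on the given subject (exact match is an
    // assumption of this sketch) and returns how many were removed.
    fn purge_subject(&mut self, subject: &str) -> usize {
        let before = self.messages.len();
        self.messages.retain(|m| m.subject != subject);
        before - self.messages.len()
    }
}
```

In this model, purge_subject leaves messages on other subjects in place, while purge empties the stream regardless of subject.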

Create a new Durable Consumer, or an Ephemeral Consumer if durable_name is not provided, and return the server's info about the created Consumer.

Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream.create_consumer(consumer::pull::Config {
    durable_name: Some("pull".to_string()),
    ..Default::default()
}).await?;

Retrieve Info about Consumer from the server.

Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;

Get Consumer from the server. Consumer iterators can be used to retrieve Messages for a given Consumer.

Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;

Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.

Note: Apart from Push/Pull compatibility, this does not validate that the Consumer already on the server is compatible with the configuration passed in.

Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

let stream = jetstream.get_stream("events").await?;
let consumer = stream.get_or_create_consumer("pull", consumer::pull::Config {
    durable_name: Some("pull".to_string()),
    ..Default::default()
}).await?;
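
The note above has a practical consequence: if a Consumer with that name already exists, the configuration passed in may be silently ignored. The following sketch models that with a plain map. It is an illustration under stated assumptions, not the library's code: ConsumerRegistry, ConsumerConfig, Kind, and the max_deliver field are hypothetical, and reporting a Push/Pull mismatch as an error is an assumption of this model.

```rust
use std::collections::HashMap;

// Toy model of get-or-create semantics.
// NOT the async_nats implementation: all names are hypothetical.

#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Pull,
    Push,
}

#[derive(Debug, Clone, PartialEq)]
struct ConsumerConfig {
    kind: Kind,
    max_deliver: i64,
}

#[derive(Default)]
struct ConsumerRegistry {
    consumers: HashMap<String, ConsumerConfig>,
}

impl ConsumerRegistry {
    // Returns the existing consumer if present, otherwise stores `config`.
    // Only the Push/Pull kind is compared; other fields are not validated.
    fn get_or_create_consumer(
        &mut self,
        name: &str,
        config: ConsumerConfig,
    ) -> Result<&ConsumerConfig, String> {
        let existing = self
            .consumers
            .entry(name.to_string())
            .or_insert_with(|| config.clone());
        if existing.kind != config.kind {
            return Err(format!("consumer {} exists with a different kind", name));
        }
        Ok(&*existing)
    }
}
```

In this model, a second call with a different max_deliver returns the originally stored configuration, so callers that depend on specific settings would need to check the returned info themselves.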

Delete a Consumer from the server.

Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);

jetstream.get_stream("events").await?
    .delete_consumer("pull").await?;

Trait Implementations

Formats the value using the given formatter.
