Struct async_nats::jetstream::stream::Stream
source · pub struct Stream { /* private fields */ }
Expand description
Handle to operations that can be performed on a Stream
.
Implementations§
source§impl Stream
impl Stream
sourcepub async fn info(&mut self) -> Result<&Info, InfoError>
pub async fn info(&mut self) -> Result<&Info, InfoError>
Retrieves info
about Stream from the server, updates the cached info
inside
Stream and returns it.
Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream.get_stream("events").await?;
let info = stream.info().await?;
sourcepub fn cached_info(&self) -> &Info
pub fn cached_info(&self) -> &Info
Returns cached Info for the Stream. Cache is either from initial creation/retrieval of the Stream or last call to Stream::info.
Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.cached_info();
sourcepub async fn direct_get_next_for_subject<T: AsRef<str>>(
&self,
subject: T,
sequence: Option<u64>
) -> Result<Message, DirectGetError>
pub async fn direct_get_next_for_subject<T: AsRef<str>>( &self, subject: T, sequence: Option<u64> ) -> Result<Message, DirectGetError>
Gets next message for a Stream.
Requires a Stream with allow_direct
set to true
.
This is different from Stream::get_raw_message, as it can fetch Message
from any replica member. This means read after write is possible,
as that given replica might not yet catch up with the leader.
Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
jetstream
.publish("events.data".into(), "data".into())
.await?;
let pub_ack = jetstream
.publish("events.data".into(), "data".into())
.await?;
let message = stream
.direct_get_next_for_subject("events.data", Some(pub_ack.await?.sequence))
.await?;
sourcepub async fn direct_get_first_for_subject<T: AsRef<str>>(
&self,
subject: T
) -> Result<Message, DirectGetError>
pub async fn direct_get_first_for_subject<T: AsRef<str>>( &self, subject: T ) -> Result<Message, DirectGetError>
Gets first message from Stream.
Requires a Stream with allow_direct
set to true
.
This is different from Stream::get_raw_message, as it can fetch Message
from any replica member. This means read after write is possible,
as that given replica might not yet catch up with the leader.
Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
let pub_ack = jetstream
.publish("events.data".into(), "data".into())
.await?;
let message = stream.direct_get_first_for_subject("events.data").await?;
sourcepub async fn direct_get(&self, sequence: u64) -> Result<Message, DirectGetError>
pub async fn direct_get(&self, sequence: u64) -> Result<Message, DirectGetError>
Gets message from Stream with given sequence id
.
Requires a Stream with allow_direct
set to true
.
This is different from Stream::get_raw_message, as it can fetch Message
from any replica member. This means read after write is possible,
as that given replica might not yet catch up with the leader.
Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
let pub_ack = jetstream
.publish("events.data".into(), "data".into())
.await?;
let message = stream.direct_get(pub_ack.await?.sequence).await?;
sourcepub async fn direct_get_last_for_subject<T: AsRef<str>>(
&self,
subject: T
) -> Result<Message, DirectGetError>
pub async fn direct_get_last_for_subject<T: AsRef<str>>( &self, subject: T ) -> Result<Message, DirectGetError>
Gets last message for a given subject
.
Requires a Stream with allow_direct
set to true
.
This is different from Stream::get_raw_message, as it can fetch Message
from any replica member. This means read after write is possible,
as that given replica might not yet catch up with the leader.
Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
subjects: vec!["events.>".to_string()],
allow_direct: true,
..Default::default()
})
.await?;
jetstream
.publish("events.data".into(), "data".into())
.await?;
let message = stream.direct_get_last_for_subject("events.data").await?;
sourcepub async fn get_raw_message(&self, sequence: u64) -> Result<RawMessage, Error>
pub async fn get_raw_message(&self, sequence: u64) -> Result<RawMessage, Error>
Get a raw message from the stream.
Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.await?.sequence).await?;
println!("Retrieved raw message {:?}", raw_message);
sourcepub async fn get_last_raw_message_by_subject(
&self,
stream_subject: &str
) -> Result<RawMessage, LastRawMessageError>
pub async fn get_last_raw_message_by_subject( &self, stream_subject: &str ) -> Result<RawMessage, LastRawMessageError>
Get the last raw message from the stream by subject.
Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_last_raw_message_by_subject("events").await?;
println!("Retrieved raw message {:?}", raw_message);
sourcepub async fn delete_message(
&self,
sequence: u64
) -> Result<bool, DeleteMessageError>
pub async fn delete_message( &self, sequence: u64 ) -> Result<bool, DeleteMessageError>
Delete a message from the stream.
Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context
.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
})
.await?;
let publish_ack = context.publish("events".to_string(), "data".into()).await?;
stream.delete_message(publish_ack.await?.sequence).await?;
sourcepub fn purge(&self) -> Purge<'_, No, No>
pub fn purge(&self) -> Purge<'_, No, No>
Purge Stream
messages.
Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
stream.purge().await?;
sourcepub async fn purge_subject<T>(
&self,
subject: T
) -> Result<PurgeResponse, PurgeError>where
T: Into<String>,
👎Deprecated since 0.25.0: Overloads have been replaced with an into_future based builder. Use Stream::purge().filter(subject) instead.
pub async fn purge_subject<T>( &self, subject: T ) -> Result<PurgeResponse, PurgeError>where T: Into<String>,
Purge Stream
messages for a matching subject.
Examples
let client = async_nats::connect("demo.nats.io").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
stream.purge_subject("data").await?;
sourcepub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C
) -> Result<Consumer<C>, ConsumerError>
pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>( &self, config: C ) -> Result<Consumer<C>, ConsumerError>
Create a new Durable
or Ephemeral
Consumer (if durable_name
was not provided) and
returns the info from the server about created Consumer
Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream
.create_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
})
.await?;
sourcepub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<Info, Error>
pub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<Info, Error>
Retrieve Info about Consumer from the server.
Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;
sourcepub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str
) -> Result<Consumer<T>, Error>
pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str ) -> Result<Consumer<T>, Error>
Get Consumer from the the server. Consumer iterators can be used to retrieve Messages for a given Consumer.
Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;
sourcepub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
config: T
) -> Result<Consumer<T>, ConsumerError>
pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>( &self, name: &str, config: T ) -> Result<Consumer<T>, ConsumerError>
Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.
Note: This does not validate if the Consumer on the server is compatible with the configuration passed in except Push/Pull compatibility.
Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer = stream
.get_or_create_consumer(
"pull",
consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
},
)
.await?;
sourcepub async fn delete_consumer(
&self,
name: &str
) -> Result<DeleteStatus, ConsumerError>
pub async fn delete_consumer( &self, name: &str ) -> Result<DeleteStatus, ConsumerError>
sourcepub fn consumer_names(&self) -> ConsumerNames<'_>
pub fn consumer_names(&self) -> ConsumerNames<'_>
Lists names of all consumers for current stream.
Examples
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut names = stream.consumer_names();
while let Some(consumer) = names.try_next().await? {
println!("consumer: {stream:?}");
}
sourcepub fn consumers(&self) -> Consumers<'_>
pub fn consumers(&self) -> Consumers<'_>
Lists all consumers info for current stream.
Examples
use futures::TryStreamExt;
let client = async_nats::connect("demo.nats.io:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("stream").await?;
let mut consumers = stream.consumers();
while let Some(consumer) = consumers.try_next().await? {
println!("consumer: {consumer:?}");
}