pub struct Stream { /* private fields */ }Expand description
Handle to operations that can be performed on a Stream.
Implementations
sourceimpl Stream
impl Stream
sourcepub async fn info(&mut self) -> Result<&Info, Error>
pub async fn info(&mut self) -> Result<&Info, Error>
Retrieves info about Stream from the server, updates the cached info inside
Stream and returns it.
Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut stream = jetstream
.get_stream("events").await?;
let info = stream.info().await?;sourcepub fn cached_info(&self) -> &Info
pub fn cached_info(&self) -> &Info
Returns cached Info for the Stream. Cache is either from initial creation/retrival of the Stream or last call to Stream::info.
Examples
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream
.get_stream("events").await?;
let info = stream.cached_info();sourcepub async fn get_raw_message(&self, sequence: u64) -> Result<RawMessage, Error>
pub async fn get_raw_message(&self, sequence: u64) -> Result<RawMessage, Error>
Get a raw message from the stream.
Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_raw_message(publish_ack.sequence).await?;
println!("Retreived raw message {:?}", raw_message);sourcepub async fn get_last_raw_message_by_subject(
&self,
stream_subject: &str
) -> Result<RawMessage, Error>
pub async fn get_last_raw_message_by_subject(
&self,
stream_subject: &str
) -> Result<RawMessage, Error>
Get the last raw message from the stream by subject.
Examples
#[tokio::main]
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
let publish_ack = context.publish("events".to_string(), "data".into()).await?;
let raw_message = stream.get_last_raw_message_by_subject("events".into()).await?;
println!("Retreived raw message {:?}", raw_message);sourcepub async fn delete_message(&self, sequence: u64) -> Result<bool, Error>
pub async fn delete_message(&self, sequence: u64) -> Result<bool, Error>
Delete a message from the stream.
Examples
let client = async_nats::connect("localhost:4222").await?;
let context = async_nats::jetstream::new(client);
let stream = context.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
let publish_ack = context.publish("events".to_string(), "data".into()).await?;
stream.delete_message(publish_ack.sequence).await?;sourcepub async fn purge(&self) -> Result<PurgeResponse, Error>
pub async fn purge(&self) -> Result<PurgeResponse, Error>
Purge Stream messages.
Examples
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect(“demo.nats.io”).await?; let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream(“events”).await?; stream.purge().await?;
Ok(())
}
sourcepub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, Error> where
T: Into<String>,
pub async fn purge_subject<T>(&self, subject: T) -> Result<PurgeResponse, Error> where
T: Into<String>,
Purge Stream messages for a matching subject.
Examples
#[tokio::main]
async fn main() -> Result<(), async_nats::Error> {
let client = async_nats::connect(“demo.nats.io”).await?; let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream(“events”).await?; stream.purge_subject(“data”).await?;
Ok(())
}
sourcepub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C
) -> Result<Consumer<C>, Error>
pub async fn create_consumer<C: IntoConsumerConfig + FromConsumer>(
&self,
config: C
) -> Result<Consumer<C>, Error>
Create a new Durable or Ephemeral Consumer (if durable_name was not provided) and
returns the info from the server about created Consumer
Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.create_consumer(consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
}).await?;sourcepub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<Info, Error>
pub async fn consumer_info<T: AsRef<str>>(&self, name: T) -> Result<Info, Error>
Retrieve Info about Consumer from the server.
Examples
use async_nats::jetstream::consumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let info = stream.consumer_info("pull").await?;sourcepub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str
) -> Result<Consumer<T>, Error>
pub async fn get_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str
) -> Result<Consumer<T>, Error>
Get Consumer from the the server. Consumer iterators can be used to retrieve Messages for a given Consumer.
Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer: consumer::PullConsumer = stream.get_consumer("pull").await?;sourcepub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
config: T
) -> Result<Consumer<T>, Error>
pub async fn get_or_create_consumer<T: FromConsumer + IntoConsumerConfig>(
&self,
name: &str,
config: T
) -> Result<Consumer<T>, Error>
Create a Consumer with the given configuration if it is not present on the server. Returns a handle to the Consumer.
Note: This does not validate if the Consumer on the server is compatible with the configuration passed in except Push/Pull compatibility.
Examples
use async_nats::jetstream::consumer;
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_stream("events").await?;
let consumer = stream.get_or_create_consumer("pull", consumer::pull::Config {
durable_name: Some("pull".to_string()),
..Default::default()
}).await?;sourcepub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, Error>
pub async fn delete_consumer(&self, name: &str) -> Result<DeleteStatus, Error>
Trait Implementations
Auto Trait Implementations
impl !RefUnwindSafe for Stream
impl Send for Stream
impl Sync for Stream
impl Unpin for Stream
impl !UnwindSafe for Stream
Blanket Implementations
sourceimpl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more
sourceimpl<T> Instrument for T
impl<T> Instrument for T
sourcefn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
sourcefn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<V, T> VZip<V> for T where
V: MultiLane<T>,
impl<V, T> VZip<V> for T where
V: MultiLane<T>,
fn vzip(self) -> V
sourceimpl<T> WithSubscriber for T
impl<T> WithSubscriber for T
sourcefn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self> where
S: Into<Dispatch>,
Attaches the provided Subscriber to this type, returning a
WithDispatch wrapper. Read more
sourcefn with_current_subscriber(self) -> WithDispatch<Self>
fn with_current_subscriber(self) -> WithDispatch<Self>
Attaches the current default Subscriber to this type, returning a
WithDispatch wrapper. Read more