pub struct Consumer<T: IntoConsumerConfig> { /* private fields */ }
Implementations
sourceimpl Consumer<Config>
impl Consumer<Config>
pub fn stream(&self) -> Result<Stream<'_>, Error>
Returns a stream of message request results.
Example
use futures::StreamExt;
use futures::TryStreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let stream = jetstream.get_or_create_stream(async_nats::jetstream::stream::Config {
name: "events".to_string(),
max_messages: 10_000,
..Default::default()
}).await?;
jetstream.publish("events".to_string(), "data".into()).await?;
let consumer = stream.get_or_create_consumer("consumer", async_nats::jetstream::consumer::pull::Config {
durable_name: Some("consumer".to_string()),
..Default::default()
}).await?;
let mut messages = consumer.stream()?.take(100);
while let Some(Ok(message)) = messages.next().await {
println!("got message {:?}", message);
message.ack().await?;
}
Ok(())
pub async fn fetch(&self, batch: usize) -> Result<Batch, Error>
pub async fn batch(
&self,
batch: usize,
expires: Option<usize>
) -> Result<Batch, Error>
pub fn sequence(&self, batch: usize) -> Result<Sequence<'_>, Error>
sourceimpl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
sourceimpl<T: IntoConsumerConfig> Consumer<T>
impl<T: IntoConsumerConfig> Consumer<T>
sourcepub async fn info(&mut self) -> Result<&Info, Error>
pub async fn info(&mut self) -> Result<&Info, Error>
Retrieves info about the Consumer from the server, updates the cached info inside
the Consumer, and returns it.
Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let mut consumer: PullConsumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let info = consumer.info().await?;
pub fn cached_info(&self) -> &Info
Returns the cached Info for the Consumer. The cache comes either from the initial creation/retrieval of the Consumer or from the last call to Consumer::info.
Examples
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let info = consumer.cached_info();
Auto Trait Implementations
impl<T> !RefUnwindSafe for Consumer<T>
impl<T> Send for Consumer<T> where
T: Send,
impl<T> Sync for Consumer<T> where
T: Sync,
impl<T> Unpin for Consumer<T> where
T: Unpin,
impl<T> !UnwindSafe for Consumer<T>
Blanket Implementations
sourceimpl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
const: unstable · sourcefn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more