pub struct Message {
pub message: Message,
pub context: Context,
}
Fields§
§message: Message
§context: Context
Implementations§
Source§impl Message
impl Message
Sourcepub async fn ack(&self) -> Result<(), Error>
pub async fn ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending `+ACK` to the server.
If `AckPolicy` is set to `All` or `Explicit`, messages have to be acked.
Otherwise redeliveries will occur and Consumer will not be able to advance.
§Examples
use futures::StreamExt;
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack().await?;
}
Sourcepub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
Acknowledges a message delivery by sending a chosen AckKind variant to the server.
§Examples
use futures::StreamExt;
use async_nats::jetstream::AckKind;
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack_with(AckKind::Nak).await?;
}
Sourcepub async fn double_ack(&self) -> Result<(), Error>
pub async fn double_ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending `+ACK` to the server
and awaits confirmation from the server that it received the message.
Useful if the user wants to ensure exactly-once semantics.
If `AckPolicy` is set to `All` or `Explicit`, messages have to be acked.
Otherwise redeliveries will occur and Consumer will not be able to advance.
§Examples
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.double_ack().await?;
}
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for Message
impl !RefUnwindSafe for Message
impl Send for Message
impl Sync for Message
impl Unpin for Message
impl !UnwindSafe for Message
Blanket Implementations§
Source§impl<T> BorrowMut<T> for T where
T: ?Sized,
impl<T> BorrowMut<T> for T where
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more