Struct async_nats::jetstream::message::Message
source · pub struct Message {
pub message: Message,
pub context: Context,
}
Fields§
§message: Message
§context: Context
Implementations§
source§impl Message
impl Message
sourcepub async fn ack(&self) -> Result<(), Error>
pub async fn ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending +ACK to the server.
If AckPolicy is set to All or Explicit, messages have to be acked.
Otherwise, redeliveries will occur and the Consumer will not be able to advance.
Examples
use futures::StreamExt;
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack().await?;
}
sourcepub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error>
Acknowledges a message delivery by sending a chosen AckKind variant to the server.
Examples
use futures::StreamExt;
use async_nats::jetstream::AckKind;
use async_nats::jetstream::consumer::PullConsumer;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer: PullConsumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.ack_with(AckKind::Nak(None)).await?;
}
sourcepub async fn double_ack(&self) -> Result<(), Error>
pub async fn double_ack(&self) -> Result<(), Error>
Acknowledges a message delivery by sending +ACK to the server
and awaits confirmation from the server that the acknowledgement was received.
Useful if the user wants to ensure exactly-once semantics.
If AckPolicy is set to All or Explicit, messages have to be acked.
Otherwise, redeliveries will occur and the Consumer will not be able to advance.
Examples
use futures::StreamExt;
let client = async_nats::connect("localhost:4222").await?;
let jetstream = async_nats::jetstream::new(client);
let consumer = jetstream
.get_stream("events").await?
.get_consumer("pull").await?;
let mut messages = consumer.fetch().max_messages(100).messages().await?;
while let Some(message) = messages.next().await {
message?.double_ack().await?;
}