async_nats/jetstream/message.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::header::{IntoHeaderName, IntoHeaderValue};
17use crate::subject::ToSubject;
18use crate::{error, header, message, Error, HeaderValue};
19use crate::{subject::Subject, HeaderMap};
20use bytes::Bytes;
21use futures_util::future::TryFutureExt;
22use futures_util::StreamExt;
23use std::fmt::Display;
24use std::{mem, time::Duration};
25use time::format_description::well_known::Rfc3339;
26use time::OffsetDateTime;
27
/// A message received directly from the stream, without leveraging a consumer.
///
/// Values of this type are produced from a raw [`crate::Message`] through the
/// [`TryFrom`] implementation in this module, which reads the subject, sequence
/// and timestamp out of the message headers.
#[derive(Debug, Clone)]
pub struct StreamMessage {
    /// Subject of the message, read from the `header::NATS_SUBJECT` header.
    pub subject: Subject,
    /// Sequence number, parsed from the `header::NATS_SEQUENCE` header.
    pub sequence: u64,
    /// All headers carried by the raw message (including the ones parsed into the other fields).
    pub headers: HeaderMap,
    /// Payload of the raw message.
    pub payload: Bytes,
    /// Timestamp, parsed as RFC 3339 from the `header::NATS_TIME_STAMP` header.
    pub time: OffsetDateTime,
}
37
/// An outbound message to be published.
/// Does not contain status or description which are valid only for inbound messages.
pub struct OutboundMessage {
    /// Subject the message is published to.
    pub subject: Subject,
    /// Message body.
    pub payload: Bytes,
    /// Optional headers sent along with the payload.
    pub headers: Option<HeaderMap>,
}
45
46impl OutboundMessage {
47 pub fn new(subject: Subject, payload: Bytes, headers: Option<HeaderMap>) -> Self {
48 Self {
49 subject,
50 payload,
51 headers,
52 }
53 }
54}
55
56impl From<OutboundMessage> for message::OutboundMessage {
57 fn from(message: OutboundMessage) -> Self {
58 message::OutboundMessage {
59 subject: message.subject,
60 payload: message.payload,
61 headers: message.headers,
62 reply: None,
63 }
64 }
65}
66
/// Used for building customized `publish` message.
///
/// Start with [`PublishMessage::build`], chain the setter methods, and finish with
/// [`PublishMessage::outbound_message`] to attach a subject.
#[derive(Default, Clone, Debug)]
pub struct PublishMessage {
    /// Message body; empty by default.
    pub(crate) payload: Bytes,
    /// Headers to publish with the message; `None` until a header is set.
    pub(crate) headers: Option<header::HeaderMap>,
}
73impl PublishMessage {
74 /// Creates a new custom Publish struct to be used with.
75 pub fn build() -> Self {
76 Default::default()
77 }
78
79 /// Sets the payload for the message.
80 pub fn payload(mut self, payload: Bytes) -> Self {
81 self.payload = payload;
82 self
83 }
84 /// Adds headers to the message.
85 pub fn headers(mut self, headers: HeaderMap) -> Self {
86 self.headers = Some(headers);
87 self
88 }
89 /// A shorthand to add a single header.
90 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
91 self.headers
92 .get_or_insert(header::HeaderMap::new())
93 .insert(name, value);
94 self
95 }
96 /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
97 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
98 self.header(header::NATS_MESSAGE_ID, id.as_ref())
99 }
100 /// Sets expected last message ID.
101 /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
102 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
103 self.header(
104 header::NATS_EXPECTED_LAST_MESSAGE_ID,
105 last_message_id.as_ref(),
106 )
107 }
108 /// Sets the last expected stream sequence.
109 /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
110 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
111 self.header(
112 header::NATS_EXPECTED_LAST_SEQUENCE,
113 HeaderValue::from(last_sequence),
114 )
115 }
116 /// Sets the last expected stream sequence for a subject this message will be published to.
117 /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
118 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
119 self.header(
120 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
121 HeaderValue::from(subject_sequence),
122 )
123 }
124 /// Sets the expected stream name.
125 /// It sets the `Nats-Expected-Stream` header with provided value.
126 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
127 self.header(
128 header::NATS_EXPECTED_STREAM,
129 HeaderValue::from(stream.as_ref()),
130 )
131 }
132
133 #[cfg(feature = "server_2_11")]
134 /// Sets TTL for a single message.
135 /// It sets the `Nats-TTL` header with provided value.
136 pub fn ttl(self, ttl: Duration) -> Self {
137 self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
138 }
139
140 /// Creates an [crate::jetstream::message::OutboundMessage] that can be sent using
141 /// [crate::jetstream::context::traits::Publisher::publish_message].
142 pub fn outbound_message<S: ToSubject>(self, subject: S) -> OutboundMessage {
143 OutboundMessage {
144 subject: subject.to_subject(),
145 payload: self.payload,
146 headers: self.headers,
147 }
148 }
149}
150
/// A [`crate::Message`] paired with the JetStream [`Context`] used to acknowledge it.
///
/// Dereferences to the wrapped [`crate::Message`], so its fields can be read directly.
#[derive(Clone, Debug)]
pub struct Message {
    /// The underlying core message.
    pub message: crate::Message,
    /// JetStream context whose client and timeout are used by the ack methods.
    pub context: Context,
}
156
157impl TryFrom<crate::Message> for StreamMessage {
158 type Error = StreamMessageError;
159
160 fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
161 let headers = message.headers.ok_or_else(|| {
162 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
163 })?;
164
165 let sequence = headers
166 .get_last(header::NATS_SEQUENCE)
167 .ok_or_else(|| {
168 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
169 })
170 .and_then(|seq| {
171 seq.as_str().parse().map_err(|err| {
172 StreamMessageError::with_source(
173 StreamMessageErrorKind::ParseError,
174 format!("could not parse sequence header: {err}"),
175 )
176 })
177 })?;
178
179 let time = headers
180 .get_last(header::NATS_TIME_STAMP)
181 .ok_or_else(|| {
182 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
183 })
184 .and_then(|time| {
185 OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
186 StreamMessageError::with_source(
187 StreamMessageErrorKind::ParseError,
188 format!("could not parse timestamp header: {err}"),
189 )
190 })
191 })?;
192
193 let subject = headers
194 .get_last(header::NATS_SUBJECT)
195 .ok_or_else(|| {
196 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
197 })?
198 .as_str()
199 .into();
200
201 Ok(StreamMessage {
202 subject,
203 sequence,
204 headers,
205 payload: message.payload,
206 time,
207 })
208 }
209}
210
/// The kinds of failure that can occur while converting a raw message into a [`StreamMessage`].
#[derive(Debug, Clone, PartialEq)]
pub enum StreamMessageErrorKind {
    /// The message has no headers, or a required header is absent.
    MissingHeader,
    /// A required header is present but its value could not be parsed.
    ParseError,
}
216
/// Error returned when the library is unable to parse a message received directly from the stream.
pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
219
220impl Display for StreamMessageErrorKind {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 match self {
223 StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
224 StreamMessageErrorKind::ParseError => write!(f, "parse error"),
225 }
226 }
227}
228
229impl std::ops::Deref for Message {
230 type Target = crate::Message;
231
232 fn deref(&self) -> &Self::Target {
233 &self.message
234 }
235}
236
237impl From<Message> for crate::Message {
238 fn from(source: Message) -> crate::Message {
239 source.message
240 }
241}
242
243impl Message {
244 /// Splits [Message] into [Acker] and [crate::Message].
245 /// This can help reduce memory footprint if [Message] can be dropped before acking,
246 /// for example when it's transformed into another structure and acked later
247 pub fn split(mut self) -> (crate::Message, Acker) {
248 let reply = mem::take(&mut self.message.reply);
249 (
250 self.message,
251 Acker {
252 context: self.context,
253 reply,
254 },
255 )
256 }
257 /// Acknowledges a message delivery by sending `+ACK` to the server.
258 ///
259 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
260 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
261 ///
262 /// # Examples
263 ///
264 /// ```no_run
265 /// # #[tokio::main]
266 /// # async fn main() -> Result<(), async_nats::Error> {
267 /// use async_nats::jetstream::consumer::PullConsumer;
268 /// use futures_util::StreamExt;
269 /// let client = async_nats::connect("localhost:4222").await?;
270 /// let jetstream = async_nats::jetstream::new(client);
271 ///
272 /// let consumer: PullConsumer = jetstream
273 /// .get_stream("events")
274 /// .await?
275 /// .get_consumer("pull")
276 /// .await?;
277 ///
278 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
279 ///
280 /// while let Some(message) = messages.next().await {
281 /// message?.ack().await?;
282 /// }
283 /// # Ok(())
284 /// # }
285 /// ```
286 pub async fn ack(&self) -> Result<(), Error> {
287 if let Some(ref reply) = self.reply {
288 self.context
289 .client
290 .publish(reply.clone(), "".into())
291 .map_err(Error::from)
292 .await
293 } else {
294 Err(Box::new(std::io::Error::other(
295 "No reply subject, not a JetStream message",
296 )))
297 }
298 }
299
300 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
301 ///
302 /// # Examples
303 ///
304 /// ```no_run
305 /// # #[tokio::main]
306 /// # async fn main() -> Result<(), async_nats::Error> {
307 /// use async_nats::jetstream::consumer::PullConsumer;
308 /// use async_nats::jetstream::AckKind;
309 /// use futures_util::StreamExt;
310 /// let client = async_nats::connect("localhost:4222").await?;
311 /// let jetstream = async_nats::jetstream::new(client);
312 ///
313 /// let consumer: PullConsumer = jetstream
314 /// .get_stream("events")
315 /// .await?
316 /// .get_consumer("pull")
317 /// .await?;
318 ///
319 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
320 ///
321 /// while let Some(message) = messages.next().await {
322 /// message?.ack_with(AckKind::Nak(None)).await?;
323 /// }
324 /// # Ok(())
325 /// # }
326 /// ```
327 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
328 if let Some(ref reply) = self.reply {
329 self.context
330 .client
331 .publish(reply.to_owned(), kind.into())
332 .map_err(Error::from)
333 .await
334 } else {
335 Err(Box::new(std::io::Error::other(
336 "No reply subject, not a JetStream message",
337 )))
338 }
339 }
340
341 /// Acknowledges a message delivery by sending a chosen [AckKind] to the server
342 /// and awaits for confirmation for the server that it received the message.
343 /// Useful if user wants to ensure `exactly once` semantics.
344 ///
345 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
346 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
347 ///
348 /// # Examples
349 ///
350 /// ```no_run
351 /// # #[tokio::main]
352 /// # async fn main() -> Result<(), async_nats::Error> {
353 /// use async_nats::jetstream::AckKind;
354 /// use futures_util::StreamExt;
355 /// let client = async_nats::connect("localhost:4222").await?;
356 /// let jetstream = async_nats::jetstream::new(client);
357 ///
358 /// let consumer = jetstream
359 /// .get_stream("events")
360 /// .await?
361 /// .get_consumer("pull")
362 /// .await?;
363 ///
364 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
365 ///
366 /// while let Some(message) = messages.next().await {
367 /// message?.double_ack_with(AckKind::Ack).await?;
368 /// }
369 /// # Ok(())
370 /// # }
371 /// ```
372 pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
373 if let Some(ref reply) = self.reply {
374 let inbox = self.context.client.new_inbox();
375 let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
376 self.context
377 .client
378 .publish_with_reply(reply.clone(), inbox, ack_kind.into())
379 .await?;
380 match tokio::time::timeout(self.context.timeout, subscription.next())
381 .await
382 .map_err(|_| {
383 std::io::Error::new(
384 std::io::ErrorKind::TimedOut,
385 "double ack response timed out",
386 )
387 })? {
388 Some(_) => Ok(()),
389 None => Err(Box::new(std::io::Error::other("subscription dropped"))),
390 }
391 } else {
392 Err(Box::new(std::io::Error::other(
393 "No reply subject, not a JetStream message",
394 )))
395 }
396 }
397
398 /// Acknowledges a message delivery by sending `+ACK` to the server
399 /// and awaits for confirmation for the server that it received the message.
400 /// Useful if user wants to ensure `exactly once` semantics.
401 ///
402 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
403 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
404 ///
405 /// # Examples
406 ///
407 /// ```no_run
408 /// # #[tokio::main]
409 /// # async fn main() -> Result<(), async_nats::Error> {
410 /// use futures_util::StreamExt;
411 /// let client = async_nats::connect("localhost:4222").await?;
412 /// let jetstream = async_nats::jetstream::new(client);
413 ///
414 /// let consumer = jetstream
415 /// .get_stream("events")
416 /// .await?
417 /// .get_consumer("pull")
418 /// .await?;
419 ///
420 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
421 ///
422 /// while let Some(message) = messages.next().await {
423 /// message?.double_ack().await?;
424 /// }
425 /// # Ok(())
426 /// # }
427 /// ```
428 pub async fn double_ack(&self) -> Result<(), Error> {
429 self.double_ack_with(AckKind::Ack).await
430 }
431
432 /// Returns the `JetStream` message ID
433 /// if this is a `JetStream` message.
434 #[allow(clippy::mixed_read_write_in_expression)]
435 pub fn info(&self) -> Result<Info<'_>, Error> {
436 const PREFIX: &str = "$JS.ACK.";
437 const SKIP: usize = PREFIX.len();
438
439 let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
440 std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
441 })?;
442
443 if !reply.starts_with(PREFIX) {
444 return Err(Box::new(std::io::Error::other(
445 "did not found proper prefix",
446 )));
447 }
448
449 reply = &reply[SKIP..];
450
451 let mut split = reply.split('.');
452
453 // we should avoid allocating to prevent
454 // large performance degradations in
455 // parsing this.
456 let mut tokens: [Option<&str>; 10] = [None; 10];
457 let mut n_tokens = 0;
458 for each_token in &mut tokens {
459 if let Some(token) = split.next() {
460 *each_token = Some(token);
461 n_tokens += 1;
462 }
463 }
464
465 let mut token_index = 0;
466
467 macro_rules! try_parse {
468 () => {
469 match str::parse(try_parse!(str)) {
470 Ok(parsed) => parsed,
471 Err(e) => {
472 return Err(Box::new(e));
473 }
474 }
475 };
476 (str) => {
477 if let Some(next) = tokens[token_index].take() {
478 #[allow(unused)]
479 {
480 // this isn't actually unused, but it's
481 // difficult for the compiler to infer this.
482 token_index += 1;
483 }
484 next
485 } else {
486 return Err(Box::new(std::io::Error::other("too few tokens")));
487 }
488 };
489 }
490
491 // now we can try to parse the tokens to
492 // individual types. We use an if-else
493 // chain instead of a match because it
494 // produces more optimal code usually,
495 // and we want to try the 9 (11 - the first 2)
496 // case first because we expect it to
497 // be the most common. We use >= to be
498 // future-proof.
499 if n_tokens >= 9 {
500 Ok(Info {
501 domain: {
502 let domain: &str = try_parse!(str);
503 if domain == "_" {
504 None
505 } else {
506 Some(domain)
507 }
508 },
509 acc_hash: Some(try_parse!(str)),
510 stream: try_parse!(str),
511 consumer: try_parse!(str),
512 delivered: try_parse!(),
513 stream_sequence: try_parse!(),
514 consumer_sequence: try_parse!(),
515 published: {
516 let nanos: i128 = try_parse!();
517 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
518 },
519 pending: try_parse!(),
520 token: if n_tokens >= 9 {
521 Some(try_parse!(str))
522 } else {
523 None
524 },
525 })
526 } else if n_tokens == 7 {
527 // we expect this to be increasingly rare, as older
528 // servers are phased out.
529 Ok(Info {
530 domain: None,
531 acc_hash: None,
532 stream: try_parse!(str),
533 consumer: try_parse!(str),
534 delivered: try_parse!(),
535 stream_sequence: try_parse!(),
536 consumer_sequence: try_parse!(),
537 published: {
538 let nanos: i128 = try_parse!();
539 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
540 },
541 pending: try_parse!(),
542 token: None,
543 })
544 } else {
545 Err(Box::new(std::io::Error::other("bad token number")))
546 }
547 }
548}
549
/// A lightweight struct useful for decoupling message contents and the ability to ack it.
///
/// Holds only what the ack methods need: the JetStream context and the reply subject.
pub struct Acker {
    /// JetStream context whose client and timeout are used to send acknowledgements.
    context: Context,
    /// Reply subject acknowledgements are published to; `None` means the ack methods fail.
    reply: Option<Subject>,
}
555
556// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
557// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
558// Creating separate function to ack just to avoid one duplication is not worth it either.
559impl Acker {
560 pub fn new(context: Context, reply: Option<Subject>) -> Self {
561 Self { context, reply }
562 }
563 /// Acknowledges a message delivery by sending `+ACK` to the server.
564 ///
565 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
566 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
567 ///
568 /// # Examples
569 ///
570 /// ```no_run
571 /// # #[tokio::main]
572 /// # async fn main() -> Result<(), async_nats::Error> {
573 /// use async_nats::jetstream::{consumer::PullConsumer, Message};
574 /// use futures_util::StreamExt;
575 /// let client = async_nats::connect("localhost:4222").await?;
576 /// let jetstream = async_nats::jetstream::new(client);
577 ///
578 /// let consumer: PullConsumer = jetstream
579 /// .get_stream("events")
580 /// .await?
581 /// .get_consumer("pull")
582 /// .await?;
583 ///
584 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
585 ///
586 /// while let Some(message) = messages.next().await {
587 /// let (message, acker) = message.map(Message::split)?;
588 /// // Do something with the message. Ownership can be taken over `Message`
589 /// // while retaining ability to ack later.
590 /// println!("message: {:?}", message);
591 /// // Ack it. `Message` may be dropped already.
592 /// acker.ack().await?;
593 /// }
594 /// # Ok(())
595 /// # }
596 /// ```
597 pub async fn ack(&self) -> Result<(), Error> {
598 if let Some(ref reply) = self.reply {
599 self.context
600 .client
601 .publish(reply.to_owned(), "".into())
602 .map_err(Error::from)
603 .await
604 } else {
605 Err(Box::new(std::io::Error::other(
606 "No reply subject, not a JetStream message",
607 )))
608 }
609 }
610
611 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
612 ///
613 /// # Examples
614 ///
615 /// ```no_run
616 /// # #[tokio::main]
617 /// # async fn main() -> Result<(), async_nats::Error> {
618 /// use async_nats::jetstream::{consumer::PullConsumer, AckKind, Message};
619 /// use futures_util::StreamExt;
620 /// let client = async_nats::connect("localhost:4222").await?;
621 /// let jetstream = async_nats::jetstream::new(client);
622 ///
623 /// let consumer: PullConsumer = jetstream
624 /// .get_stream("events")
625 /// .await?
626 /// .get_consumer("pull")
627 /// .await?;
628 ///
629 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
630 ///
631 /// while let Some(message) = messages.next().await {
632 /// let (message, acker) = message.map(Message::split)?;
633 /// // Do something with the message. Ownership can be taken over `Message`.
634 /// // while retaining ability to ack later.
635 /// println!("message: {:?}", message);
636 /// // Ack it. `Message` may be dropped already.
637 /// acker.ack_with(AckKind::Nak(None)).await?;
638 /// }
639 /// # Ok(())
640 /// # }
641 /// ```
642 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
643 if let Some(ref reply) = self.reply {
644 self.context
645 .client
646 .publish(reply.to_owned(), kind.into())
647 .map_err(Error::from)
648 .await
649 } else {
650 Err(Box::new(std::io::Error::other(
651 "No reply subject, not a JetStream message",
652 )))
653 }
654 }
655
656 /// Acknowledges a message delivery by sending the chosen [AckKind] to the server
657 /// and awaits for confirmation for the server that it received the message.
658 /// Useful if user wants to ensure `exactly once` semantics.
659 ///
660 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
661 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
662 ///
663 /// # Examples
664 ///
665 /// ```no_run
666 /// # #[tokio::main]
667 /// # async fn main() -> Result<(), async_nats::Error> {
668 /// use async_nats::jetstream::{AckKind, Message};
669 /// use futures_util::StreamExt;
670 /// let client = async_nats::connect("localhost:4222").await?;
671 /// let jetstream = async_nats::jetstream::new(client);
672 ///
673 /// let consumer = jetstream
674 /// .get_stream("events")
675 /// .await?
676 /// .get_consumer("pull")
677 /// .await?;
678 ///
679 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
680 ///
681 /// while let Some(message) = messages.next().await {
682 /// let (message, acker) = message.map(Message::split)?;
683 /// // Do something with the message. Ownership can be taken over `Message`.
684 /// // while retaining ability to ack later.
685 /// println!("message: {:?}", message);
686 /// // Ack it. `Message` may be dropped already.
687 /// acker.double_ack_with(AckKind::Ack).await?;
688 /// }
689 /// # Ok(())
690 /// # }
691 /// ```
692 pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
693 if let Some(ref reply) = self.reply {
694 let inbox = self.context.client.new_inbox();
695 let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
696 self.context
697 .client
698 .publish_with_reply(reply.to_owned(), inbox, ack_kind.into())
699 .await?;
700 match tokio::time::timeout(self.context.timeout, subscription.next())
701 .await
702 .map_err(|_| {
703 std::io::Error::new(
704 std::io::ErrorKind::TimedOut,
705 "double ack response timed out",
706 )
707 })? {
708 Some(_) => Ok(()),
709 None => Err(Box::new(std::io::Error::other("subscription dropped"))),
710 }
711 } else {
712 Err(Box::new(std::io::Error::other(
713 "No reply subject, not a JetStream message",
714 )))
715 }
716 }
717
718 /// Acknowledges a message delivery by sending `+ACK` to the server
719 /// and awaits for confirmation for the server that it received the message.
720 /// Useful if user wants to ensure `exactly once` semantics.
721 ///
722 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
723 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
724 ///
725 /// # Examples
726 ///
727 /// ```no_run
728 /// # #[tokio::main]
729 /// # async fn main() -> Result<(), async_nats::Error> {
730 /// use async_nats::jetstream::Message;
731 /// use futures_util::StreamExt;
732 /// let client = async_nats::connect("localhost:4222").await?;
733 /// let jetstream = async_nats::jetstream::new(client);
734 ///
735 /// let consumer = jetstream
736 /// .get_stream("events")
737 /// .await?
738 /// .get_consumer("pull")
739 /// .await?;
740 ///
741 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
742 ///
743 /// while let Some(message) = messages.next().await {
744 /// let (message, acker) = message.map(Message::split)?;
745 /// // Do something with the message. Ownership can be taken over `Message`.
746 /// // while retaining ability to ack later.
747 /// println!("message: {:?}", message);
748 /// // Ack it. `Message` may be dropped already.
749 /// acker.double_ack().await?;
750 /// }
751 /// # Ok(())
752 /// # }
753 /// ```
754 pub async fn double_ack(&self) -> Result<(), Error> {
755 self.double_ack_with(AckKind::Ack).await
756 }
757}
/// The kinds of response used for acknowledging a processed message.
///
/// Each variant is converted into its wire payload by the `From<AckKind> for Bytes`
/// implementation in this module.
#[derive(Debug, Clone, Copy)]
pub enum AckKind {
    /// Acknowledges a message was completely handled.
    /// Sent as `+ACK`.
    Ack,
    /// Signals that the message will not be processed now
    /// and processing can move onto the next message, NAK'd
    /// message will be retried.
    /// Sent as `-NAK`; when a duration is given it is appended as a JSON
    /// `delay` value expressed in nanoseconds.
    Nak(Option<Duration>),
    /// When sent before the AckWait period indicates that
    /// work is ongoing and the period should be extended by
    /// another equal to AckWait.
    /// Sent as `+WPI`.
    Progress,
    /// Acknowledges the message was handled and requests
    /// delivery of the next message to the reply subject.
    /// Only applies to Pull-mode.
    /// Sent as `+NXT`.
    Next,
    /// Instructs the server to stop redelivery of a message
    /// without acknowledging it as successfully processed.
    /// Sent as `+TERM`.
    Term,
}
779
780impl From<AckKind> for Bytes {
781 fn from(kind: AckKind) -> Self {
782 use AckKind::*;
783 match kind {
784 Ack => Bytes::from_static(b"+ACK"),
785 Nak(maybe_duration) => match maybe_duration {
786 None => Bytes::from_static(b"-NAK"),
787 Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
788 },
789 Progress => Bytes::from_static(b"+WPI"),
790 Next => Bytes::from_static(b"+NXT"),
791 Term => Bytes::from_static(b"+TERM"),
792 }
793 }
794}
795
/// Information about a received message.
///
/// All string fields borrow from the message's reply subject, from which
/// `Message::info` parses this structure.
#[derive(Debug, Clone)]
pub struct Info<'a> {
    /// Optional domain, present in servers post-ADR-15.
    /// A domain token of `_` is reported as `None`.
    pub domain: Option<&'a str>,
    /// Optional account hash, present in servers post-ADR-15.
    pub acc_hash: Option<&'a str>,
    /// The stream name.
    pub stream: &'a str,
    /// The consumer name.
    pub consumer: &'a str,
    /// The stream sequence number associated with this message.
    pub stream_sequence: u64,
    /// The consumer sequence number associated with this message.
    pub consumer_sequence: u64,
    /// The number of delivery attempts for this message.
    pub delivered: i64,
    /// The number of messages known by the server to be pending to this consumer.
    pub pending: u64,
    /// The time that this message was received by the server from its publisher.
    /// Parsed from a Unix timestamp in nanoseconds.
    pub published: time::OffsetDateTime,
    /// Optional token, present in servers post-ADR-15.
    pub token: Option<&'a str>,
}