async_nats/jetstream/
message.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::header::{IntoHeaderName, IntoHeaderValue};
17use crate::subject::ToSubject;
18use crate::{error, header, message, Error, HeaderValue};
19use crate::{subject::Subject, HeaderMap};
20use bytes::Bytes;
21use futures_util::future::TryFutureExt;
22use futures_util::StreamExt;
23use std::fmt::Display;
24use std::{mem, time::Duration};
25use time::format_description::well_known::Rfc3339;
26use time::OffsetDateTime;
27
28/// A message received directly from the stream, without leveraging a consumer.
29#[derive(Debug, Clone)]
30pub struct StreamMessage {
31    pub subject: Subject,
32    pub sequence: u64,
33    pub headers: HeaderMap,
34    pub payload: Bytes,
35    pub time: OffsetDateTime,
36}
37
38/// An outbound message to be published.
39/// Does not contain status or description which are valid only for inbound messages.
40pub struct OutboundMessage {
41    pub subject: Subject,
42    pub payload: Bytes,
43    pub headers: Option<HeaderMap>,
44}
45
46impl OutboundMessage {
47    pub fn new(subject: Subject, payload: Bytes, headers: Option<HeaderMap>) -> Self {
48        Self {
49            subject,
50            payload,
51            headers,
52        }
53    }
54}
55
56impl From<OutboundMessage> for message::OutboundMessage {
57    fn from(message: OutboundMessage) -> Self {
58        message::OutboundMessage {
59            subject: message.subject,
60            payload: message.payload,
61            headers: message.headers,
62            reply: None,
63        }
64    }
65}
66
67/// Used for building customized `publish` message.
68#[derive(Default, Clone, Debug)]
69pub struct PublishMessage {
70    pub(crate) payload: Bytes,
71    pub(crate) headers: Option<header::HeaderMap>,
72}
73impl PublishMessage {
74    /// Creates a new custom Publish struct to be used with.
75    pub fn build() -> Self {
76        Default::default()
77    }
78
79    /// Sets the payload for the message.
80    pub fn payload(mut self, payload: Bytes) -> Self {
81        self.payload = payload;
82        self
83    }
84    /// Adds headers to the message.
85    pub fn headers(mut self, headers: HeaderMap) -> Self {
86        self.headers = Some(headers);
87        self
88    }
89    /// A shorthand to add a single header.
90    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
91        self.headers
92            .get_or_insert(header::HeaderMap::new())
93            .insert(name, value);
94        self
95    }
96    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
97    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
98        self.header(header::NATS_MESSAGE_ID, id.as_ref())
99    }
100    /// Sets expected last message ID.
101    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
102    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
103        self.header(
104            header::NATS_EXPECTED_LAST_MESSAGE_ID,
105            last_message_id.as_ref(),
106        )
107    }
108    /// Sets the last expected stream sequence.
109    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
110    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
111        self.header(
112            header::NATS_EXPECTED_LAST_SEQUENCE,
113            HeaderValue::from(last_sequence),
114        )
115    }
116    /// Sets the last expected stream sequence for a subject this message will be published to.
117    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
118    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
119        self.header(
120            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
121            HeaderValue::from(subject_sequence),
122        )
123    }
124    /// Sets the expected stream name.
125    /// It sets the `Nats-Expected-Stream` header with provided value.
126    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
127        self.header(
128            header::NATS_EXPECTED_STREAM,
129            HeaderValue::from(stream.as_ref()),
130        )
131    }
132
133    #[cfg(feature = "server_2_11")]
134    /// Sets TTL for a single message.
135    /// It sets the `Nats-TTL` header with provided value.
136    pub fn ttl(self, ttl: Duration) -> Self {
137        self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
138    }
139
140    /// Creates an [crate::jetstream::message::OutboundMessage] that can be sent using
141    /// [crate::jetstream::context::traits::Publisher::publish_message].
142    pub fn outbound_message<S: ToSubject>(self, subject: S) -> OutboundMessage {
143        OutboundMessage {
144            subject: subject.to_subject(),
145            payload: self.payload,
146            headers: self.headers,
147        }
148    }
149}
150
151#[derive(Clone, Debug)]
152pub struct Message {
153    pub message: crate::Message,
154    pub context: Context,
155}
156
157impl TryFrom<crate::Message> for StreamMessage {
158    type Error = StreamMessageError;
159
160    fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
161        let headers = message.headers.ok_or_else(|| {
162            StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
163        })?;
164
165        let sequence = headers
166            .get_last(header::NATS_SEQUENCE)
167            .ok_or_else(|| {
168                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
169            })
170            .and_then(|seq| {
171                seq.as_str().parse().map_err(|err| {
172                    StreamMessageError::with_source(
173                        StreamMessageErrorKind::ParseError,
174                        format!("could not parse sequence header: {err}"),
175                    )
176                })
177            })?;
178
179        let time = headers
180            .get_last(header::NATS_TIME_STAMP)
181            .ok_or_else(|| {
182                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
183            })
184            .and_then(|time| {
185                OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
186                    StreamMessageError::with_source(
187                        StreamMessageErrorKind::ParseError,
188                        format!("could not parse timestamp header: {err}"),
189                    )
190                })
191            })?;
192
193        let subject = headers
194            .get_last(header::NATS_SUBJECT)
195            .ok_or_else(|| {
196                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
197            })?
198            .as_str()
199            .into();
200
201        Ok(StreamMessage {
202            subject,
203            sequence,
204            headers,
205            payload: message.payload,
206            time,
207        })
208    }
209}
210
211#[derive(Debug, Clone, PartialEq)]
212pub enum StreamMessageErrorKind {
213    MissingHeader,
214    ParseError,
215}
216
217/// Error returned when library is unable to parse message got directly from the stream.
218pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
219
220impl Display for StreamMessageErrorKind {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        match self {
223            StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
224            StreamMessageErrorKind::ParseError => write!(f, "parse error"),
225        }
226    }
227}
228
229impl std::ops::Deref for Message {
230    type Target = crate::Message;
231
232    fn deref(&self) -> &Self::Target {
233        &self.message
234    }
235}
236
237impl From<Message> for crate::Message {
238    fn from(source: Message) -> crate::Message {
239        source.message
240    }
241}
242
243impl Message {
244    /// Splits [Message] into [Acker] and [crate::Message].
245    /// This can help reduce memory footprint if [Message] can be dropped before acking,
246    /// for example when it's transformed into another structure and acked later
247    pub fn split(mut self) -> (crate::Message, Acker) {
248        let reply = mem::take(&mut self.message.reply);
249        (
250            self.message,
251            Acker {
252                context: self.context,
253                reply,
254            },
255        )
256    }
257    /// Acknowledges a message delivery by sending `+ACK` to the server.
258    ///
259    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
260    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
261    ///
262    /// # Examples
263    ///
264    /// ```no_run
265    /// # #[tokio::main]
266    /// # async fn main() -> Result<(), async_nats::Error> {
267    /// use async_nats::jetstream::consumer::PullConsumer;
268    /// use futures_util::StreamExt;
269    /// let client = async_nats::connect("localhost:4222").await?;
270    /// let jetstream = async_nats::jetstream::new(client);
271    ///
272    /// let consumer: PullConsumer = jetstream
273    ///     .get_stream("events")
274    ///     .await?
275    ///     .get_consumer("pull")
276    ///     .await?;
277    ///
278    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
279    ///
280    /// while let Some(message) = messages.next().await {
281    ///     message?.ack().await?;
282    /// }
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub async fn ack(&self) -> Result<(), Error> {
287        if let Some(ref reply) = self.reply {
288            self.context
289                .client
290                .publish(reply.clone(), "".into())
291                .map_err(Error::from)
292                .await
293        } else {
294            Err(Box::new(std::io::Error::other(
295                "No reply subject, not a JetStream message",
296            )))
297        }
298    }
299
300    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
301    ///
302    /// # Examples
303    ///
304    /// ```no_run
305    /// # #[tokio::main]
306    /// # async fn main() -> Result<(), async_nats::Error> {
307    /// use async_nats::jetstream::consumer::PullConsumer;
308    /// use async_nats::jetstream::AckKind;
309    /// use futures_util::StreamExt;
310    /// let client = async_nats::connect("localhost:4222").await?;
311    /// let jetstream = async_nats::jetstream::new(client);
312    ///
313    /// let consumer: PullConsumer = jetstream
314    ///     .get_stream("events")
315    ///     .await?
316    ///     .get_consumer("pull")
317    ///     .await?;
318    ///
319    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
320    ///
321    /// while let Some(message) = messages.next().await {
322    ///     message?.ack_with(AckKind::Nak(None)).await?;
323    /// }
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
328        if let Some(ref reply) = self.reply {
329            self.context
330                .client
331                .publish(reply.to_owned(), kind.into())
332                .map_err(Error::from)
333                .await
334        } else {
335            Err(Box::new(std::io::Error::other(
336                "No reply subject, not a JetStream message",
337            )))
338        }
339    }
340
341    /// Acknowledges a message delivery by sending a chosen [AckKind] to the server
342    /// and awaits for confirmation for the server that it received the message.
343    /// Useful if user wants to ensure `exactly once` semantics.
344    ///
345    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
346    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
347    ///
348    /// # Examples
349    ///
350    /// ```no_run
351    /// # #[tokio::main]
352    /// # async fn main() -> Result<(), async_nats::Error> {
353    /// use async_nats::jetstream::AckKind;
354    /// use futures_util::StreamExt;
355    /// let client = async_nats::connect("localhost:4222").await?;
356    /// let jetstream = async_nats::jetstream::new(client);
357    ///
358    /// let consumer = jetstream
359    ///     .get_stream("events")
360    ///     .await?
361    ///     .get_consumer("pull")
362    ///     .await?;
363    ///
364    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
365    ///
366    /// while let Some(message) = messages.next().await {
367    ///     message?.double_ack_with(AckKind::Ack).await?;
368    /// }
369    /// # Ok(())
370    /// # }
371    /// ```
372    pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
373        if let Some(ref reply) = self.reply {
374            let inbox = self.context.client.new_inbox();
375            let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
376            self.context
377                .client
378                .publish_with_reply(reply.clone(), inbox, ack_kind.into())
379                .await?;
380            match tokio::time::timeout(self.context.timeout, subscription.next())
381                .await
382                .map_err(|_| {
383                    std::io::Error::new(
384                        std::io::ErrorKind::TimedOut,
385                        "double ack response timed out",
386                    )
387                })? {
388                Some(_) => Ok(()),
389                None => Err(Box::new(std::io::Error::other("subscription dropped"))),
390            }
391        } else {
392            Err(Box::new(std::io::Error::other(
393                "No reply subject, not a JetStream message",
394            )))
395        }
396    }
397
398    /// Acknowledges a message delivery by sending `+ACK` to the server
399    /// and awaits for confirmation for the server that it received the message.
400    /// Useful if user wants to ensure `exactly once` semantics.
401    ///
402    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
403    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
404    ///
405    /// # Examples
406    ///
407    /// ```no_run
408    /// # #[tokio::main]
409    /// # async fn main() -> Result<(), async_nats::Error> {
410    /// use futures_util::StreamExt;
411    /// let client = async_nats::connect("localhost:4222").await?;
412    /// let jetstream = async_nats::jetstream::new(client);
413    ///
414    /// let consumer = jetstream
415    ///     .get_stream("events")
416    ///     .await?
417    ///     .get_consumer("pull")
418    ///     .await?;
419    ///
420    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
421    ///
422    /// while let Some(message) = messages.next().await {
423    ///     message?.double_ack().await?;
424    /// }
425    /// # Ok(())
426    /// # }
427    /// ```
428    pub async fn double_ack(&self) -> Result<(), Error> {
429        self.double_ack_with(AckKind::Ack).await
430    }
431
432    /// Returns the `JetStream` message ID
433    /// if this is a `JetStream` message.
434    #[allow(clippy::mixed_read_write_in_expression)]
435    pub fn info(&self) -> Result<Info<'_>, Error> {
436        const PREFIX: &str = "$JS.ACK.";
437        const SKIP: usize = PREFIX.len();
438
439        let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
440            std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
441        })?;
442
443        if !reply.starts_with(PREFIX) {
444            return Err(Box::new(std::io::Error::other(
445                "did not found proper prefix",
446            )));
447        }
448
449        reply = &reply[SKIP..];
450
451        let mut split = reply.split('.');
452
453        // we should avoid allocating to prevent
454        // large performance degradations in
455        // parsing this.
456        let mut tokens: [Option<&str>; 10] = [None; 10];
457        let mut n_tokens = 0;
458        for each_token in &mut tokens {
459            if let Some(token) = split.next() {
460                *each_token = Some(token);
461                n_tokens += 1;
462            }
463        }
464
465        let mut token_index = 0;
466
467        macro_rules! try_parse {
468            () => {
469                match str::parse(try_parse!(str)) {
470                    Ok(parsed) => parsed,
471                    Err(e) => {
472                        return Err(Box::new(e));
473                    }
474                }
475            };
476            (str) => {
477                if let Some(next) = tokens[token_index].take() {
478                    #[allow(unused)]
479                    {
480                        // this isn't actually unused, but it's
481                        // difficult for the compiler to infer this.
482                        token_index += 1;
483                    }
484                    next
485                } else {
486                    return Err(Box::new(std::io::Error::other("too few tokens")));
487                }
488            };
489        }
490
491        // now we can try to parse the tokens to
492        // individual types. We use an if-else
493        // chain instead of a match because it
494        // produces more optimal code usually,
495        // and we want to try the 9 (11 - the first 2)
496        // case first because we expect it to
497        // be the most common. We use >= to be
498        // future-proof.
499        if n_tokens >= 9 {
500            Ok(Info {
501                domain: {
502                    let domain: &str = try_parse!(str);
503                    if domain == "_" {
504                        None
505                    } else {
506                        Some(domain)
507                    }
508                },
509                acc_hash: Some(try_parse!(str)),
510                stream: try_parse!(str),
511                consumer: try_parse!(str),
512                delivered: try_parse!(),
513                stream_sequence: try_parse!(),
514                consumer_sequence: try_parse!(),
515                published: {
516                    let nanos: i128 = try_parse!();
517                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
518                },
519                pending: try_parse!(),
520                token: if n_tokens >= 9 {
521                    Some(try_parse!(str))
522                } else {
523                    None
524                },
525            })
526        } else if n_tokens == 7 {
527            // we expect this to be increasingly rare, as older
528            // servers are phased out.
529            Ok(Info {
530                domain: None,
531                acc_hash: None,
532                stream: try_parse!(str),
533                consumer: try_parse!(str),
534                delivered: try_parse!(),
535                stream_sequence: try_parse!(),
536                consumer_sequence: try_parse!(),
537                published: {
538                    let nanos: i128 = try_parse!();
539                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
540                },
541                pending: try_parse!(),
542                token: None,
543            })
544        } else {
545            Err(Box::new(std::io::Error::other("bad token number")))
546        }
547    }
548}
549
550/// A lightweight struct useful for decoupling message contents and the ability to ack it.
551pub struct Acker {
552    context: Context,
553    reply: Option<Subject>,
554}
555
556// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
557// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
558// Creating separate function to ack just to avoid one duplication is not worth it either.
559impl Acker {
560    pub fn new(context: Context, reply: Option<Subject>) -> Self {
561        Self { context, reply }
562    }
563    /// Acknowledges a message delivery by sending `+ACK` to the server.
564    ///
565    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
566    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
567    ///
568    /// # Examples
569    ///
570    /// ```no_run
571    /// # #[tokio::main]
572    /// # async fn main() -> Result<(), async_nats::Error> {
573    /// use async_nats::jetstream::{consumer::PullConsumer, Message};
574    /// use futures_util::StreamExt;
575    /// let client = async_nats::connect("localhost:4222").await?;
576    /// let jetstream = async_nats::jetstream::new(client);
577    ///
578    /// let consumer: PullConsumer = jetstream
579    ///     .get_stream("events")
580    ///     .await?
581    ///     .get_consumer("pull")
582    ///     .await?;
583    ///
584    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
585    ///
586    /// while let Some(message) = messages.next().await {
587    ///     let (message, acker) = message.map(Message::split)?;
588    ///     // Do something with the message. Ownership can be taken over `Message`
589    ///     // while retaining ability to ack later.
590    ///     println!("message: {:?}", message);
591    ///     // Ack it. `Message` may be dropped already.
592    ///     acker.ack().await?;
593    /// }
594    /// # Ok(())
595    /// # }
596    /// ```
597    pub async fn ack(&self) -> Result<(), Error> {
598        if let Some(ref reply) = self.reply {
599            self.context
600                .client
601                .publish(reply.to_owned(), "".into())
602                .map_err(Error::from)
603                .await
604        } else {
605            Err(Box::new(std::io::Error::other(
606                "No reply subject, not a JetStream message",
607            )))
608        }
609    }
610
611    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
612    ///
613    /// # Examples
614    ///
615    /// ```no_run
616    /// # #[tokio::main]
617    /// # async fn main() -> Result<(), async_nats::Error> {
618    /// use async_nats::jetstream::{consumer::PullConsumer, AckKind, Message};
619    /// use futures_util::StreamExt;
620    /// let client = async_nats::connect("localhost:4222").await?;
621    /// let jetstream = async_nats::jetstream::new(client);
622    ///
623    /// let consumer: PullConsumer = jetstream
624    ///     .get_stream("events")
625    ///     .await?
626    ///     .get_consumer("pull")
627    ///     .await?;
628    ///
629    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
630    ///
631    /// while let Some(message) = messages.next().await {
632    ///     let (message, acker) = message.map(Message::split)?;
633    ///     // Do something with the message. Ownership can be taken over `Message`.
634    ///     // while retaining ability to ack later.
635    ///     println!("message: {:?}", message);
636    ///     // Ack it. `Message` may be dropped already.
637    ///     acker.ack_with(AckKind::Nak(None)).await?;
638    /// }
639    /// # Ok(())
640    /// # }
641    /// ```
642    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
643        if let Some(ref reply) = self.reply {
644            self.context
645                .client
646                .publish(reply.to_owned(), kind.into())
647                .map_err(Error::from)
648                .await
649        } else {
650            Err(Box::new(std::io::Error::other(
651                "No reply subject, not a JetStream message",
652            )))
653        }
654    }
655
656    /// Acknowledges a message delivery by sending the chosen [AckKind] to the server
657    /// and awaits for confirmation for the server that it received the message.
658    /// Useful if user wants to ensure `exactly once` semantics.
659    ///
660    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
661    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
662    ///
663    /// # Examples
664    ///
665    /// ```no_run
666    /// # #[tokio::main]
667    /// # async fn main() -> Result<(), async_nats::Error> {
668    /// use async_nats::jetstream::{AckKind, Message};
669    /// use futures_util::StreamExt;
670    /// let client = async_nats::connect("localhost:4222").await?;
671    /// let jetstream = async_nats::jetstream::new(client);
672    ///
673    /// let consumer = jetstream
674    ///     .get_stream("events")
675    ///     .await?
676    ///     .get_consumer("pull")
677    ///     .await?;
678    ///
679    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
680    ///
681    /// while let Some(message) = messages.next().await {
682    ///     let (message, acker) = message.map(Message::split)?;
683    ///     // Do something with the message. Ownership can be taken over `Message`.
684    ///     // while retaining ability to ack later.
685    ///     println!("message: {:?}", message);
686    ///     // Ack it. `Message` may be dropped already.
687    ///     acker.double_ack_with(AckKind::Ack).await?;
688    /// }
689    /// # Ok(())
690    /// # }
691    /// ```
692    pub async fn double_ack_with(&self, ack_kind: AckKind) -> Result<(), Error> {
693        if let Some(ref reply) = self.reply {
694            let inbox = self.context.client.new_inbox();
695            let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
696            self.context
697                .client
698                .publish_with_reply(reply.to_owned(), inbox, ack_kind.into())
699                .await?;
700            match tokio::time::timeout(self.context.timeout, subscription.next())
701                .await
702                .map_err(|_| {
703                    std::io::Error::new(
704                        std::io::ErrorKind::TimedOut,
705                        "double ack response timed out",
706                    )
707                })? {
708                Some(_) => Ok(()),
709                None => Err(Box::new(std::io::Error::other("subscription dropped"))),
710            }
711        } else {
712            Err(Box::new(std::io::Error::other(
713                "No reply subject, not a JetStream message",
714            )))
715        }
716    }
717
718    /// Acknowledges a message delivery by sending `+ACK` to the server
719    /// and awaits for confirmation for the server that it received the message.
720    /// Useful if user wants to ensure `exactly once` semantics.
721    ///
722    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
723    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
724    ///
725    /// # Examples
726    ///
727    /// ```no_run
728    /// # #[tokio::main]
729    /// # async fn main() -> Result<(), async_nats::Error> {
730    /// use async_nats::jetstream::Message;
731    /// use futures_util::StreamExt;
732    /// let client = async_nats::connect("localhost:4222").await?;
733    /// let jetstream = async_nats::jetstream::new(client);
734    ///
735    /// let consumer = jetstream
736    ///     .get_stream("events")
737    ///     .await?
738    ///     .get_consumer("pull")
739    ///     .await?;
740    ///
741    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
742    ///
743    /// while let Some(message) = messages.next().await {
744    ///     let (message, acker) = message.map(Message::split)?;
745    ///     // Do something with the message. Ownership can be taken over `Message`.
746    ///     // while retaining ability to ack later.
747    ///     println!("message: {:?}", message);
748    ///     // Ack it. `Message` may be dropped already.
749    ///     acker.double_ack().await?;
750    /// }
751    /// # Ok(())
752    /// # }
753    /// ```
754    pub async fn double_ack(&self) -> Result<(), Error> {
755        self.double_ack_with(AckKind::Ack).await
756    }
757}
758/// The kinds of response used for acknowledging a processed message.
759#[derive(Debug, Clone, Copy)]
760pub enum AckKind {
761    /// Acknowledges a message was completely handled.
762    Ack,
763    /// Signals that the message will not be processed now
764    /// and processing can move onto the next message, NAK'd
765    /// message will be retried.
766    Nak(Option<Duration>),
767    /// When sent before the AckWait period indicates that
768    /// work is ongoing and the period should be extended by
769    /// another equal to AckWait.
770    Progress,
771    /// Acknowledges the message was handled and requests
772    /// delivery of the next message to the reply subject.
773    /// Only applies to Pull-mode.
774    Next,
775    /// Instructs the server to stop redelivery of a message
776    /// without acknowledging it as successfully processed.
777    Term,
778}
779
780impl From<AckKind> for Bytes {
781    fn from(kind: AckKind) -> Self {
782        use AckKind::*;
783        match kind {
784            Ack => Bytes::from_static(b"+ACK"),
785            Nak(maybe_duration) => match maybe_duration {
786                None => Bytes::from_static(b"-NAK"),
787                Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
788            },
789            Progress => Bytes::from_static(b"+WPI"),
790            Next => Bytes::from_static(b"+NXT"),
791            Term => Bytes::from_static(b"+TERM"),
792        }
793    }
794}
795
796/// Information about a received message
797#[derive(Debug, Clone)]
798pub struct Info<'a> {
799    /// Optional domain, present in servers post-ADR-15
800    pub domain: Option<&'a str>,
801    /// Optional account hash, present in servers post-ADR-15
802    pub acc_hash: Option<&'a str>,
803    /// The stream name
804    pub stream: &'a str,
805    /// The consumer name
806    pub consumer: &'a str,
807    /// The stream sequence number associated with this message
808    pub stream_sequence: u64,
809    /// The consumer sequence number associated with this message
810    pub consumer_sequence: u64,
811    /// The number of delivery attempts for this message
812    pub delivered: i64,
813    /// the number of messages known by the server to be pending to this consumer
814    pub pending: u64,
815    /// the time that this message was received by the server from its publisher
816    pub published: time::OffsetDateTime,
817    /// Optional token, present in servers post-ADR-15
818    pub token: Option<&'a str>,
819}