async_nats/jetstream/
message.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::header::{IntoHeaderName, IntoHeaderValue};
17use crate::subject::ToSubject;
18use crate::{error, header, message, Error, HeaderValue};
19use crate::{subject::Subject, HeaderMap};
20use bytes::Bytes;
21use futures_util::future::TryFutureExt;
22use futures_util::StreamExt;
23use std::fmt::Display;
24use std::{mem, time::Duration};
25use time::format_description::well_known::Rfc3339;
26use time::OffsetDateTime;
27
28/// A message received directly from the stream, without leveraging a consumer.
29#[derive(Debug, Clone)]
30pub struct StreamMessage {
31    pub subject: Subject,
32    pub sequence: u64,
33    pub headers: HeaderMap,
34    pub payload: Bytes,
35    pub time: OffsetDateTime,
36}
37
38/// An outbound message to be published.
39/// Does not contain status or description which are valid only for inbound messages.
40pub struct OutboundMessage {
41    pub subject: Subject,
42    pub payload: Bytes,
43    pub headers: Option<HeaderMap>,
44}
45
46impl OutboundMessage {
47    pub fn new(subject: Subject, payload: Bytes, headers: Option<HeaderMap>) -> Self {
48        Self {
49            subject,
50            payload,
51            headers,
52        }
53    }
54}
55
56impl From<OutboundMessage> for message::OutboundMessage {
57    fn from(message: OutboundMessage) -> Self {
58        message::OutboundMessage {
59            subject: message.subject,
60            payload: message.payload,
61            headers: message.headers,
62            reply: None,
63        }
64    }
65}
66
67/// Used for building customized `publish` message.
68#[derive(Default, Clone, Debug)]
69pub struct PublishMessage {
70    pub(crate) payload: Bytes,
71    pub(crate) headers: Option<header::HeaderMap>,
72}
73impl PublishMessage {
74    /// Creates a new custom Publish struct to be used with.
75    pub fn build() -> Self {
76        Default::default()
77    }
78
79    /// Sets the payload for the message.
80    pub fn payload(mut self, payload: Bytes) -> Self {
81        self.payload = payload;
82        self
83    }
84    /// Adds headers to the message.
85    pub fn headers(mut self, headers: HeaderMap) -> Self {
86        self.headers = Some(headers);
87        self
88    }
89    /// A shorthand to add a single header.
90    pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
91        self.headers
92            .get_or_insert(header::HeaderMap::new())
93            .insert(name, value);
94        self
95    }
96    /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
97    pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
98        self.header(header::NATS_MESSAGE_ID, id.as_ref())
99    }
100    /// Sets expected last message ID.
101    /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
102    pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
103        self.header(
104            header::NATS_EXPECTED_LAST_MESSAGE_ID,
105            last_message_id.as_ref(),
106        )
107    }
108    /// Sets the last expected stream sequence.
109    /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
110    pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
111        self.header(
112            header::NATS_EXPECTED_LAST_SEQUENCE,
113            HeaderValue::from(last_sequence),
114        )
115    }
116    /// Sets the last expected stream sequence for a subject this message will be published to.
117    /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
118    pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
119        self.header(
120            header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
121            HeaderValue::from(subject_sequence),
122        )
123    }
124    /// Sets the expected stream name.
125    /// It sets the `Nats-Expected-Stream` header with provided value.
126    pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
127        self.header(
128            header::NATS_EXPECTED_STREAM,
129            HeaderValue::from(stream.as_ref()),
130        )
131    }
132
133    #[cfg(feature = "server_2_11")]
134    /// Sets TTL for a single message.
135    /// It sets the `Nats-TTL` header with provided value.
136    pub fn ttl(self, ttl: Duration) -> Self {
137        self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
138    }
139
140    /// Creates an [crate::jetstream::message::OutboundMessage] that can be sent using
141    /// [crate::jetstream::context::traits::Publisher::publish_message].
142    pub fn outbound_message<S: ToSubject>(self, subject: S) -> OutboundMessage {
143        OutboundMessage {
144            subject: subject.to_subject(),
145            payload: self.payload,
146            headers: self.headers,
147        }
148    }
149}
150
151#[derive(Clone, Debug)]
152pub struct Message {
153    pub message: crate::Message,
154    pub context: Context,
155}
156
157impl TryFrom<crate::Message> for StreamMessage {
158    type Error = StreamMessageError;
159
160    fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
161        let headers = message.headers.ok_or_else(|| {
162            StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
163        })?;
164
165        let sequence = headers
166            .get_last(header::NATS_SEQUENCE)
167            .ok_or_else(|| {
168                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
169            })
170            .and_then(|seq| {
171                seq.as_str().parse().map_err(|err| {
172                    StreamMessageError::with_source(
173                        StreamMessageErrorKind::ParseError,
174                        format!("could not parse sequence header: {err}"),
175                    )
176                })
177            })?;
178
179        let time = headers
180            .get_last(header::NATS_TIME_STAMP)
181            .ok_or_else(|| {
182                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
183            })
184            .and_then(|time| {
185                OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
186                    StreamMessageError::with_source(
187                        StreamMessageErrorKind::ParseError,
188                        format!("could not parse timestamp header: {err}"),
189                    )
190                })
191            })?;
192
193        let subject = headers
194            .get_last(header::NATS_SUBJECT)
195            .ok_or_else(|| {
196                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
197            })?
198            .as_str()
199            .into();
200
201        Ok(StreamMessage {
202            subject,
203            sequence,
204            headers,
205            payload: message.payload,
206            time,
207        })
208    }
209}
210
211#[derive(Debug, Clone, PartialEq)]
212pub enum StreamMessageErrorKind {
213    MissingHeader,
214    ParseError,
215}
216
217/// Error returned when library is unable to parse message got directly from the stream.
218pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
219
220impl Display for StreamMessageErrorKind {
221    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222        match self {
223            StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
224            StreamMessageErrorKind::ParseError => write!(f, "parse error"),
225        }
226    }
227}
228
229impl std::ops::Deref for Message {
230    type Target = crate::Message;
231
232    fn deref(&self) -> &Self::Target {
233        &self.message
234    }
235}
236
237impl From<Message> for crate::Message {
238    fn from(source: Message) -> crate::Message {
239        source.message
240    }
241}
242
243impl Message {
244    /// Splits [Message] into [Acker] and [crate::Message].
245    /// This can help reduce memory footprint if [Message] can be dropped before acking,
246    /// for example when it's transformed into another structure and acked later
247    pub fn split(mut self) -> (crate::Message, Acker) {
248        let reply = mem::take(&mut self.message.reply);
249        (
250            self.message,
251            Acker {
252                context: self.context,
253                reply,
254            },
255        )
256    }
257    /// Acknowledges a message delivery by sending `+ACK` to the server.
258    ///
259    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
260    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
261    ///
262    /// # Examples
263    ///
264    /// ```no_run
265    /// # #[tokio::main]
266    /// # async fn main() -> Result<(), async_nats::Error> {
267    /// use async_nats::jetstream::consumer::PullConsumer;
268    /// use futures_util::StreamExt;
269    /// let client = async_nats::connect("localhost:4222").await?;
270    /// let jetstream = async_nats::jetstream::new(client);
271    ///
272    /// let consumer: PullConsumer = jetstream
273    ///     .get_stream("events")
274    ///     .await?
275    ///     .get_consumer("pull")
276    ///     .await?;
277    ///
278    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
279    ///
280    /// while let Some(message) = messages.next().await {
281    ///     message?.ack().await?;
282    /// }
283    /// # Ok(())
284    /// # }
285    /// ```
286    pub async fn ack(&self) -> Result<(), Error> {
287        if let Some(ref reply) = self.reply {
288            self.context
289                .client
290                .publish(reply.clone(), "".into())
291                .map_err(Error::from)
292                .await
293        } else {
294            Err(Box::new(std::io::Error::other(
295                "No reply subject, not a JetStream message",
296            )))
297        }
298    }
299
300    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
301    ///
302    /// # Examples
303    ///
304    /// ```no_run
305    /// # #[tokio::main]
306    /// # async fn main() -> Result<(), async_nats::Error> {
307    /// use async_nats::jetstream::consumer::PullConsumer;
308    /// use async_nats::jetstream::AckKind;
309    /// use futures_util::StreamExt;
310    /// let client = async_nats::connect("localhost:4222").await?;
311    /// let jetstream = async_nats::jetstream::new(client);
312    ///
313    /// let consumer: PullConsumer = jetstream
314    ///     .get_stream("events")
315    ///     .await?
316    ///     .get_consumer("pull")
317    ///     .await?;
318    ///
319    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
320    ///
321    /// while let Some(message) = messages.next().await {
322    ///     message?.ack_with(AckKind::Nak(None)).await?;
323    /// }
324    /// # Ok(())
325    /// # }
326    /// ```
327    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
328        if let Some(ref reply) = self.reply {
329            self.context
330                .client
331                .publish(reply.to_owned(), kind.into())
332                .map_err(Error::from)
333                .await
334        } else {
335            Err(Box::new(std::io::Error::other(
336                "No reply subject, not a JetStream message",
337            )))
338        }
339    }
340
341    /// Acknowledges a message delivery by sending `+ACK` to the server
342    /// and awaits for confirmation for the server that it received the message.
343    /// Useful if user wants to ensure `exactly once` semantics.
344    ///
345    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
346    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
347    ///
348    /// # Examples
349    ///
350    /// ```no_run
351    /// # #[tokio::main]
352    /// # async fn main() -> Result<(), async_nats::Error> {
353    /// use futures_util::StreamExt;
354    /// let client = async_nats::connect("localhost:4222").await?;
355    /// let jetstream = async_nats::jetstream::new(client);
356    ///
357    /// let consumer = jetstream
358    ///     .get_stream("events")
359    ///     .await?
360    ///     .get_consumer("pull")
361    ///     .await?;
362    ///
363    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
364    ///
365    /// while let Some(message) = messages.next().await {
366    ///     message?.double_ack().await?;
367    /// }
368    /// # Ok(())
369    /// # }
370    /// ```
371    pub async fn double_ack(&self) -> Result<(), Error> {
372        if let Some(ref reply) = self.reply {
373            let inbox = self.context.client.new_inbox();
374            let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
375            self.context
376                .client
377                .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
378                .await?;
379            match tokio::time::timeout(self.context.timeout, subscription.next())
380                .await
381                .map_err(|_| {
382                    std::io::Error::new(
383                        std::io::ErrorKind::TimedOut,
384                        "double ack response timed out",
385                    )
386                })? {
387                Some(_) => Ok(()),
388                None => Err(Box::new(std::io::Error::other("subscription dropped"))),
389            }
390        } else {
391            Err(Box::new(std::io::Error::other(
392                "No reply subject, not a JetStream message",
393            )))
394        }
395    }
396
397    /// Returns the `JetStream` message ID
398    /// if this is a `JetStream` message.
399    #[allow(clippy::mixed_read_write_in_expression)]
400    pub fn info(&self) -> Result<Info<'_>, Error> {
401        const PREFIX: &str = "$JS.ACK.";
402        const SKIP: usize = PREFIX.len();
403
404        let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
405            std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
406        })?;
407
408        if !reply.starts_with(PREFIX) {
409            return Err(Box::new(std::io::Error::other(
410                "did not found proper prefix",
411            )));
412        }
413
414        reply = &reply[SKIP..];
415
416        let mut split = reply.split('.');
417
418        // we should avoid allocating to prevent
419        // large performance degradations in
420        // parsing this.
421        let mut tokens: [Option<&str>; 10] = [None; 10];
422        let mut n_tokens = 0;
423        for each_token in &mut tokens {
424            if let Some(token) = split.next() {
425                *each_token = Some(token);
426                n_tokens += 1;
427            }
428        }
429
430        let mut token_index = 0;
431
432        macro_rules! try_parse {
433            () => {
434                match str::parse(try_parse!(str)) {
435                    Ok(parsed) => parsed,
436                    Err(e) => {
437                        return Err(Box::new(e));
438                    }
439                }
440            };
441            (str) => {
442                if let Some(next) = tokens[token_index].take() {
443                    #[allow(unused)]
444                    {
445                        // this isn't actually unused, but it's
446                        // difficult for the compiler to infer this.
447                        token_index += 1;
448                    }
449                    next
450                } else {
451                    return Err(Box::new(std::io::Error::other("too few tokens")));
452                }
453            };
454        }
455
456        // now we can try to parse the tokens to
457        // individual types. We use an if-else
458        // chain instead of a match because it
459        // produces more optimal code usually,
460        // and we want to try the 9 (11 - the first 2)
461        // case first because we expect it to
462        // be the most common. We use >= to be
463        // future-proof.
464        if n_tokens >= 9 {
465            Ok(Info {
466                domain: {
467                    let domain: &str = try_parse!(str);
468                    if domain == "_" {
469                        None
470                    } else {
471                        Some(domain)
472                    }
473                },
474                acc_hash: Some(try_parse!(str)),
475                stream: try_parse!(str),
476                consumer: try_parse!(str),
477                delivered: try_parse!(),
478                stream_sequence: try_parse!(),
479                consumer_sequence: try_parse!(),
480                published: {
481                    let nanos: i128 = try_parse!();
482                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
483                },
484                pending: try_parse!(),
485                token: if n_tokens >= 9 {
486                    Some(try_parse!(str))
487                } else {
488                    None
489                },
490            })
491        } else if n_tokens == 7 {
492            // we expect this to be increasingly rare, as older
493            // servers are phased out.
494            Ok(Info {
495                domain: None,
496                acc_hash: None,
497                stream: try_parse!(str),
498                consumer: try_parse!(str),
499                delivered: try_parse!(),
500                stream_sequence: try_parse!(),
501                consumer_sequence: try_parse!(),
502                published: {
503                    let nanos: i128 = try_parse!();
504                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
505                },
506                pending: try_parse!(),
507                token: None,
508            })
509        } else {
510            Err(Box::new(std::io::Error::other("bad token number")))
511        }
512    }
513}
514
515/// A lightweight struct useful for decoupling message contents and the ability to ack it.
516pub struct Acker {
517    context: Context,
518    reply: Option<Subject>,
519}
520
521// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
522// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
523// Creating separate function to ack just to avoid one duplication is not worth it either.
524impl Acker {
525    pub fn new(context: Context, reply: Option<Subject>) -> Self {
526        Self { context, reply }
527    }
528    /// Acknowledges a message delivery by sending `+ACK` to the server.
529    ///
530    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
531    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
532    ///
533    /// # Examples
534    ///
535    /// ```no_run
536    /// # #[tokio::main]
537    /// # async fn main() -> Result<(), async_nats::Error> {
538    /// use async_nats::jetstream::consumer::PullConsumer;
539    /// use async_nats::jetstream::Message;
540    /// use futures_util::StreamExt;
541    /// let client = async_nats::connect("localhost:4222").await?;
542    /// let jetstream = async_nats::jetstream::new(client);
543    ///
544    /// let consumer: PullConsumer = jetstream
545    ///     .get_stream("events")
546    ///     .await?
547    ///     .get_consumer("pull")
548    ///     .await?;
549    ///
550    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
551    ///
552    /// while let Some(message) = messages.next().await {
553    ///     let (message, acker) = message.map(Message::split)?;
554    ///     // Do something with the message. Ownership can be taken over `Message`
555    ///     // while retaining ability to ack later.
556    ///     println!("message: {:?}", message);
557    ///     // Ack it. `Message` may be dropped already.
558    ///     acker.ack().await?;
559    /// }
560    /// # Ok(())
561    /// # }
562    /// ```
563    pub async fn ack(&self) -> Result<(), Error> {
564        if let Some(ref reply) = self.reply {
565            self.context
566                .client
567                .publish(reply.to_owned(), "".into())
568                .map_err(Error::from)
569                .await
570        } else {
571            Err(Box::new(std::io::Error::other(
572                "No reply subject, not a JetStream message",
573            )))
574        }
575    }
576
577    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
578    ///
579    /// # Examples
580    ///
581    /// ```no_run
582    /// # #[tokio::main]
583    /// # async fn main() -> Result<(), async_nats::Error> {
584    /// use async_nats::jetstream::consumer::PullConsumer;
585    /// use async_nats::jetstream::AckKind;
586    /// use async_nats::jetstream::Message;
587    /// use futures_util::StreamExt;
588    /// let client = async_nats::connect("localhost:4222").await?;
589    /// let jetstream = async_nats::jetstream::new(client);
590    ///
591    /// let consumer: PullConsumer = jetstream
592    ///     .get_stream("events")
593    ///     .await?
594    ///     .get_consumer("pull")
595    ///     .await?;
596    ///
597    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
598    ///
599    /// while let Some(message) = messages.next().await {
600    ///     let (message, acker) = message.map(Message::split)?;
601    ///     // Do something with the message. Ownership can be taken over `Message`.
602    ///     // while retaining ability to ack later.
603    ///     println!("message: {:?}", message);
604    ///     // Ack it. `Message` may be dropped already.
605    ///     acker.ack_with(AckKind::Nak(None)).await?;
606    /// }
607    /// # Ok(())
608    /// # }
609    /// ```
610    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
611        if let Some(ref reply) = self.reply {
612            self.context
613                .client
614                .publish(reply.to_owned(), kind.into())
615                .map_err(Error::from)
616                .await
617        } else {
618            Err(Box::new(std::io::Error::other(
619                "No reply subject, not a JetStream message",
620            )))
621        }
622    }
623
624    /// Acknowledges a message delivery by sending `+ACK` to the server
625    /// and awaits for confirmation for the server that it received the message.
626    /// Useful if user wants to ensure `exactly once` semantics.
627    ///
628    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
629    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
630    ///
631    /// # Examples
632    ///
633    /// ```no_run
634    /// # #[tokio::main]
635    /// # async fn main() -> Result<(), async_nats::Error> {
636    /// use async_nats::jetstream::Message;
637    /// use futures_util::StreamExt;
638    /// let client = async_nats::connect("localhost:4222").await?;
639    /// let jetstream = async_nats::jetstream::new(client);
640    ///
641    /// let consumer = jetstream
642    ///     .get_stream("events")
643    ///     .await?
644    ///     .get_consumer("pull")
645    ///     .await?;
646    ///
647    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
648    ///
649    /// while let Some(message) = messages.next().await {
650    ///     let (message, acker) = message.map(Message::split)?;
651    ///     // Do something with the message. Ownership can be taken over `Message`.
652    ///     // while retaining ability to ack later.
653    ///     println!("message: {:?}", message);
654    ///     // Ack it. `Message` may be dropped already.
655    ///     acker.double_ack().await?;
656    /// }
657    /// # Ok(())
658    /// # }
659    /// ```
660    pub async fn double_ack(&self) -> Result<(), Error> {
661        if let Some(ref reply) = self.reply {
662            let inbox = self.context.client.new_inbox();
663            let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
664            self.context
665                .client
666                .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
667                .await?;
668            match tokio::time::timeout(self.context.timeout, subscription.next())
669                .await
670                .map_err(|_| {
671                    std::io::Error::new(
672                        std::io::ErrorKind::TimedOut,
673                        "double ack response timed out",
674                    )
675                })? {
676                Some(_) => Ok(()),
677                None => Err(Box::new(std::io::Error::other("subscription dropped"))),
678            }
679        } else {
680            Err(Box::new(std::io::Error::other(
681                "No reply subject, not a JetStream message",
682            )))
683        }
684    }
685}
686/// The kinds of response used for acknowledging a processed message.
687#[derive(Debug, Clone, Copy)]
688pub enum AckKind {
689    /// Acknowledges a message was completely handled.
690    Ack,
691    /// Signals that the message will not be processed now
692    /// and processing can move onto the next message, NAK'd
693    /// message will be retried.
694    Nak(Option<Duration>),
695    /// When sent before the AckWait period indicates that
696    /// work is ongoing and the period should be extended by
697    /// another equal to AckWait.
698    Progress,
699    /// Acknowledges the message was handled and requests
700    /// delivery of the next message to the reply subject.
701    /// Only applies to Pull-mode.
702    Next,
703    /// Instructs the server to stop redelivery of a message
704    /// without acknowledging it as successfully processed.
705    Term,
706}
707
708impl From<AckKind> for Bytes {
709    fn from(kind: AckKind) -> Self {
710        use AckKind::*;
711        match kind {
712            Ack => Bytes::from_static(b"+ACK"),
713            Nak(maybe_duration) => match maybe_duration {
714                None => Bytes::from_static(b"-NAK"),
715                Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
716            },
717            Progress => Bytes::from_static(b"+WPI"),
718            Next => Bytes::from_static(b"+NXT"),
719            Term => Bytes::from_static(b"+TERM"),
720        }
721    }
722}
723
724/// Information about a received message
725#[derive(Debug, Clone)]
726pub struct Info<'a> {
727    /// Optional domain, present in servers post-ADR-15
728    pub domain: Option<&'a str>,
729    /// Optional account hash, present in servers post-ADR-15
730    pub acc_hash: Option<&'a str>,
731    /// The stream name
732    pub stream: &'a str,
733    /// The consumer name
734    pub consumer: &'a str,
735    /// The stream sequence number associated with this message
736    pub stream_sequence: u64,
737    /// The consumer sequence number associated with this message
738    pub consumer_sequence: u64,
739    /// The number of delivery attempts for this message
740    pub delivered: i64,
741    /// the number of messages known by the server to be pending to this consumer
742    pub pending: u64,
743    /// the time that this message was received by the server from its publisher
744    pub published: time::OffsetDateTime,
745    /// Optional token, present in servers post-ADR-15
746    pub token: Option<&'a str>,
747}