async_nats/jetstream/
message.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::{error, header, Error};
17use crate::{subject::Subject, HeaderMap};
18use bytes::Bytes;
19use futures::future::TryFutureExt;
20use futures::StreamExt;
21use std::fmt::Display;
22use std::{mem, time::Duration};
23use time::format_description::well_known::Rfc3339;
24use time::OffsetDateTime;
25
26/// A message received directly from the stream, without leveraging a consumer.
27#[derive(Debug, Clone)]
28pub struct StreamMessage {
29    pub subject: Subject,
30    pub sequence: u64,
31    pub headers: HeaderMap,
32    pub payload: Bytes,
33    pub time: OffsetDateTime,
34}
35
36#[derive(Clone, Debug)]
37pub struct Message {
38    pub message: crate::Message,
39    pub context: Context,
40}
41
42impl TryFrom<crate::Message> for StreamMessage {
43    type Error = StreamMessageError;
44
45    fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
46        let headers = message.headers.ok_or_else(|| {
47            StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
48        })?;
49
50        let sequence = headers
51            .get_last(header::NATS_SEQUENCE)
52            .ok_or_else(|| {
53                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
54            })
55            .and_then(|seq| {
56                seq.as_str().parse().map_err(|err| {
57                    StreamMessageError::with_source(
58                        StreamMessageErrorKind::ParseError,
59                        format!("could not parse sequence header: {}", err),
60                    )
61                })
62            })?;
63
64        let time = headers
65            .get_last(header::NATS_TIME_STAMP)
66            .ok_or_else(|| {
67                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
68            })
69            .and_then(|time| {
70                OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
71                    StreamMessageError::with_source(
72                        StreamMessageErrorKind::ParseError,
73                        format!("could not parse timestamp header: {}", err),
74                    )
75                })
76            })?;
77
78        let subject = headers
79            .get_last(header::NATS_SUBJECT)
80            .ok_or_else(|| {
81                StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
82            })?
83            .as_str()
84            .into();
85
86        Ok(StreamMessage {
87            subject,
88            sequence,
89            headers,
90            payload: message.payload,
91            time,
92        })
93    }
94}
95
96#[derive(Debug, Clone, PartialEq)]
97pub enum StreamMessageErrorKind {
98    MissingHeader,
99    ParseError,
100}
101
102/// Error returned when library is unable to parse message got directly from the stream.
103pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
104
105impl Display for StreamMessageErrorKind {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        match self {
108            StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
109            StreamMessageErrorKind::ParseError => write!(f, "parse error"),
110        }
111    }
112}
113
114impl std::ops::Deref for Message {
115    type Target = crate::Message;
116
117    fn deref(&self) -> &Self::Target {
118        &self.message
119    }
120}
121
122impl From<Message> for crate::Message {
123    fn from(source: Message) -> crate::Message {
124        source.message
125    }
126}
127
128impl Message {
129    /// Splits [Message] into [Acker] and [crate::Message].
130    /// This can help reduce memory footprint if [Message] can be dropped before acking,
131    /// for example when it's transformed into another structure and acked later
132    pub fn split(mut self) -> (crate::Message, Acker) {
133        let reply = mem::take(&mut self.message.reply);
134        (
135            self.message,
136            Acker {
137                context: self.context,
138                reply,
139            },
140        )
141    }
142    /// Acknowledges a message delivery by sending `+ACK` to the server.
143    ///
144    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
145    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
146    ///
147    /// # Examples
148    ///
149    /// ```no_run
150    /// # #[tokio::main]
151    /// # async fn main() -> Result<(), async_nats::Error> {
152    /// use async_nats::jetstream::consumer::PullConsumer;
153    /// use futures::StreamExt;
154    /// let client = async_nats::connect("localhost:4222").await?;
155    /// let jetstream = async_nats::jetstream::new(client);
156    ///
157    /// let consumer: PullConsumer = jetstream
158    ///     .get_stream("events")
159    ///     .await?
160    ///     .get_consumer("pull")
161    ///     .await?;
162    ///
163    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
164    ///
165    /// while let Some(message) = messages.next().await {
166    ///     message?.ack().await?;
167    /// }
168    /// # Ok(())
169    /// # }
170    /// ```
171    pub async fn ack(&self) -> Result<(), Error> {
172        if let Some(ref reply) = self.reply {
173            self.context
174                .client
175                .publish(reply.clone(), "".into())
176                .map_err(Error::from)
177                .await
178        } else {
179            Err(Box::new(std::io::Error::other(
180                "No reply subject, not a JetStream message",
181            )))
182        }
183    }
184
185    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
186    ///
187    /// # Examples
188    ///
189    /// ```no_run
190    /// # #[tokio::main]
191    /// # async fn main() -> Result<(), async_nats::Error> {
192    /// use async_nats::jetstream::consumer::PullConsumer;
193    /// use async_nats::jetstream::AckKind;
194    /// use futures::StreamExt;
195    /// let client = async_nats::connect("localhost:4222").await?;
196    /// let jetstream = async_nats::jetstream::new(client);
197    ///
198    /// let consumer: PullConsumer = jetstream
199    ///     .get_stream("events")
200    ///     .await?
201    ///     .get_consumer("pull")
202    ///     .await?;
203    ///
204    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
205    ///
206    /// while let Some(message) = messages.next().await {
207    ///     message?.ack_with(AckKind::Nak(None)).await?;
208    /// }
209    /// # Ok(())
210    /// # }
211    /// ```
212    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
213        if let Some(ref reply) = self.reply {
214            self.context
215                .client
216                .publish(reply.to_owned(), kind.into())
217                .map_err(Error::from)
218                .await
219        } else {
220            Err(Box::new(std::io::Error::other(
221                "No reply subject, not a JetStream message",
222            )))
223        }
224    }
225
226    /// Acknowledges a message delivery by sending `+ACK` to the server
227    /// and awaits for confirmation for the server that it received the message.
228    /// Useful if user wants to ensure `exactly once` semantics.
229    ///
230    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
231    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
232    ///
233    /// # Examples
234    ///
235    /// ```no_run
236    /// # #[tokio::main]
237    /// # async fn main() -> Result<(), async_nats::Error> {
238    /// use futures::StreamExt;
239    /// let client = async_nats::connect("localhost:4222").await?;
240    /// let jetstream = async_nats::jetstream::new(client);
241    ///
242    /// let consumer = jetstream
243    ///     .get_stream("events")
244    ///     .await?
245    ///     .get_consumer("pull")
246    ///     .await?;
247    ///
248    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
249    ///
250    /// while let Some(message) = messages.next().await {
251    ///     message?.double_ack().await?;
252    /// }
253    /// # Ok(())
254    /// # }
255    /// ```
256    pub async fn double_ack(&self) -> Result<(), Error> {
257        if let Some(ref reply) = self.reply {
258            let inbox = self.context.client.new_inbox();
259            let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
260            self.context
261                .client
262                .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
263                .await?;
264            match tokio::time::timeout(self.context.timeout, subscription.next())
265                .await
266                .map_err(|_| {
267                    std::io::Error::new(
268                        std::io::ErrorKind::TimedOut,
269                        "double ack response timed out",
270                    )
271                })? {
272                Some(_) => Ok(()),
273                None => Err(Box::new(std::io::Error::other("subscription dropped"))),
274            }
275        } else {
276            Err(Box::new(std::io::Error::other(
277                "No reply subject, not a JetStream message",
278            )))
279        }
280    }
281
282    /// Returns the `JetStream` message ID
283    /// if this is a `JetStream` message.
284    #[allow(clippy::mixed_read_write_in_expression)]
285    pub fn info(&self) -> Result<Info<'_>, Error> {
286        const PREFIX: &str = "$JS.ACK.";
287        const SKIP: usize = PREFIX.len();
288
289        let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
290            std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
291        })?;
292
293        if !reply.starts_with(PREFIX) {
294            return Err(Box::new(std::io::Error::other(
295                "did not found proper prefix",
296            )));
297        }
298
299        reply = &reply[SKIP..];
300
301        let mut split = reply.split('.');
302
303        // we should avoid allocating to prevent
304        // large performance degradations in
305        // parsing this.
306        let mut tokens: [Option<&str>; 10] = [None; 10];
307        let mut n_tokens = 0;
308        for each_token in &mut tokens {
309            if let Some(token) = split.next() {
310                *each_token = Some(token);
311                n_tokens += 1;
312            }
313        }
314
315        let mut token_index = 0;
316
317        macro_rules! try_parse {
318            () => {
319                match str::parse(try_parse!(str)) {
320                    Ok(parsed) => parsed,
321                    Err(e) => {
322                        return Err(Box::new(e));
323                    }
324                }
325            };
326            (str) => {
327                if let Some(next) = tokens[token_index].take() {
328                    #[allow(unused)]
329                    {
330                        // this isn't actually unused, but it's
331                        // difficult for the compiler to infer this.
332                        token_index += 1;
333                    }
334                    next
335                } else {
336                    return Err(Box::new(std::io::Error::other("too few tokens")));
337                }
338            };
339        }
340
341        // now we can try to parse the tokens to
342        // individual types. We use an if-else
343        // chain instead of a match because it
344        // produces more optimal code usually,
345        // and we want to try the 9 (11 - the first 2)
346        // case first because we expect it to
347        // be the most common. We use >= to be
348        // future-proof.
349        if n_tokens >= 9 {
350            Ok(Info {
351                domain: {
352                    let domain: &str = try_parse!(str);
353                    if domain == "_" {
354                        None
355                    } else {
356                        Some(domain)
357                    }
358                },
359                acc_hash: Some(try_parse!(str)),
360                stream: try_parse!(str),
361                consumer: try_parse!(str),
362                delivered: try_parse!(),
363                stream_sequence: try_parse!(),
364                consumer_sequence: try_parse!(),
365                published: {
366                    let nanos: i128 = try_parse!();
367                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
368                },
369                pending: try_parse!(),
370                token: if n_tokens >= 9 {
371                    Some(try_parse!(str))
372                } else {
373                    None
374                },
375            })
376        } else if n_tokens == 7 {
377            // we expect this to be increasingly rare, as older
378            // servers are phased out.
379            Ok(Info {
380                domain: None,
381                acc_hash: None,
382                stream: try_parse!(str),
383                consumer: try_parse!(str),
384                delivered: try_parse!(),
385                stream_sequence: try_parse!(),
386                consumer_sequence: try_parse!(),
387                published: {
388                    let nanos: i128 = try_parse!();
389                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
390                },
391                pending: try_parse!(),
392                token: None,
393            })
394        } else {
395            Err(Box::new(std::io::Error::other("bad token number")))
396        }
397    }
398}
399
400/// A lightweight struct useful for decoupling message contents and the ability to ack it.
401pub struct Acker {
402    context: Context,
403    reply: Option<Subject>,
404}
405
406// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
407// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
408// Creating separate function to ack just to avoid one duplication is not worth it either.
409impl Acker {
410    pub fn new(context: Context, reply: Option<Subject>) -> Self {
411        Self { context, reply }
412    }
413    /// Acknowledges a message delivery by sending `+ACK` to the server.
414    ///
415    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
416    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
417    ///
418    /// # Examples
419    ///
420    /// ```no_run
421    /// # #[tokio::main]
422    /// # async fn main() -> Result<(), async_nats::Error> {
423    /// use async_nats::jetstream::consumer::PullConsumer;
424    /// use async_nats::jetstream::Message;
425    /// use futures::StreamExt;
426    /// let client = async_nats::connect("localhost:4222").await?;
427    /// let jetstream = async_nats::jetstream::new(client);
428    ///
429    /// let consumer: PullConsumer = jetstream
430    ///     .get_stream("events")
431    ///     .await?
432    ///     .get_consumer("pull")
433    ///     .await?;
434    ///
435    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
436    ///
437    /// while let Some(message) = messages.next().await {
438    ///     let (message, acker) = message.map(Message::split)?;
439    ///     // Do something with the message. Ownership can be taken over `Message`
440    ///     // while retaining ability to ack later.
441    ///     println!("message: {:?}", message);
442    ///     // Ack it. `Message` may be dropped already.
443    ///     acker.ack().await?;
444    /// }
445    /// # Ok(())
446    /// # }
447    /// ```
448    pub async fn ack(&self) -> Result<(), Error> {
449        if let Some(ref reply) = self.reply {
450            self.context
451                .client
452                .publish(reply.to_owned(), "".into())
453                .map_err(Error::from)
454                .await
455        } else {
456            Err(Box::new(std::io::Error::other(
457                "No reply subject, not a JetStream message",
458            )))
459        }
460    }
461
462    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
463    ///
464    /// # Examples
465    ///
466    /// ```no_run
467    /// # #[tokio::main]
468    /// # async fn main() -> Result<(), async_nats::Error> {
469    /// use async_nats::jetstream::consumer::PullConsumer;
470    /// use async_nats::jetstream::AckKind;
471    /// use async_nats::jetstream::Message;
472    /// use futures::StreamExt;
473    /// let client = async_nats::connect("localhost:4222").await?;
474    /// let jetstream = async_nats::jetstream::new(client);
475    ///
476    /// let consumer: PullConsumer = jetstream
477    ///     .get_stream("events")
478    ///     .await?
479    ///     .get_consumer("pull")
480    ///     .await?;
481    ///
482    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
483    ///
484    /// while let Some(message) = messages.next().await {
485    ///     let (message, acker) = message.map(Message::split)?;
486    ///     // Do something with the message. Ownership can be taken over `Message`.
487    ///     // while retaining ability to ack later.
488    ///     println!("message: {:?}", message);
489    ///     // Ack it. `Message` may be dropped already.
490    ///     acker.ack_with(AckKind::Nak(None)).await?;
491    /// }
492    /// # Ok(())
493    /// # }
494    /// ```
495    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
496        if let Some(ref reply) = self.reply {
497            self.context
498                .client
499                .publish(reply.to_owned(), kind.into())
500                .map_err(Error::from)
501                .await
502        } else {
503            Err(Box::new(std::io::Error::other(
504                "No reply subject, not a JetStream message",
505            )))
506        }
507    }
508
509    /// Acknowledges a message delivery by sending `+ACK` to the server
510    /// and awaits for confirmation for the server that it received the message.
511    /// Useful if user wants to ensure `exactly once` semantics.
512    ///
513    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
514    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
515    ///
516    /// # Examples
517    ///
518    /// ```no_run
519    /// # #[tokio::main]
520    /// # async fn main() -> Result<(), async_nats::Error> {
521    /// use async_nats::jetstream::Message;
522    /// use futures::StreamExt;
523    /// let client = async_nats::connect("localhost:4222").await?;
524    /// let jetstream = async_nats::jetstream::new(client);
525    ///
526    /// let consumer = jetstream
527    ///     .get_stream("events")
528    ///     .await?
529    ///     .get_consumer("pull")
530    ///     .await?;
531    ///
532    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
533    ///
534    /// while let Some(message) = messages.next().await {
535    ///     let (message, acker) = message.map(Message::split)?;
536    ///     // Do something with the message. Ownership can be taken over `Message`.
537    ///     // while retaining ability to ack later.
538    ///     println!("message: {:?}", message);
539    ///     // Ack it. `Message` may be dropped already.
540    ///     acker.double_ack().await?;
541    /// }
542    /// # Ok(())
543    /// # }
544    /// ```
545    pub async fn double_ack(&self) -> Result<(), Error> {
546        if let Some(ref reply) = self.reply {
547            let inbox = self.context.client.new_inbox();
548            let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
549            self.context
550                .client
551                .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
552                .await?;
553            match tokio::time::timeout(self.context.timeout, subscription.next())
554                .await
555                .map_err(|_| {
556                    std::io::Error::new(
557                        std::io::ErrorKind::TimedOut,
558                        "double ack response timed out",
559                    )
560                })? {
561                Some(_) => Ok(()),
562                None => Err(Box::new(std::io::Error::other("subscription dropped"))),
563            }
564        } else {
565            Err(Box::new(std::io::Error::other(
566                "No reply subject, not a JetStream message",
567            )))
568        }
569    }
570}
571/// The kinds of response used for acknowledging a processed message.
572#[derive(Debug, Clone, Copy)]
573pub enum AckKind {
574    /// Acknowledges a message was completely handled.
575    Ack,
576    /// Signals that the message will not be processed now
577    /// and processing can move onto the next message, NAK'd
578    /// message will be retried.
579    Nak(Option<Duration>),
580    /// When sent before the AckWait period indicates that
581    /// work is ongoing and the period should be extended by
582    /// another equal to AckWait.
583    Progress,
584    /// Acknowledges the message was handled and requests
585    /// delivery of the next message to the reply subject.
586    /// Only applies to Pull-mode.
587    Next,
588    /// Instructs the server to stop redelivery of a message
589    /// without acknowledging it as successfully processed.
590    Term,
591}
592
593impl From<AckKind> for Bytes {
594    fn from(kind: AckKind) -> Self {
595        use AckKind::*;
596        match kind {
597            Ack => Bytes::from_static(b"+ACK"),
598            Nak(maybe_duration) => match maybe_duration {
599                None => Bytes::from_static(b"-NAK"),
600                Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
601            },
602            Progress => Bytes::from_static(b"+WPI"),
603            Next => Bytes::from_static(b"+NXT"),
604            Term => Bytes::from_static(b"+TERM"),
605        }
606    }
607}
608
609/// Information about a received message
610#[derive(Debug, Clone)]
611pub struct Info<'a> {
612    /// Optional domain, present in servers post-ADR-15
613    pub domain: Option<&'a str>,
614    /// Optional account hash, present in servers post-ADR-15
615    pub acc_hash: Option<&'a str>,
616    /// The stream name
617    pub stream: &'a str,
618    /// The consumer name
619    pub consumer: &'a str,
620    /// The stream sequence number associated with this message
621    pub stream_sequence: u64,
622    /// The consumer sequence number associated with this message
623    pub consumer_sequence: u64,
624    /// The number of delivery attempts for this message
625    pub delivered: i64,
626    /// the number of messages known by the server to be pending to this consumer
627    pub pending: u64,
628    /// the time that this message was received by the server from its publisher
629    pub published: time::OffsetDateTime,
630    /// Optional token, present in servers post-ADR-15
631    pub token: Option<&'a str>,
632}