async_nats_wrpc/jetstream/
message.rs

1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::subject::Subject;
17use crate::Error;
18use bytes::Bytes;
19use futures::future::TryFutureExt;
20use futures::StreamExt;
21use std::{mem, time::Duration};
22use time::OffsetDateTime;
23
24#[derive(Clone, Debug)]
25pub struct Message {
26    pub message: crate::Message,
27    pub context: Context,
28}
29
30impl std::ops::Deref for Message {
31    type Target = crate::Message;
32
33    fn deref(&self) -> &Self::Target {
34        &self.message
35    }
36}
37
38impl From<Message> for crate::Message {
39    fn from(source: Message) -> crate::Message {
40        source.message
41    }
42}
43
44impl Message {
45    /// Splits [Message] into [Acker] and [crate::Message].
46    /// This can help reduce memory footprint if [Message] can be dropped before acking,
47    /// for example when it's transformed into another structure and acked later
48    pub fn split(mut self) -> (crate::Message, Acker) {
49        let reply = mem::take(&mut self.message.reply);
50        (
51            self.message,
52            Acker {
53                context: self.context,
54                reply,
55            },
56        )
57    }
58    /// Acknowledges a message delivery by sending `+ACK` to the server.
59    ///
60    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
61    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
62    ///
63    /// # Examples
64    ///
65    /// ```no_run
66    /// # #[tokio::main]
67    /// # async fn main() -> Result<(), async_nats::Error> {
68    /// use async_nats::jetstream::consumer::PullConsumer;
69    /// use futures::StreamExt;
70    /// let client = async_nats::connect("localhost:4222").await?;
71    /// let jetstream = async_nats::jetstream::new(client);
72    ///
73    /// let consumer: PullConsumer = jetstream
74    ///     .get_stream("events")
75    ///     .await?
76    ///     .get_consumer("pull")
77    ///     .await?;
78    ///
79    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
80    ///
81    /// while let Some(message) = messages.next().await {
82    ///     message?.ack().await?;
83    /// }
84    /// # Ok(())
85    /// # }
86    /// ```
87    pub async fn ack(&self) -> Result<(), Error> {
88        if let Some(ref reply) = self.reply {
89            self.context
90                .client
91                .publish(reply.clone(), "".into())
92                .map_err(Error::from)
93                .await
94        } else {
95            Err(Box::new(std::io::Error::new(
96                std::io::ErrorKind::Other,
97                "No reply subject, not a JetStream message",
98            )))
99        }
100    }
101
102    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
103    ///
104    /// # Examples
105    ///
106    /// ```no_run
107    /// # #[tokio::main]
108    /// # async fn main() -> Result<(), async_nats::Error> {
109    /// use async_nats::jetstream::consumer::PullConsumer;
110    /// use async_nats::jetstream::AckKind;
111    /// use futures::StreamExt;
112    /// let client = async_nats::connect("localhost:4222").await?;
113    /// let jetstream = async_nats::jetstream::new(client);
114    ///
115    /// let consumer: PullConsumer = jetstream
116    ///     .get_stream("events")
117    ///     .await?
118    ///     .get_consumer("pull")
119    ///     .await?;
120    ///
121    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
122    ///
123    /// while let Some(message) = messages.next().await {
124    ///     message?.ack_with(AckKind::Nak(None)).await?;
125    /// }
126    /// # Ok(())
127    /// # }
128    /// ```
129    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
130        if let Some(ref reply) = self.reply {
131            self.context
132                .client
133                .publish(reply.to_owned(), kind.into())
134                .map_err(Error::from)
135                .await
136        } else {
137            Err(Box::new(std::io::Error::new(
138                std::io::ErrorKind::Other,
139                "No reply subject, not a JetStream message",
140            )))
141        }
142    }
143
144    /// Acknowledges a message delivery by sending `+ACK` to the server
145    /// and awaits for confirmation for the server that it received the message.
146    /// Useful if user wants to ensure `exactly once` semantics.
147    ///
148    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
149    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
150    ///
151    /// # Examples
152    ///
153    /// ```no_run
154    /// # #[tokio::main]
155    /// # async fn main() -> Result<(), async_nats::Error> {
156    /// use futures::StreamExt;
157    /// let client = async_nats::connect("localhost:4222").await?;
158    /// let jetstream = async_nats::jetstream::new(client);
159    ///
160    /// let consumer = jetstream
161    ///     .get_stream("events")
162    ///     .await?
163    ///     .get_consumer("pull")
164    ///     .await?;
165    ///
166    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
167    ///
168    /// while let Some(message) = messages.next().await {
169    ///     message?.double_ack().await?;
170    /// }
171    /// # Ok(())
172    /// # }
173    /// ```
174    pub async fn double_ack(&self) -> Result<(), Error> {
175        if let Some(ref reply) = self.reply {
176            let inbox = self.context.client.new_inbox();
177            let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
178            self.context
179                .client
180                .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
181                .await?;
182            match tokio::time::timeout(self.context.timeout, subscription.next())
183                .await
184                .map_err(|_| {
185                    std::io::Error::new(
186                        std::io::ErrorKind::TimedOut,
187                        "double ack response timed out",
188                    )
189                })? {
190                Some(_) => Ok(()),
191                None => Err(Box::new(std::io::Error::new(
192                    std::io::ErrorKind::Other,
193                    "subscription dropped",
194                ))),
195            }
196        } else {
197            Err(Box::new(std::io::Error::new(
198                std::io::ErrorKind::Other,
199                "No reply subject, not a JetStream message",
200            )))
201        }
202    }
203
204    /// Returns the `JetStream` message ID
205    /// if this is a `JetStream` message.
206    #[allow(clippy::mixed_read_write_in_expression)]
207    pub fn info(&self) -> Result<Info<'_>, Error> {
208        const PREFIX: &str = "$JS.ACK.";
209        const SKIP: usize = PREFIX.len();
210
211        let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
212            std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
213        })?;
214
215        if !reply.starts_with(PREFIX) {
216            return Err(Box::new(std::io::Error::new(
217                std::io::ErrorKind::Other,
218                "did not found proper prefix",
219            )));
220        }
221
222        reply = &reply[SKIP..];
223
224        let mut split = reply.split('.');
225
226        // we should avoid allocating to prevent
227        // large performance degradations in
228        // parsing this.
229        let mut tokens: [Option<&str>; 10] = [None; 10];
230        let mut n_tokens = 0;
231        for each_token in &mut tokens {
232            if let Some(token) = split.next() {
233                *each_token = Some(token);
234                n_tokens += 1;
235            }
236        }
237
238        let mut token_index = 0;
239
240        macro_rules! try_parse {
241            () => {
242                match str::parse(try_parse!(str)) {
243                    Ok(parsed) => parsed,
244                    Err(e) => {
245                        return Err(Box::new(e));
246                    }
247                }
248            };
249            (str) => {
250                if let Some(next) = tokens[token_index].take() {
251                    #[allow(unused)]
252                    {
253                        // this isn't actually unused, but it's
254                        // difficult for the compiler to infer this.
255                        token_index += 1;
256                    }
257                    next
258                } else {
259                    return Err(Box::new(std::io::Error::new(
260                        std::io::ErrorKind::Other,
261                        "too few tokens",
262                    )));
263                }
264            };
265        }
266
267        // now we can try to parse the tokens to
268        // individual types. We use an if-else
269        // chain instead of a match because it
270        // produces more optimal code usually,
271        // and we want to try the 9 (11 - the first 2)
272        // case first because we expect it to
273        // be the most common. We use >= to be
274        // future-proof.
275        if n_tokens >= 9 {
276            Ok(Info {
277                domain: {
278                    let domain: &str = try_parse!(str);
279                    if domain == "_" {
280                        None
281                    } else {
282                        Some(domain)
283                    }
284                },
285                acc_hash: Some(try_parse!(str)),
286                stream: try_parse!(str),
287                consumer: try_parse!(str),
288                delivered: try_parse!(),
289                stream_sequence: try_parse!(),
290                consumer_sequence: try_parse!(),
291                published: {
292                    let nanos: i128 = try_parse!();
293                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
294                },
295                pending: try_parse!(),
296                token: if n_tokens >= 9 {
297                    Some(try_parse!(str))
298                } else {
299                    None
300                },
301            })
302        } else if n_tokens == 7 {
303            // we expect this to be increasingly rare, as older
304            // servers are phased out.
305            Ok(Info {
306                domain: None,
307                acc_hash: None,
308                stream: try_parse!(str),
309                consumer: try_parse!(str),
310                delivered: try_parse!(),
311                stream_sequence: try_parse!(),
312                consumer_sequence: try_parse!(),
313                published: {
314                    let nanos: i128 = try_parse!();
315                    OffsetDateTime::from_unix_timestamp_nanos(nanos)?
316                },
317                pending: try_parse!(),
318                token: None,
319            })
320        } else {
321            Err(Box::new(std::io::Error::new(
322                std::io::ErrorKind::Other,
323                "bad token number",
324            )))
325        }
326    }
327}
328
329/// A lightweight struct useful for decoupling message contents and the ability to ack it.
330pub struct Acker {
331    context: Context,
332    reply: Option<Subject>,
333}
334
335// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
336// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
337// Creating separate function to ack just to avoid one duplication is not worth it either.
338impl Acker {
339    pub fn new(context: Context, reply: Option<Subject>) -> Self {
340        Self { context, reply }
341    }
342    /// Acknowledges a message delivery by sending `+ACK` to the server.
343    ///
344    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
345    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
346    ///
347    /// # Examples
348    ///
349    /// ```no_run
350    /// # #[tokio::main]
351    /// # async fn main() -> Result<(), async_nats::Error> {
352    /// use async_nats::jetstream::consumer::PullConsumer;
353    /// use async_nats::jetstream::Message;
354    /// use futures::StreamExt;
355    /// let client = async_nats::connect("localhost:4222").await?;
356    /// let jetstream = async_nats::jetstream::new(client);
357    ///
358    /// let consumer: PullConsumer = jetstream
359    ///     .get_stream("events")
360    ///     .await?
361    ///     .get_consumer("pull")
362    ///     .await?;
363    ///
364    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
365    ///
366    /// while let Some(message) = messages.next().await {
367    ///     let (message, acker) = message.map(Message::split)?;
368    ///     // Do something with the message. Ownership can be taken over `Message`
369    ///     // while retaining ability to ack later.
370    ///     println!("message: {:?}", message);
371    ///     // Ack it. `Message` may be dropped already.
372    ///     acker.ack().await?;
373    /// }
374    /// # Ok(())
375    /// # }
376    /// ```
377    pub async fn ack(&self) -> Result<(), Error> {
378        if let Some(ref reply) = self.reply {
379            self.context
380                .client
381                .publish(reply.to_owned(), "".into())
382                .map_err(Error::from)
383                .await
384        } else {
385            Err(Box::new(std::io::Error::new(
386                std::io::ErrorKind::Other,
387                "No reply subject, not a JetStream message",
388            )))
389        }
390    }
391
392    /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
393    ///
394    /// # Examples
395    ///
396    /// ```no_run
397    /// # #[tokio::main]
398    /// # async fn main() -> Result<(), async_nats::Error> {
399    /// use async_nats::jetstream::consumer::PullConsumer;
400    /// use async_nats::jetstream::AckKind;
401    /// use async_nats::jetstream::Message;
402    /// use futures::StreamExt;
403    /// let client = async_nats::connect("localhost:4222").await?;
404    /// let jetstream = async_nats::jetstream::new(client);
405    ///
406    /// let consumer: PullConsumer = jetstream
407    ///     .get_stream("events")
408    ///     .await?
409    ///     .get_consumer("pull")
410    ///     .await?;
411    ///
412    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
413    ///
414    /// while let Some(message) = messages.next().await {
415    ///     let (message, acker) = message.map(Message::split)?;
416    ///     // Do something with the message. Ownership can be taken over `Message`.
417    ///     // while retaining ability to ack later.
418    ///     println!("message: {:?}", message);
419    ///     // Ack it. `Message` may be dropped already.
420    ///     acker.ack_with(AckKind::Nak(None)).await?;
421    /// }
422    /// # Ok(())
423    /// # }
424    /// ```
425    pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
426        if let Some(ref reply) = self.reply {
427            self.context
428                .client
429                .publish(reply.to_owned(), kind.into())
430                .map_err(Error::from)
431                .await
432        } else {
433            Err(Box::new(std::io::Error::new(
434                std::io::ErrorKind::Other,
435                "No reply subject, not a JetStream message",
436            )))
437        }
438    }
439
440    /// Acknowledges a message delivery by sending `+ACK` to the server
441    /// and awaits for confirmation for the server that it received the message.
442    /// Useful if user wants to ensure `exactly once` semantics.
443    ///
444    /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
445    /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
446    ///
447    /// # Examples
448    ///
449    /// ```no_run
450    /// # #[tokio::main]
451    /// # async fn main() -> Result<(), async_nats::Error> {
452    /// use async_nats::jetstream::Message;
453    /// use futures::StreamExt;
454    /// let client = async_nats::connect("localhost:4222").await?;
455    /// let jetstream = async_nats::jetstream::new(client);
456    ///
457    /// let consumer = jetstream
458    ///     .get_stream("events")
459    ///     .await?
460    ///     .get_consumer("pull")
461    ///     .await?;
462    ///
463    /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
464    ///
465    /// while let Some(message) = messages.next().await {
466    ///     let (message, acker) = message.map(Message::split)?;
467    ///     // Do something with the message. Ownership can be taken over `Message`.
468    ///     // while retaining ability to ack later.
469    ///     println!("message: {:?}", message);
470    ///     // Ack it. `Message` may be dropped already.
471    ///     acker.double_ack().await?;
472    /// }
473    /// # Ok(())
474    /// # }
475    /// ```
476    pub async fn double_ack(&self) -> Result<(), Error> {
477        if let Some(ref reply) = self.reply {
478            let inbox = self.context.client.new_inbox();
479            let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
480            self.context
481                .client
482                .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
483                .await?;
484            match tokio::time::timeout(self.context.timeout, subscription.next())
485                .await
486                .map_err(|_| {
487                    std::io::Error::new(
488                        std::io::ErrorKind::TimedOut,
489                        "double ack response timed out",
490                    )
491                })? {
492                Some(_) => Ok(()),
493                None => Err(Box::new(std::io::Error::new(
494                    std::io::ErrorKind::Other,
495                    "subscription dropped",
496                ))),
497            }
498        } else {
499            Err(Box::new(std::io::Error::new(
500                std::io::ErrorKind::Other,
501                "No reply subject, not a JetStream message",
502            )))
503        }
504    }
505}
/// The kinds of response a client can send to acknowledge a delivered message.
///
/// Each variant is encoded to its wire payload by the `From<AckKind> for Bytes`
/// conversion in this module.
#[derive(Clone, Copy, Debug)]
pub enum AckKind {
    /// The message was completely handled. Encoded as `+ACK`.
    Ack,
    /// The message will not be processed now and processing can move on to
    /// the next message; the NAK'd message will be retried.
    /// Encoded as `-NAK`, with the optional delay attached in nanoseconds.
    Nak(Option<Duration>),
    /// When sent before the AckWait period elapses, indicates that work is
    /// ongoing and the period should be extended by another AckWait.
    /// Encoded as `+WPI`.
    Progress,
    /// The message was handled and delivery of the next message to the
    /// reply subject is requested. Only applies to Pull-mode.
    /// Encoded as `+NXT`.
    Next,
    /// Instructs the server to stop redelivery of the message without
    /// acknowledging it as successfully processed. Encoded as `+TERM`.
    Term,
}
527
528impl From<AckKind> for Bytes {
529    fn from(kind: AckKind) -> Self {
530        use AckKind::*;
531        match kind {
532            Ack => Bytes::from_static(b"+ACK"),
533            Nak(maybe_duration) => match maybe_duration {
534                None => Bytes::from_static(b"-NAK"),
535                Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
536            },
537            Progress => Bytes::from_static(b"+WPI"),
538            Next => Bytes::from_static(b"+NXT"),
539            Term => Bytes::from_static(b"+TERM"),
540        }
541    }
542}
543
544/// Information about a received message
545#[derive(Debug, Clone)]
546pub struct Info<'a> {
547    /// Optional domain, present in servers post-ADR-15
548    pub domain: Option<&'a str>,
549    /// Optional account hash, present in servers post-ADR-15
550    pub acc_hash: Option<&'a str>,
551    /// The stream name
552    pub stream: &'a str,
553    /// The consumer name
554    pub consumer: &'a str,
555    /// The stream sequence number associated with this message
556    pub stream_sequence: u64,
557    /// The consumer sequence number associated with this message
558    pub consumer_sequence: u64,
559    /// The number of delivery attempts for this message
560    pub delivered: i64,
561    /// the number of messages known by the server to be pending to this consumer
562    pub pending: u64,
563    /// the time that this message was received by the server from its publisher
564    pub published: time::OffsetDateTime,
565    /// Optional token, present in servers post-ADR-15
566    pub token: Option<&'a str>,
567}