async_nats/jetstream/message.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::{error, header, Error};
17use crate::{subject::Subject, HeaderMap};
18use bytes::Bytes;
19use futures::future::TryFutureExt;
20use futures::StreamExt;
21use std::fmt::Display;
22use std::{mem, time::Duration};
23use time::format_description::well_known::Rfc3339;
24use time::OffsetDateTime;
25
/// A message received directly from the stream, without leveraging a consumer.
///
/// Built from a [crate::Message] through `TryFrom`, which reads the subject,
/// sequence and timestamp from the message headers.
#[derive(Debug, Clone)]
pub struct StreamMessage {
    /// Subject of the message, taken from the `header::NATS_SUBJECT` header.
    pub subject: Subject,
    /// Sequence number, parsed from the `header::NATS_SEQUENCE` header.
    pub sequence: u64,
    /// All headers that were attached to the source message.
    pub headers: HeaderMap,
    /// Payload of the source message.
    pub payload: Bytes,
    /// Timestamp, parsed as RFC 3339 from the `header::NATS_TIME_STAMP` header.
    pub time: OffsetDateTime,
}
35
/// A [crate::Message] paired with the [Context] needed to acknowledge it.
///
/// Dereferences to the wrapped [crate::Message], so its fields can be read directly.
#[derive(Clone, Debug)]
pub struct Message {
    /// The underlying message, including the reply subject used for acknowledgements.
    pub message: crate::Message,
    /// Context whose client publishes acknowledgements and whose timeout bounds `double_ack`.
    pub context: Context,
}
41
42impl TryFrom<crate::Message> for StreamMessage {
43 type Error = StreamMessageError;
44
45 fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
46 let headers = message.headers.ok_or_else(|| {
47 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
48 })?;
49
50 let sequence = headers
51 .get_last(header::NATS_SEQUENCE)
52 .ok_or_else(|| {
53 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
54 })
55 .and_then(|seq| {
56 seq.as_str().parse().map_err(|err| {
57 StreamMessageError::with_source(
58 StreamMessageErrorKind::ParseError,
59 format!("could not parse sequence header: {}", err),
60 )
61 })
62 })?;
63
64 let time = headers
65 .get_last(header::NATS_TIME_STAMP)
66 .ok_or_else(|| {
67 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
68 })
69 .and_then(|time| {
70 OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
71 StreamMessageError::with_source(
72 StreamMessageErrorKind::ParseError,
73 format!("could not parse timestamp header: {}", err),
74 )
75 })
76 })?;
77
78 let subject = headers
79 .get_last(header::NATS_SUBJECT)
80 .ok_or_else(|| {
81 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
82 })?
83 .as_str()
84 .into();
85
86 Ok(StreamMessage {
87 subject,
88 sequence,
89 headers,
90 payload: message.payload,
91 time,
92 })
93 }
94}
95
/// The reasons a raw message can fail to convert into a [StreamMessage].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum StreamMessageErrorKind {
    /// The message carried no headers, or a required header was absent.
    MissingHeader,
    /// A required header was present but its value could not be parsed.
    ParseError,
}
101
/// Error returned when the library is unable to parse a message retrieved directly from the stream.
pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
104
105impl Display for StreamMessageErrorKind {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 match self {
108 StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
109 StreamMessageErrorKind::ParseError => write!(f, "parse error"),
110 }
111 }
112}
113
114impl std::ops::Deref for Message {
115 type Target = crate::Message;
116
117 fn deref(&self) -> &Self::Target {
118 &self.message
119 }
120}
121
122impl From<Message> for crate::Message {
123 fn from(source: Message) -> crate::Message {
124 source.message
125 }
126}
127
128impl Message {
129 /// Splits [Message] into [Acker] and [crate::Message].
130 /// This can help reduce memory footprint if [Message] can be dropped before acking,
131 /// for example when it's transformed into another structure and acked later
132 pub fn split(mut self) -> (crate::Message, Acker) {
133 let reply = mem::take(&mut self.message.reply);
134 (
135 self.message,
136 Acker {
137 context: self.context,
138 reply,
139 },
140 )
141 }
142 /// Acknowledges a message delivery by sending `+ACK` to the server.
143 ///
144 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
145 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
146 ///
147 /// # Examples
148 ///
149 /// ```no_run
150 /// # #[tokio::main]
151 /// # async fn main() -> Result<(), async_nats::Error> {
152 /// use async_nats::jetstream::consumer::PullConsumer;
153 /// use futures::StreamExt;
154 /// let client = async_nats::connect("localhost:4222").await?;
155 /// let jetstream = async_nats::jetstream::new(client);
156 ///
157 /// let consumer: PullConsumer = jetstream
158 /// .get_stream("events")
159 /// .await?
160 /// .get_consumer("pull")
161 /// .await?;
162 ///
163 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
164 ///
165 /// while let Some(message) = messages.next().await {
166 /// message?.ack().await?;
167 /// }
168 /// # Ok(())
169 /// # }
170 /// ```
171 pub async fn ack(&self) -> Result<(), Error> {
172 if let Some(ref reply) = self.reply {
173 self.context
174 .client
175 .publish(reply.clone(), "".into())
176 .map_err(Error::from)
177 .await
178 } else {
179 Err(Box::new(std::io::Error::other(
180 "No reply subject, not a JetStream message",
181 )))
182 }
183 }
184
185 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
186 ///
187 /// # Examples
188 ///
189 /// ```no_run
190 /// # #[tokio::main]
191 /// # async fn main() -> Result<(), async_nats::Error> {
192 /// use async_nats::jetstream::consumer::PullConsumer;
193 /// use async_nats::jetstream::AckKind;
194 /// use futures::StreamExt;
195 /// let client = async_nats::connect("localhost:4222").await?;
196 /// let jetstream = async_nats::jetstream::new(client);
197 ///
198 /// let consumer: PullConsumer = jetstream
199 /// .get_stream("events")
200 /// .await?
201 /// .get_consumer("pull")
202 /// .await?;
203 ///
204 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
205 ///
206 /// while let Some(message) = messages.next().await {
207 /// message?.ack_with(AckKind::Nak(None)).await?;
208 /// }
209 /// # Ok(())
210 /// # }
211 /// ```
212 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
213 if let Some(ref reply) = self.reply {
214 self.context
215 .client
216 .publish(reply.to_owned(), kind.into())
217 .map_err(Error::from)
218 .await
219 } else {
220 Err(Box::new(std::io::Error::other(
221 "No reply subject, not a JetStream message",
222 )))
223 }
224 }
225
226 /// Acknowledges a message delivery by sending `+ACK` to the server
227 /// and awaits for confirmation for the server that it received the message.
228 /// Useful if user wants to ensure `exactly once` semantics.
229 ///
230 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
231 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
232 ///
233 /// # Examples
234 ///
235 /// ```no_run
236 /// # #[tokio::main]
237 /// # async fn main() -> Result<(), async_nats::Error> {
238 /// use futures::StreamExt;
239 /// let client = async_nats::connect("localhost:4222").await?;
240 /// let jetstream = async_nats::jetstream::new(client);
241 ///
242 /// let consumer = jetstream
243 /// .get_stream("events")
244 /// .await?
245 /// .get_consumer("pull")
246 /// .await?;
247 ///
248 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
249 ///
250 /// while let Some(message) = messages.next().await {
251 /// message?.double_ack().await?;
252 /// }
253 /// # Ok(())
254 /// # }
255 /// ```
256 pub async fn double_ack(&self) -> Result<(), Error> {
257 if let Some(ref reply) = self.reply {
258 let inbox = self.context.client.new_inbox();
259 let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
260 self.context
261 .client
262 .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
263 .await?;
264 match tokio::time::timeout(self.context.timeout, subscription.next())
265 .await
266 .map_err(|_| {
267 std::io::Error::new(
268 std::io::ErrorKind::TimedOut,
269 "double ack response timed out",
270 )
271 })? {
272 Some(_) => Ok(()),
273 None => Err(Box::new(std::io::Error::other("subscription dropped"))),
274 }
275 } else {
276 Err(Box::new(std::io::Error::other(
277 "No reply subject, not a JetStream message",
278 )))
279 }
280 }
281
282 /// Returns the `JetStream` message ID
283 /// if this is a `JetStream` message.
284 #[allow(clippy::mixed_read_write_in_expression)]
285 pub fn info(&self) -> Result<Info<'_>, Error> {
286 const PREFIX: &str = "$JS.ACK.";
287 const SKIP: usize = PREFIX.len();
288
289 let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
290 std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
291 })?;
292
293 if !reply.starts_with(PREFIX) {
294 return Err(Box::new(std::io::Error::other(
295 "did not found proper prefix",
296 )));
297 }
298
299 reply = &reply[SKIP..];
300
301 let mut split = reply.split('.');
302
303 // we should avoid allocating to prevent
304 // large performance degradations in
305 // parsing this.
306 let mut tokens: [Option<&str>; 10] = [None; 10];
307 let mut n_tokens = 0;
308 for each_token in &mut tokens {
309 if let Some(token) = split.next() {
310 *each_token = Some(token);
311 n_tokens += 1;
312 }
313 }
314
315 let mut token_index = 0;
316
317 macro_rules! try_parse {
318 () => {
319 match str::parse(try_parse!(str)) {
320 Ok(parsed) => parsed,
321 Err(e) => {
322 return Err(Box::new(e));
323 }
324 }
325 };
326 (str) => {
327 if let Some(next) = tokens[token_index].take() {
328 #[allow(unused)]
329 {
330 // this isn't actually unused, but it's
331 // difficult for the compiler to infer this.
332 token_index += 1;
333 }
334 next
335 } else {
336 return Err(Box::new(std::io::Error::other("too few tokens")));
337 }
338 };
339 }
340
341 // now we can try to parse the tokens to
342 // individual types. We use an if-else
343 // chain instead of a match because it
344 // produces more optimal code usually,
345 // and we want to try the 9 (11 - the first 2)
346 // case first because we expect it to
347 // be the most common. We use >= to be
348 // future-proof.
349 if n_tokens >= 9 {
350 Ok(Info {
351 domain: {
352 let domain: &str = try_parse!(str);
353 if domain == "_" {
354 None
355 } else {
356 Some(domain)
357 }
358 },
359 acc_hash: Some(try_parse!(str)),
360 stream: try_parse!(str),
361 consumer: try_parse!(str),
362 delivered: try_parse!(),
363 stream_sequence: try_parse!(),
364 consumer_sequence: try_parse!(),
365 published: {
366 let nanos: i128 = try_parse!();
367 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
368 },
369 pending: try_parse!(),
370 token: if n_tokens >= 9 {
371 Some(try_parse!(str))
372 } else {
373 None
374 },
375 })
376 } else if n_tokens == 7 {
377 // we expect this to be increasingly rare, as older
378 // servers are phased out.
379 Ok(Info {
380 domain: None,
381 acc_hash: None,
382 stream: try_parse!(str),
383 consumer: try_parse!(str),
384 delivered: try_parse!(),
385 stream_sequence: try_parse!(),
386 consumer_sequence: try_parse!(),
387 published: {
388 let nanos: i128 = try_parse!();
389 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
390 },
391 pending: try_parse!(),
392 token: None,
393 })
394 } else {
395 Err(Box::new(std::io::Error::other("bad token number")))
396 }
397 }
398}
399
400/// A lightweight struct useful for decoupling message contents and the ability to ack it.
401pub struct Acker {
402 context: Context,
403 reply: Option<Subject>,
404}
405
406// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
407// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
408// Creating separate function to ack just to avoid one duplication is not worth it either.
409impl Acker {
410 pub fn new(context: Context, reply: Option<Subject>) -> Self {
411 Self { context, reply }
412 }
413 /// Acknowledges a message delivery by sending `+ACK` to the server.
414 ///
415 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
416 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
417 ///
418 /// # Examples
419 ///
420 /// ```no_run
421 /// # #[tokio::main]
422 /// # async fn main() -> Result<(), async_nats::Error> {
423 /// use async_nats::jetstream::consumer::PullConsumer;
424 /// use async_nats::jetstream::Message;
425 /// use futures::StreamExt;
426 /// let client = async_nats::connect("localhost:4222").await?;
427 /// let jetstream = async_nats::jetstream::new(client);
428 ///
429 /// let consumer: PullConsumer = jetstream
430 /// .get_stream("events")
431 /// .await?
432 /// .get_consumer("pull")
433 /// .await?;
434 ///
435 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
436 ///
437 /// while let Some(message) = messages.next().await {
438 /// let (message, acker) = message.map(Message::split)?;
439 /// // Do something with the message. Ownership can be taken over `Message`
440 /// // while retaining ability to ack later.
441 /// println!("message: {:?}", message);
442 /// // Ack it. `Message` may be dropped already.
443 /// acker.ack().await?;
444 /// }
445 /// # Ok(())
446 /// # }
447 /// ```
448 pub async fn ack(&self) -> Result<(), Error> {
449 if let Some(ref reply) = self.reply {
450 self.context
451 .client
452 .publish(reply.to_owned(), "".into())
453 .map_err(Error::from)
454 .await
455 } else {
456 Err(Box::new(std::io::Error::other(
457 "No reply subject, not a JetStream message",
458 )))
459 }
460 }
461
462 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
463 ///
464 /// # Examples
465 ///
466 /// ```no_run
467 /// # #[tokio::main]
468 /// # async fn main() -> Result<(), async_nats::Error> {
469 /// use async_nats::jetstream::consumer::PullConsumer;
470 /// use async_nats::jetstream::AckKind;
471 /// use async_nats::jetstream::Message;
472 /// use futures::StreamExt;
473 /// let client = async_nats::connect("localhost:4222").await?;
474 /// let jetstream = async_nats::jetstream::new(client);
475 ///
476 /// let consumer: PullConsumer = jetstream
477 /// .get_stream("events")
478 /// .await?
479 /// .get_consumer("pull")
480 /// .await?;
481 ///
482 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
483 ///
484 /// while let Some(message) = messages.next().await {
485 /// let (message, acker) = message.map(Message::split)?;
486 /// // Do something with the message. Ownership can be taken over `Message`.
487 /// // while retaining ability to ack later.
488 /// println!("message: {:?}", message);
489 /// // Ack it. `Message` may be dropped already.
490 /// acker.ack_with(AckKind::Nak(None)).await?;
491 /// }
492 /// # Ok(())
493 /// # }
494 /// ```
495 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
496 if let Some(ref reply) = self.reply {
497 self.context
498 .client
499 .publish(reply.to_owned(), kind.into())
500 .map_err(Error::from)
501 .await
502 } else {
503 Err(Box::new(std::io::Error::other(
504 "No reply subject, not a JetStream message",
505 )))
506 }
507 }
508
509 /// Acknowledges a message delivery by sending `+ACK` to the server
510 /// and awaits for confirmation for the server that it received the message.
511 /// Useful if user wants to ensure `exactly once` semantics.
512 ///
513 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
514 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
515 ///
516 /// # Examples
517 ///
518 /// ```no_run
519 /// # #[tokio::main]
520 /// # async fn main() -> Result<(), async_nats::Error> {
521 /// use async_nats::jetstream::Message;
522 /// use futures::StreamExt;
523 /// let client = async_nats::connect("localhost:4222").await?;
524 /// let jetstream = async_nats::jetstream::new(client);
525 ///
526 /// let consumer = jetstream
527 /// .get_stream("events")
528 /// .await?
529 /// .get_consumer("pull")
530 /// .await?;
531 ///
532 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
533 ///
534 /// while let Some(message) = messages.next().await {
535 /// let (message, acker) = message.map(Message::split)?;
536 /// // Do something with the message. Ownership can be taken over `Message`.
537 /// // while retaining ability to ack later.
538 /// println!("message: {:?}", message);
539 /// // Ack it. `Message` may be dropped already.
540 /// acker.double_ack().await?;
541 /// }
542 /// # Ok(())
543 /// # }
544 /// ```
545 pub async fn double_ack(&self) -> Result<(), Error> {
546 if let Some(ref reply) = self.reply {
547 let inbox = self.context.client.new_inbox();
548 let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
549 self.context
550 .client
551 .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
552 .await?;
553 match tokio::time::timeout(self.context.timeout, subscription.next())
554 .await
555 .map_err(|_| {
556 std::io::Error::new(
557 std::io::ErrorKind::TimedOut,
558 "double ack response timed out",
559 )
560 })? {
561 Some(_) => Ok(()),
562 None => Err(Box::new(std::io::Error::other("subscription dropped"))),
563 }
564 } else {
565 Err(Box::new(std::io::Error::other(
566 "No reply subject, not a JetStream message",
567 )))
568 }
569 }
570}
/// The kinds of response used for acknowledging a processed message.
///
/// Each variant converts into the payload published to the message's reply subject.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AckKind {
    /// Acknowledges a message was completely handled. Encoded as `+ACK`.
    Ack,
    /// Signals that the message will not be processed now
    /// and processing can move onto the next message, NAK'd
    /// message will be retried. Encoded as `-NAK`, followed by a JSON
    /// `delay` in nanoseconds when a duration is given.
    Nak(Option<Duration>),
    /// When sent before the AckWait period indicates that
    /// work is ongoing and the period should be extended by
    /// another equal to AckWait. Encoded as `+WPI`.
    Progress,
    /// Acknowledges the message was handled and requests
    /// delivery of the next message to the reply subject.
    /// Only applies to Pull-mode. Encoded as `+NXT`.
    Next,
    /// Instructs the server to stop redelivery of a message
    /// without acknowledging it as successfully processed. Encoded as `+TERM`.
    Term,
}
592
593impl From<AckKind> for Bytes {
594 fn from(kind: AckKind) -> Self {
595 use AckKind::*;
596 match kind {
597 Ack => Bytes::from_static(b"+ACK"),
598 Nak(maybe_duration) => match maybe_duration {
599 None => Bytes::from_static(b"-NAK"),
600 Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
601 },
602 Progress => Bytes::from_static(b"+WPI"),
603 Next => Bytes::from_static(b"+NXT"),
604 Term => Bytes::from_static(b"+TERM"),
605 }
606 }
607}
608
/// Information about a received message.
///
/// Produced by parsing the message's `$JS.ACK.` reply subject; the string fields
/// borrow from that subject.
#[derive(Debug, Clone)]
pub struct Info<'a> {
    /// Optional domain, present in servers post-ADR-15.
    pub domain: Option<&'a str>,
    /// Optional account hash, present in servers post-ADR-15.
    pub acc_hash: Option<&'a str>,
    /// The stream name.
    pub stream: &'a str,
    /// The consumer name.
    pub consumer: &'a str,
    /// The stream sequence number associated with this message.
    pub stream_sequence: u64,
    /// The consumer sequence number associated with this message.
    pub consumer_sequence: u64,
    /// The number of delivery attempts for this message.
    pub delivered: i64,
    /// The number of messages known by the server to be pending to this consumer.
    pub pending: u64,
    /// The time that this message was received by the server from its publisher.
    pub published: time::OffsetDateTime,
    /// Optional token, present in servers post-ADR-15.
    pub token: Option<&'a str>,
}