async_nats_wrpc/jetstream/message.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::subject::Subject;
17use crate::Error;
18use bytes::Bytes;
19use futures::future::TryFutureExt;
20use futures::StreamExt;
21use std::{mem, time::Duration};
22use time::OffsetDateTime;
23
/// A `JetStream` message: a core [crate::Message] bundled with the [Context]
/// that is used to acknowledge it.
///
/// The wrapper dereferences to [crate::Message], so the fields of the inner
/// message (for example its reply subject) are reachable directly.
#[derive(Clone, Debug)]
pub struct Message {
    /// The underlying message. Acknowledgements are published to its reply subject.
    pub message: crate::Message,
    /// The `JetStream` context whose client publishes the acknowledgements.
    pub context: Context,
}
29
30impl std::ops::Deref for Message {
31 type Target = crate::Message;
32
33 fn deref(&self) -> &Self::Target {
34 &self.message
35 }
36}
37
38impl From<Message> for crate::Message {
39 fn from(source: Message) -> crate::Message {
40 source.message
41 }
42}
43
44impl Message {
45 /// Splits [Message] into [Acker] and [crate::Message].
46 /// This can help reduce memory footprint if [Message] can be dropped before acking,
47 /// for example when it's transformed into another structure and acked later
48 pub fn split(mut self) -> (crate::Message, Acker) {
49 let reply = mem::take(&mut self.message.reply);
50 (
51 self.message,
52 Acker {
53 context: self.context,
54 reply,
55 },
56 )
57 }
58 /// Acknowledges a message delivery by sending `+ACK` to the server.
59 ///
60 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
61 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
62 ///
63 /// # Examples
64 ///
65 /// ```no_run
66 /// # #[tokio::main]
67 /// # async fn main() -> Result<(), async_nats::Error> {
68 /// use async_nats::jetstream::consumer::PullConsumer;
69 /// use futures::StreamExt;
70 /// let client = async_nats::connect("localhost:4222").await?;
71 /// let jetstream = async_nats::jetstream::new(client);
72 ///
73 /// let consumer: PullConsumer = jetstream
74 /// .get_stream("events")
75 /// .await?
76 /// .get_consumer("pull")
77 /// .await?;
78 ///
79 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
80 ///
81 /// while let Some(message) = messages.next().await {
82 /// message?.ack().await?;
83 /// }
84 /// # Ok(())
85 /// # }
86 /// ```
87 pub async fn ack(&self) -> Result<(), Error> {
88 if let Some(ref reply) = self.reply {
89 self.context
90 .client
91 .publish(reply.clone(), "".into())
92 .map_err(Error::from)
93 .await
94 } else {
95 Err(Box::new(std::io::Error::new(
96 std::io::ErrorKind::Other,
97 "No reply subject, not a JetStream message",
98 )))
99 }
100 }
101
102 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
103 ///
104 /// # Examples
105 ///
106 /// ```no_run
107 /// # #[tokio::main]
108 /// # async fn main() -> Result<(), async_nats::Error> {
109 /// use async_nats::jetstream::consumer::PullConsumer;
110 /// use async_nats::jetstream::AckKind;
111 /// use futures::StreamExt;
112 /// let client = async_nats::connect("localhost:4222").await?;
113 /// let jetstream = async_nats::jetstream::new(client);
114 ///
115 /// let consumer: PullConsumer = jetstream
116 /// .get_stream("events")
117 /// .await?
118 /// .get_consumer("pull")
119 /// .await?;
120 ///
121 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
122 ///
123 /// while let Some(message) = messages.next().await {
124 /// message?.ack_with(AckKind::Nak(None)).await?;
125 /// }
126 /// # Ok(())
127 /// # }
128 /// ```
129 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
130 if let Some(ref reply) = self.reply {
131 self.context
132 .client
133 .publish(reply.to_owned(), kind.into())
134 .map_err(Error::from)
135 .await
136 } else {
137 Err(Box::new(std::io::Error::new(
138 std::io::ErrorKind::Other,
139 "No reply subject, not a JetStream message",
140 )))
141 }
142 }
143
144 /// Acknowledges a message delivery by sending `+ACK` to the server
145 /// and awaits for confirmation for the server that it received the message.
146 /// Useful if user wants to ensure `exactly once` semantics.
147 ///
148 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
149 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
150 ///
151 /// # Examples
152 ///
153 /// ```no_run
154 /// # #[tokio::main]
155 /// # async fn main() -> Result<(), async_nats::Error> {
156 /// use futures::StreamExt;
157 /// let client = async_nats::connect("localhost:4222").await?;
158 /// let jetstream = async_nats::jetstream::new(client);
159 ///
160 /// let consumer = jetstream
161 /// .get_stream("events")
162 /// .await?
163 /// .get_consumer("pull")
164 /// .await?;
165 ///
166 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
167 ///
168 /// while let Some(message) = messages.next().await {
169 /// message?.double_ack().await?;
170 /// }
171 /// # Ok(())
172 /// # }
173 /// ```
174 pub async fn double_ack(&self) -> Result<(), Error> {
175 if let Some(ref reply) = self.reply {
176 let inbox = self.context.client.new_inbox();
177 let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
178 self.context
179 .client
180 .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
181 .await?;
182 match tokio::time::timeout(self.context.timeout, subscription.next())
183 .await
184 .map_err(|_| {
185 std::io::Error::new(
186 std::io::ErrorKind::TimedOut,
187 "double ack response timed out",
188 )
189 })? {
190 Some(_) => Ok(()),
191 None => Err(Box::new(std::io::Error::new(
192 std::io::ErrorKind::Other,
193 "subscription dropped",
194 ))),
195 }
196 } else {
197 Err(Box::new(std::io::Error::new(
198 std::io::ErrorKind::Other,
199 "No reply subject, not a JetStream message",
200 )))
201 }
202 }
203
204 /// Returns the `JetStream` message ID
205 /// if this is a `JetStream` message.
206 #[allow(clippy::mixed_read_write_in_expression)]
207 pub fn info(&self) -> Result<Info<'_>, Error> {
208 const PREFIX: &str = "$JS.ACK.";
209 const SKIP: usize = PREFIX.len();
210
211 let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
212 std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
213 })?;
214
215 if !reply.starts_with(PREFIX) {
216 return Err(Box::new(std::io::Error::new(
217 std::io::ErrorKind::Other,
218 "did not found proper prefix",
219 )));
220 }
221
222 reply = &reply[SKIP..];
223
224 let mut split = reply.split('.');
225
226 // we should avoid allocating to prevent
227 // large performance degradations in
228 // parsing this.
229 let mut tokens: [Option<&str>; 10] = [None; 10];
230 let mut n_tokens = 0;
231 for each_token in &mut tokens {
232 if let Some(token) = split.next() {
233 *each_token = Some(token);
234 n_tokens += 1;
235 }
236 }
237
238 let mut token_index = 0;
239
240 macro_rules! try_parse {
241 () => {
242 match str::parse(try_parse!(str)) {
243 Ok(parsed) => parsed,
244 Err(e) => {
245 return Err(Box::new(e));
246 }
247 }
248 };
249 (str) => {
250 if let Some(next) = tokens[token_index].take() {
251 #[allow(unused)]
252 {
253 // this isn't actually unused, but it's
254 // difficult for the compiler to infer this.
255 token_index += 1;
256 }
257 next
258 } else {
259 return Err(Box::new(std::io::Error::new(
260 std::io::ErrorKind::Other,
261 "too few tokens",
262 )));
263 }
264 };
265 }
266
267 // now we can try to parse the tokens to
268 // individual types. We use an if-else
269 // chain instead of a match because it
270 // produces more optimal code usually,
271 // and we want to try the 9 (11 - the first 2)
272 // case first because we expect it to
273 // be the most common. We use >= to be
274 // future-proof.
275 if n_tokens >= 9 {
276 Ok(Info {
277 domain: {
278 let domain: &str = try_parse!(str);
279 if domain == "_" {
280 None
281 } else {
282 Some(domain)
283 }
284 },
285 acc_hash: Some(try_parse!(str)),
286 stream: try_parse!(str),
287 consumer: try_parse!(str),
288 delivered: try_parse!(),
289 stream_sequence: try_parse!(),
290 consumer_sequence: try_parse!(),
291 published: {
292 let nanos: i128 = try_parse!();
293 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
294 },
295 pending: try_parse!(),
296 token: if n_tokens >= 9 {
297 Some(try_parse!(str))
298 } else {
299 None
300 },
301 })
302 } else if n_tokens == 7 {
303 // we expect this to be increasingly rare, as older
304 // servers are phased out.
305 Ok(Info {
306 domain: None,
307 acc_hash: None,
308 stream: try_parse!(str),
309 consumer: try_parse!(str),
310 delivered: try_parse!(),
311 stream_sequence: try_parse!(),
312 consumer_sequence: try_parse!(),
313 published: {
314 let nanos: i128 = try_parse!();
315 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
316 },
317 pending: try_parse!(),
318 token: None,
319 })
320 } else {
321 Err(Box::new(std::io::Error::new(
322 std::io::ErrorKind::Other,
323 "bad token number",
324 )))
325 }
326 }
327}
328
/// A lightweight struct useful for decoupling message contents and the ability to ack it.
///
/// It keeps only the `JetStream` [Context] and the reply subject, so the
/// message itself can be dropped before the acknowledgement is sent.
pub struct Acker {
    // Context whose client publishes the acknowledgement and whose timeout
    // bounds the wait in `double_ack`.
    context: Context,
    // Subject acknowledgements are published to; when `None`, every ack
    // method returns an error.
    reply: Option<Subject>,
}
334
335// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
336// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
337// Creating separate function to ack just to avoid one duplication is not worth it either.
338impl Acker {
339 pub fn new(context: Context, reply: Option<Subject>) -> Self {
340 Self { context, reply }
341 }
342 /// Acknowledges a message delivery by sending `+ACK` to the server.
343 ///
344 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
345 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
346 ///
347 /// # Examples
348 ///
349 /// ```no_run
350 /// # #[tokio::main]
351 /// # async fn main() -> Result<(), async_nats::Error> {
352 /// use async_nats::jetstream::consumer::PullConsumer;
353 /// use async_nats::jetstream::Message;
354 /// use futures::StreamExt;
355 /// let client = async_nats::connect("localhost:4222").await?;
356 /// let jetstream = async_nats::jetstream::new(client);
357 ///
358 /// let consumer: PullConsumer = jetstream
359 /// .get_stream("events")
360 /// .await?
361 /// .get_consumer("pull")
362 /// .await?;
363 ///
364 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
365 ///
366 /// while let Some(message) = messages.next().await {
367 /// let (message, acker) = message.map(Message::split)?;
368 /// // Do something with the message. Ownership can be taken over `Message`
369 /// // while retaining ability to ack later.
370 /// println!("message: {:?}", message);
371 /// // Ack it. `Message` may be dropped already.
372 /// acker.ack().await?;
373 /// }
374 /// # Ok(())
375 /// # }
376 /// ```
377 pub async fn ack(&self) -> Result<(), Error> {
378 if let Some(ref reply) = self.reply {
379 self.context
380 .client
381 .publish(reply.to_owned(), "".into())
382 .map_err(Error::from)
383 .await
384 } else {
385 Err(Box::new(std::io::Error::new(
386 std::io::ErrorKind::Other,
387 "No reply subject, not a JetStream message",
388 )))
389 }
390 }
391
392 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
393 ///
394 /// # Examples
395 ///
396 /// ```no_run
397 /// # #[tokio::main]
398 /// # async fn main() -> Result<(), async_nats::Error> {
399 /// use async_nats::jetstream::consumer::PullConsumer;
400 /// use async_nats::jetstream::AckKind;
401 /// use async_nats::jetstream::Message;
402 /// use futures::StreamExt;
403 /// let client = async_nats::connect("localhost:4222").await?;
404 /// let jetstream = async_nats::jetstream::new(client);
405 ///
406 /// let consumer: PullConsumer = jetstream
407 /// .get_stream("events")
408 /// .await?
409 /// .get_consumer("pull")
410 /// .await?;
411 ///
412 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
413 ///
414 /// while let Some(message) = messages.next().await {
415 /// let (message, acker) = message.map(Message::split)?;
416 /// // Do something with the message. Ownership can be taken over `Message`.
417 /// // while retaining ability to ack later.
418 /// println!("message: {:?}", message);
419 /// // Ack it. `Message` may be dropped already.
420 /// acker.ack_with(AckKind::Nak(None)).await?;
421 /// }
422 /// # Ok(())
423 /// # }
424 /// ```
425 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
426 if let Some(ref reply) = self.reply {
427 self.context
428 .client
429 .publish(reply.to_owned(), kind.into())
430 .map_err(Error::from)
431 .await
432 } else {
433 Err(Box::new(std::io::Error::new(
434 std::io::ErrorKind::Other,
435 "No reply subject, not a JetStream message",
436 )))
437 }
438 }
439
440 /// Acknowledges a message delivery by sending `+ACK` to the server
441 /// and awaits for confirmation for the server that it received the message.
442 /// Useful if user wants to ensure `exactly once` semantics.
443 ///
444 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
445 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
446 ///
447 /// # Examples
448 ///
449 /// ```no_run
450 /// # #[tokio::main]
451 /// # async fn main() -> Result<(), async_nats::Error> {
452 /// use async_nats::jetstream::Message;
453 /// use futures::StreamExt;
454 /// let client = async_nats::connect("localhost:4222").await?;
455 /// let jetstream = async_nats::jetstream::new(client);
456 ///
457 /// let consumer = jetstream
458 /// .get_stream("events")
459 /// .await?
460 /// .get_consumer("pull")
461 /// .await?;
462 ///
463 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
464 ///
465 /// while let Some(message) = messages.next().await {
466 /// let (message, acker) = message.map(Message::split)?;
467 /// // Do something with the message. Ownership can be taken over `Message`.
468 /// // while retaining ability to ack later.
469 /// println!("message: {:?}", message);
470 /// // Ack it. `Message` may be dropped already.
471 /// acker.double_ack().await?;
472 /// }
473 /// # Ok(())
474 /// # }
475 /// ```
476 pub async fn double_ack(&self) -> Result<(), Error> {
477 if let Some(ref reply) = self.reply {
478 let inbox = self.context.client.new_inbox();
479 let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
480 self.context
481 .client
482 .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
483 .await?;
484 match tokio::time::timeout(self.context.timeout, subscription.next())
485 .await
486 .map_err(|_| {
487 std::io::Error::new(
488 std::io::ErrorKind::TimedOut,
489 "double ack response timed out",
490 )
491 })? {
492 Some(_) => Ok(()),
493 None => Err(Box::new(std::io::Error::new(
494 std::io::ErrorKind::Other,
495 "subscription dropped",
496 ))),
497 }
498 } else {
499 Err(Box::new(std::io::Error::new(
500 std::io::ErrorKind::Other,
501 "No reply subject, not a JetStream message",
502 )))
503 }
504 }
505}
/// The kinds of response used for acknowledging a processed message.
///
/// Each variant converts into the payload published to the message's reply
/// subject; the payload is noted on each variant.
#[derive(Debug, Clone, Copy)]
pub enum AckKind {
    /// Acknowledges a message was completely handled.
    ///
    /// Encoded as `+ACK`.
    Ack,
    /// Signals that the message will not be processed now
    /// and processing can move onto the next message, NAK'd
    /// message will be retried.
    ///
    /// Encoded as `-NAK`; when a duration is given it is appended as
    /// `{"delay":<nanoseconds>}`.
    Nak(Option<Duration>),
    /// When sent before the AckWait period indicates that
    /// work is ongoing and the period should be extended by
    /// another equal to AckWait.
    ///
    /// Encoded as `+WPI`.
    Progress,
    /// Acknowledges the message was handled and requests
    /// delivery of the next message to the reply subject.
    /// Only applies to Pull-mode.
    ///
    /// Encoded as `+NXT`.
    Next,
    /// Instructs the server to stop redelivery of a message
    /// without acknowledging it as successfully processed.
    ///
    /// Encoded as `+TERM`.
    Term,
}
527
528impl From<AckKind> for Bytes {
529 fn from(kind: AckKind) -> Self {
530 use AckKind::*;
531 match kind {
532 Ack => Bytes::from_static(b"+ACK"),
533 Nak(maybe_duration) => match maybe_duration {
534 None => Bytes::from_static(b"-NAK"),
535 Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
536 },
537 Progress => Bytes::from_static(b"+WPI"),
538 Next => Bytes::from_static(b"+NXT"),
539 Term => Bytes::from_static(b"+TERM"),
540 }
541 }
542}
543
/// Information about a received message.
///
/// The string fields borrow from the text they were parsed out of, hence the
/// `'a` lifetime.
#[derive(Debug, Clone)]
pub struct Info<'a> {
    /// Optional domain, present in servers post-ADR-15
    pub domain: Option<&'a str>,
    /// Optional account hash, present in servers post-ADR-15
    pub acc_hash: Option<&'a str>,
    /// The stream name
    pub stream: &'a str,
    /// The consumer name
    pub consumer: &'a str,
    /// The stream sequence number associated with this message
    pub stream_sequence: u64,
    /// The consumer sequence number associated with this message
    pub consumer_sequence: u64,
    /// The number of delivery attempts for this message
    pub delivered: i64,
    /// The number of messages known by the server to be pending to this consumer
    pub pending: u64,
    /// The time that this message was received by the server from its publisher
    pub published: time::OffsetDateTime,
    /// Optional token, present in servers post-ADR-15
    pub token: Option<&'a str>,
}