async_nats/jetstream/message.rs
1// Copyright 2020-2022 The NATS Authors
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! A wrapped `crate::Message` with `JetStream` related methods.
15use super::context::Context;
16use crate::header::{IntoHeaderName, IntoHeaderValue};
17use crate::subject::ToSubject;
18use crate::{error, header, message, Error, HeaderValue};
19use crate::{subject::Subject, HeaderMap};
20use bytes::Bytes;
21use futures_util::future::TryFutureExt;
22use futures_util::StreamExt;
23use std::fmt::Display;
24use std::{mem, time::Duration};
25use time::format_description::well_known::Rfc3339;
26use time::OffsetDateTime;
27
28/// A message received directly from the stream, without leveraging a consumer.
29#[derive(Debug, Clone)]
30pub struct StreamMessage {
31 pub subject: Subject,
32 pub sequence: u64,
33 pub headers: HeaderMap,
34 pub payload: Bytes,
35 pub time: OffsetDateTime,
36}
37
38/// An outbound message to be published.
39/// Does not contain status or description which are valid only for inbound messages.
40pub struct OutboundMessage {
41 pub subject: Subject,
42 pub payload: Bytes,
43 pub headers: Option<HeaderMap>,
44}
45
46impl OutboundMessage {
47 pub fn new(subject: Subject, payload: Bytes, headers: Option<HeaderMap>) -> Self {
48 Self {
49 subject,
50 payload,
51 headers,
52 }
53 }
54}
55
56impl From<OutboundMessage> for message::OutboundMessage {
57 fn from(message: OutboundMessage) -> Self {
58 message::OutboundMessage {
59 subject: message.subject,
60 payload: message.payload,
61 headers: message.headers,
62 reply: None,
63 }
64 }
65}
66
67/// Used for building customized `publish` message.
68#[derive(Default, Clone, Debug)]
69pub struct PublishMessage {
70 pub(crate) payload: Bytes,
71 pub(crate) headers: Option<header::HeaderMap>,
72}
73impl PublishMessage {
74 /// Creates a new custom Publish struct to be used with.
75 pub fn build() -> Self {
76 Default::default()
77 }
78
79 /// Sets the payload for the message.
80 pub fn payload(mut self, payload: Bytes) -> Self {
81 self.payload = payload;
82 self
83 }
84 /// Adds headers to the message.
85 pub fn headers(mut self, headers: HeaderMap) -> Self {
86 self.headers = Some(headers);
87 self
88 }
89 /// A shorthand to add a single header.
90 pub fn header<N: IntoHeaderName, V: IntoHeaderValue>(mut self, name: N, value: V) -> Self {
91 self.headers
92 .get_or_insert(header::HeaderMap::new())
93 .insert(name, value);
94 self
95 }
96 /// Sets the `Nats-Msg-Id` header, that is used by stream deduplicate window.
97 pub fn message_id<T: AsRef<str>>(self, id: T) -> Self {
98 self.header(header::NATS_MESSAGE_ID, id.as_ref())
99 }
100 /// Sets expected last message ID.
101 /// It sets the `Nats-Expected-Last-Msg-Id` header with provided value.
102 pub fn expected_last_message_id<T: AsRef<str>>(self, last_message_id: T) -> Self {
103 self.header(
104 header::NATS_EXPECTED_LAST_MESSAGE_ID,
105 last_message_id.as_ref(),
106 )
107 }
108 /// Sets the last expected stream sequence.
109 /// It sets the `Nats-Expected-Last-Sequence` header with provided value.
110 pub fn expected_last_sequence(self, last_sequence: u64) -> Self {
111 self.header(
112 header::NATS_EXPECTED_LAST_SEQUENCE,
113 HeaderValue::from(last_sequence),
114 )
115 }
116 /// Sets the last expected stream sequence for a subject this message will be published to.
117 /// It sets the `Nats-Expected-Last-Subject-Sequence` header with provided value.
118 pub fn expected_last_subject_sequence(self, subject_sequence: u64) -> Self {
119 self.header(
120 header::NATS_EXPECTED_LAST_SUBJECT_SEQUENCE,
121 HeaderValue::from(subject_sequence),
122 )
123 }
124 /// Sets the expected stream name.
125 /// It sets the `Nats-Expected-Stream` header with provided value.
126 pub fn expected_stream<T: AsRef<str>>(self, stream: T) -> Self {
127 self.header(
128 header::NATS_EXPECTED_STREAM,
129 HeaderValue::from(stream.as_ref()),
130 )
131 }
132
133 #[cfg(feature = "server_2_11")]
134 /// Sets TTL for a single message.
135 /// It sets the `Nats-TTL` header with provided value.
136 pub fn ttl(self, ttl: Duration) -> Self {
137 self.header(header::NATS_MESSAGE_TTL, ttl.as_secs().to_string())
138 }
139
140 /// Creates an [crate::jetstream::message::OutboundMessage] that can be sent using
141 /// [crate::jetstream::context::traits::Publisher::publish_message].
142 pub fn outbound_message<S: ToSubject>(self, subject: S) -> OutboundMessage {
143 OutboundMessage {
144 subject: subject.to_subject(),
145 payload: self.payload,
146 headers: self.headers,
147 }
148 }
149}
150
151#[derive(Clone, Debug)]
152pub struct Message {
153 pub message: crate::Message,
154 pub context: Context,
155}
156
157impl TryFrom<crate::Message> for StreamMessage {
158 type Error = StreamMessageError;
159
160 fn try_from(message: crate::Message) -> Result<Self, Self::Error> {
161 let headers = message.headers.ok_or_else(|| {
162 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "no headers")
163 })?;
164
165 let sequence = headers
166 .get_last(header::NATS_SEQUENCE)
167 .ok_or_else(|| {
168 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "sequence")
169 })
170 .and_then(|seq| {
171 seq.as_str().parse().map_err(|err| {
172 StreamMessageError::with_source(
173 StreamMessageErrorKind::ParseError,
174 format!("could not parse sequence header: {err}"),
175 )
176 })
177 })?;
178
179 let time = headers
180 .get_last(header::NATS_TIME_STAMP)
181 .ok_or_else(|| {
182 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "timestamp")
183 })
184 .and_then(|time| {
185 OffsetDateTime::parse(time.as_str(), &Rfc3339).map_err(|err| {
186 StreamMessageError::with_source(
187 StreamMessageErrorKind::ParseError,
188 format!("could not parse timestamp header: {err}"),
189 )
190 })
191 })?;
192
193 let subject = headers
194 .get_last(header::NATS_SUBJECT)
195 .ok_or_else(|| {
196 StreamMessageError::with_source(StreamMessageErrorKind::MissingHeader, "subject")
197 })?
198 .as_str()
199 .into();
200
201 Ok(StreamMessage {
202 subject,
203 sequence,
204 headers,
205 payload: message.payload,
206 time,
207 })
208 }
209}
210
/// The ways in which turning a raw message into a [`StreamMessage`] can fail.
#[derive(Debug, Clone, PartialEq)]
pub enum StreamMessageErrorKind {
    /// The message had no headers at all, or a required header was absent.
    MissingHeader,
    /// A required header was present but its value could not be parsed.
    ParseError,
}
216
217/// Error returned when library is unable to parse message got directly from the stream.
218pub type StreamMessageError = error::Error<StreamMessageErrorKind>;
219
220impl Display for StreamMessageErrorKind {
221 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
222 match self {
223 StreamMessageErrorKind::MissingHeader => write!(f, "missing message header"),
224 StreamMessageErrorKind::ParseError => write!(f, "parse error"),
225 }
226 }
227}
228
229impl std::ops::Deref for Message {
230 type Target = crate::Message;
231
232 fn deref(&self) -> &Self::Target {
233 &self.message
234 }
235}
236
237impl From<Message> for crate::Message {
238 fn from(source: Message) -> crate::Message {
239 source.message
240 }
241}
242
243impl Message {
244 /// Splits [Message] into [Acker] and [crate::Message].
245 /// This can help reduce memory footprint if [Message] can be dropped before acking,
246 /// for example when it's transformed into another structure and acked later
247 pub fn split(mut self) -> (crate::Message, Acker) {
248 let reply = mem::take(&mut self.message.reply);
249 (
250 self.message,
251 Acker {
252 context: self.context,
253 reply,
254 },
255 )
256 }
257 /// Acknowledges a message delivery by sending `+ACK` to the server.
258 ///
259 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
260 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
261 ///
262 /// # Examples
263 ///
264 /// ```no_run
265 /// # #[tokio::main]
266 /// # async fn main() -> Result<(), async_nats::Error> {
267 /// use async_nats::jetstream::consumer::PullConsumer;
268 /// use futures_util::StreamExt;
269 /// let client = async_nats::connect("localhost:4222").await?;
270 /// let jetstream = async_nats::jetstream::new(client);
271 ///
272 /// let consumer: PullConsumer = jetstream
273 /// .get_stream("events")
274 /// .await?
275 /// .get_consumer("pull")
276 /// .await?;
277 ///
278 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
279 ///
280 /// while let Some(message) = messages.next().await {
281 /// message?.ack().await?;
282 /// }
283 /// # Ok(())
284 /// # }
285 /// ```
286 pub async fn ack(&self) -> Result<(), Error> {
287 if let Some(ref reply) = self.reply {
288 self.context
289 .client
290 .publish(reply.clone(), "".into())
291 .map_err(Error::from)
292 .await
293 } else {
294 Err(Box::new(std::io::Error::other(
295 "No reply subject, not a JetStream message",
296 )))
297 }
298 }
299
300 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
301 ///
302 /// # Examples
303 ///
304 /// ```no_run
305 /// # #[tokio::main]
306 /// # async fn main() -> Result<(), async_nats::Error> {
307 /// use async_nats::jetstream::consumer::PullConsumer;
308 /// use async_nats::jetstream::AckKind;
309 /// use futures_util::StreamExt;
310 /// let client = async_nats::connect("localhost:4222").await?;
311 /// let jetstream = async_nats::jetstream::new(client);
312 ///
313 /// let consumer: PullConsumer = jetstream
314 /// .get_stream("events")
315 /// .await?
316 /// .get_consumer("pull")
317 /// .await?;
318 ///
319 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
320 ///
321 /// while let Some(message) = messages.next().await {
322 /// message?.ack_with(AckKind::Nak(None)).await?;
323 /// }
324 /// # Ok(())
325 /// # }
326 /// ```
327 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
328 if let Some(ref reply) = self.reply {
329 self.context
330 .client
331 .publish(reply.to_owned(), kind.into())
332 .map_err(Error::from)
333 .await
334 } else {
335 Err(Box::new(std::io::Error::other(
336 "No reply subject, not a JetStream message",
337 )))
338 }
339 }
340
341 /// Acknowledges a message delivery by sending `+ACK` to the server
342 /// and awaits for confirmation for the server that it received the message.
343 /// Useful if user wants to ensure `exactly once` semantics.
344 ///
345 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
346 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
347 ///
348 /// # Examples
349 ///
350 /// ```no_run
351 /// # #[tokio::main]
352 /// # async fn main() -> Result<(), async_nats::Error> {
353 /// use futures_util::StreamExt;
354 /// let client = async_nats::connect("localhost:4222").await?;
355 /// let jetstream = async_nats::jetstream::new(client);
356 ///
357 /// let consumer = jetstream
358 /// .get_stream("events")
359 /// .await?
360 /// .get_consumer("pull")
361 /// .await?;
362 ///
363 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
364 ///
365 /// while let Some(message) = messages.next().await {
366 /// message?.double_ack().await?;
367 /// }
368 /// # Ok(())
369 /// # }
370 /// ```
371 pub async fn double_ack(&self) -> Result<(), Error> {
372 if let Some(ref reply) = self.reply {
373 let inbox = self.context.client.new_inbox();
374 let mut subscription = self.context.client.subscribe(inbox.clone()).await?;
375 self.context
376 .client
377 .publish_with_reply(reply.clone(), inbox, AckKind::Ack.into())
378 .await?;
379 match tokio::time::timeout(self.context.timeout, subscription.next())
380 .await
381 .map_err(|_| {
382 std::io::Error::new(
383 std::io::ErrorKind::TimedOut,
384 "double ack response timed out",
385 )
386 })? {
387 Some(_) => Ok(()),
388 None => Err(Box::new(std::io::Error::other("subscription dropped"))),
389 }
390 } else {
391 Err(Box::new(std::io::Error::other(
392 "No reply subject, not a JetStream message",
393 )))
394 }
395 }
396
397 /// Returns the `JetStream` message ID
398 /// if this is a `JetStream` message.
399 #[allow(clippy::mixed_read_write_in_expression)]
400 pub fn info(&self) -> Result<Info<'_>, Error> {
401 const PREFIX: &str = "$JS.ACK.";
402 const SKIP: usize = PREFIX.len();
403
404 let mut reply: &str = self.reply.as_ref().ok_or_else(|| {
405 std::io::Error::new(std::io::ErrorKind::NotFound, "did not found reply subject")
406 })?;
407
408 if !reply.starts_with(PREFIX) {
409 return Err(Box::new(std::io::Error::other(
410 "did not found proper prefix",
411 )));
412 }
413
414 reply = &reply[SKIP..];
415
416 let mut split = reply.split('.');
417
418 // we should avoid allocating to prevent
419 // large performance degradations in
420 // parsing this.
421 let mut tokens: [Option<&str>; 10] = [None; 10];
422 let mut n_tokens = 0;
423 for each_token in &mut tokens {
424 if let Some(token) = split.next() {
425 *each_token = Some(token);
426 n_tokens += 1;
427 }
428 }
429
430 let mut token_index = 0;
431
432 macro_rules! try_parse {
433 () => {
434 match str::parse(try_parse!(str)) {
435 Ok(parsed) => parsed,
436 Err(e) => {
437 return Err(Box::new(e));
438 }
439 }
440 };
441 (str) => {
442 if let Some(next) = tokens[token_index].take() {
443 #[allow(unused)]
444 {
445 // this isn't actually unused, but it's
446 // difficult for the compiler to infer this.
447 token_index += 1;
448 }
449 next
450 } else {
451 return Err(Box::new(std::io::Error::other("too few tokens")));
452 }
453 };
454 }
455
456 // now we can try to parse the tokens to
457 // individual types. We use an if-else
458 // chain instead of a match because it
459 // produces more optimal code usually,
460 // and we want to try the 9 (11 - the first 2)
461 // case first because we expect it to
462 // be the most common. We use >= to be
463 // future-proof.
464 if n_tokens >= 9 {
465 Ok(Info {
466 domain: {
467 let domain: &str = try_parse!(str);
468 if domain == "_" {
469 None
470 } else {
471 Some(domain)
472 }
473 },
474 acc_hash: Some(try_parse!(str)),
475 stream: try_parse!(str),
476 consumer: try_parse!(str),
477 delivered: try_parse!(),
478 stream_sequence: try_parse!(),
479 consumer_sequence: try_parse!(),
480 published: {
481 let nanos: i128 = try_parse!();
482 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
483 },
484 pending: try_parse!(),
485 token: if n_tokens >= 9 {
486 Some(try_parse!(str))
487 } else {
488 None
489 },
490 })
491 } else if n_tokens == 7 {
492 // we expect this to be increasingly rare, as older
493 // servers are phased out.
494 Ok(Info {
495 domain: None,
496 acc_hash: None,
497 stream: try_parse!(str),
498 consumer: try_parse!(str),
499 delivered: try_parse!(),
500 stream_sequence: try_parse!(),
501 consumer_sequence: try_parse!(),
502 published: {
503 let nanos: i128 = try_parse!();
504 OffsetDateTime::from_unix_timestamp_nanos(nanos)?
505 },
506 pending: try_parse!(),
507 token: None,
508 })
509 } else {
510 Err(Box::new(std::io::Error::other("bad token number")))
511 }
512 }
513}
514
515/// A lightweight struct useful for decoupling message contents and the ability to ack it.
516pub struct Acker {
517 context: Context,
518 reply: Option<Subject>,
519}
520
521// TODO(tp): This should be async trait to avoid duplication of code. Will be refactored into one when async traits are available.
522// The async-trait crate is not a solution here, as it would mean we're allocating at every ack.
523// Creating separate function to ack just to avoid one duplication is not worth it either.
524impl Acker {
525 pub fn new(context: Context, reply: Option<Subject>) -> Self {
526 Self { context, reply }
527 }
528 /// Acknowledges a message delivery by sending `+ACK` to the server.
529 ///
530 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
531 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
532 ///
533 /// # Examples
534 ///
535 /// ```no_run
536 /// # #[tokio::main]
537 /// # async fn main() -> Result<(), async_nats::Error> {
538 /// use async_nats::jetstream::consumer::PullConsumer;
539 /// use async_nats::jetstream::Message;
540 /// use futures_util::StreamExt;
541 /// let client = async_nats::connect("localhost:4222").await?;
542 /// let jetstream = async_nats::jetstream::new(client);
543 ///
544 /// let consumer: PullConsumer = jetstream
545 /// .get_stream("events")
546 /// .await?
547 /// .get_consumer("pull")
548 /// .await?;
549 ///
550 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
551 ///
552 /// while let Some(message) = messages.next().await {
553 /// let (message, acker) = message.map(Message::split)?;
554 /// // Do something with the message. Ownership can be taken over `Message`
555 /// // while retaining ability to ack later.
556 /// println!("message: {:?}", message);
557 /// // Ack it. `Message` may be dropped already.
558 /// acker.ack().await?;
559 /// }
560 /// # Ok(())
561 /// # }
562 /// ```
563 pub async fn ack(&self) -> Result<(), Error> {
564 if let Some(ref reply) = self.reply {
565 self.context
566 .client
567 .publish(reply.to_owned(), "".into())
568 .map_err(Error::from)
569 .await
570 } else {
571 Err(Box::new(std::io::Error::other(
572 "No reply subject, not a JetStream message",
573 )))
574 }
575 }
576
577 /// Acknowledges a message delivery by sending a chosen [AckKind] variant to the server.
578 ///
579 /// # Examples
580 ///
581 /// ```no_run
582 /// # #[tokio::main]
583 /// # async fn main() -> Result<(), async_nats::Error> {
584 /// use async_nats::jetstream::consumer::PullConsumer;
585 /// use async_nats::jetstream::AckKind;
586 /// use async_nats::jetstream::Message;
587 /// use futures_util::StreamExt;
588 /// let client = async_nats::connect("localhost:4222").await?;
589 /// let jetstream = async_nats::jetstream::new(client);
590 ///
591 /// let consumer: PullConsumer = jetstream
592 /// .get_stream("events")
593 /// .await?
594 /// .get_consumer("pull")
595 /// .await?;
596 ///
597 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
598 ///
599 /// while let Some(message) = messages.next().await {
600 /// let (message, acker) = message.map(Message::split)?;
601 /// // Do something with the message. Ownership can be taken over `Message`.
602 /// // while retaining ability to ack later.
603 /// println!("message: {:?}", message);
604 /// // Ack it. `Message` may be dropped already.
605 /// acker.ack_with(AckKind::Nak(None)).await?;
606 /// }
607 /// # Ok(())
608 /// # }
609 /// ```
610 pub async fn ack_with(&self, kind: AckKind) -> Result<(), Error> {
611 if let Some(ref reply) = self.reply {
612 self.context
613 .client
614 .publish(reply.to_owned(), kind.into())
615 .map_err(Error::from)
616 .await
617 } else {
618 Err(Box::new(std::io::Error::other(
619 "No reply subject, not a JetStream message",
620 )))
621 }
622 }
623
624 /// Acknowledges a message delivery by sending `+ACK` to the server
625 /// and awaits for confirmation for the server that it received the message.
626 /// Useful if user wants to ensure `exactly once` semantics.
627 ///
628 /// If [AckPolicy][crate::jetstream::consumer::AckPolicy] is set to `All` or `Explicit`, messages has to be acked.
629 /// Otherwise redeliveries will occur and [Consumer][crate::jetstream::consumer::Consumer] will not be able to advance.
630 ///
631 /// # Examples
632 ///
633 /// ```no_run
634 /// # #[tokio::main]
635 /// # async fn main() -> Result<(), async_nats::Error> {
636 /// use async_nats::jetstream::Message;
637 /// use futures_util::StreamExt;
638 /// let client = async_nats::connect("localhost:4222").await?;
639 /// let jetstream = async_nats::jetstream::new(client);
640 ///
641 /// let consumer = jetstream
642 /// .get_stream("events")
643 /// .await?
644 /// .get_consumer("pull")
645 /// .await?;
646 ///
647 /// let mut messages = consumer.fetch().max_messages(100).messages().await?;
648 ///
649 /// while let Some(message) = messages.next().await {
650 /// let (message, acker) = message.map(Message::split)?;
651 /// // Do something with the message. Ownership can be taken over `Message`.
652 /// // while retaining ability to ack later.
653 /// println!("message: {:?}", message);
654 /// // Ack it. `Message` may be dropped already.
655 /// acker.double_ack().await?;
656 /// }
657 /// # Ok(())
658 /// # }
659 /// ```
660 pub async fn double_ack(&self) -> Result<(), Error> {
661 if let Some(ref reply) = self.reply {
662 let inbox = self.context.client.new_inbox();
663 let mut subscription = self.context.client.subscribe(inbox.to_owned()).await?;
664 self.context
665 .client
666 .publish_with_reply(reply.to_owned(), inbox, AckKind::Ack.into())
667 .await?;
668 match tokio::time::timeout(self.context.timeout, subscription.next())
669 .await
670 .map_err(|_| {
671 std::io::Error::new(
672 std::io::ErrorKind::TimedOut,
673 "double ack response timed out",
674 )
675 })? {
676 Some(_) => Ok(()),
677 None => Err(Box::new(std::io::Error::other("subscription dropped"))),
678 }
679 } else {
680 Err(Box::new(std::io::Error::other(
681 "No reply subject, not a JetStream message",
682 )))
683 }
684 }
685}
/// The kinds of response used for acknowledging a processed message.
///
/// Each variant converts into the payload that is published to the reply subject.
#[derive(Debug, Clone, Copy)]
pub enum AckKind {
    /// Acknowledges a message was completely handled. Sent as `+ACK`.
    Ack,
    /// Signals that the message will not be processed now
    /// and processing can move onto the next message, NAK'd
    /// message will be retried. Sent as `-NAK`; when a duration is given it is
    /// appended as a `delay` expressed in nanoseconds.
    Nak(Option<Duration>),
    /// When sent before the AckWait period indicates that
    /// work is ongoing and the period should be extended by
    /// another equal to AckWait. Sent as `+WPI`.
    Progress,
    /// Acknowledges the message was handled and requests
    /// delivery of the next message to the reply subject.
    /// Only applies to Pull-mode. Sent as `+NXT`.
    Next,
    /// Instructs the server to stop redelivery of a message
    /// without acknowledging it as successfully processed. Sent as `+TERM`.
    Term,
}
707
708impl From<AckKind> for Bytes {
709 fn from(kind: AckKind) -> Self {
710 use AckKind::*;
711 match kind {
712 Ack => Bytes::from_static(b"+ACK"),
713 Nak(maybe_duration) => match maybe_duration {
714 None => Bytes::from_static(b"-NAK"),
715 Some(duration) => format!("-NAK {{\"delay\":{}}}", duration.as_nanos()).into(),
716 },
717 Progress => Bytes::from_static(b"+WPI"),
718 Next => Bytes::from_static(b"+NXT"),
719 Term => Bytes::from_static(b"+TERM"),
720 }
721 }
722}
723
724/// Information about a received message
725#[derive(Debug, Clone)]
726pub struct Info<'a> {
727 /// Optional domain, present in servers post-ADR-15
728 pub domain: Option<&'a str>,
729 /// Optional account hash, present in servers post-ADR-15
730 pub acc_hash: Option<&'a str>,
731 /// The stream name
732 pub stream: &'a str,
733 /// The consumer name
734 pub consumer: &'a str,
735 /// The stream sequence number associated with this message
736 pub stream_sequence: u64,
737 /// The consumer sequence number associated with this message
738 pub consumer_sequence: u64,
739 /// The number of delivery attempts for this message
740 pub delivered: i64,
741 /// the number of messages known by the server to be pending to this consumer
742 pub pending: u64,
743 /// the time that this message was received by the server from its publisher
744 pub published: time::OffsetDateTime,
745 /// Optional token, present in servers post-ADR-15
746 pub token: Option<&'a str>,
747}