Skip to main content

ruststream_nats/
message.rs

1//! Delivered-message wrapper that implements [`IncomingMessage`].
2
3use std::sync::OnceLock;
4
5use ruststream::{AckError, Headers, IncomingMessage};
6
7use crate::convert::headers_from_nats;
8
9/// A NATS delivery. Two flavours: core NATS (no ack) and `JetStream` (real ack/nack/redelivery).
10///
11/// Both variants are boxed to keep the enum compact; the wrapped `async_nats` messages are large.
12pub enum NatsMessage {
13    /// A core NATS subject delivery. Acknowledgement is not supported.
14    Core(Box<CoreMessage>),
15    /// A `JetStream` pull-consumer delivery with full ack support.
16    JetStream(Box<JetStreamMessage>),
17}
18
19impl std::fmt::Debug for NatsMessage {
20    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21        match self {
22            Self::Core(_) => f.debug_struct("NatsMessage::Core").finish_non_exhaustive(),
23            Self::JetStream(_) => f
24                .debug_struct("NatsMessage::JetStream")
25                .finish_non_exhaustive(),
26        }
27    }
28}
29
30/// Wrapper around an `async_nats::Message` from a core (non-JetStream) subscription.
31pub struct CoreMessage {
32    inner: async_nats::Message,
33    headers: Headers,
34}
35
36impl std::fmt::Debug for CoreMessage {
37    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        f.debug_struct("CoreMessage")
39            .field("subject", &self.inner.subject.as_str())
40            .field("payload_len", &self.inner.payload.len())
41            .finish_non_exhaustive()
42    }
43}
44
45impl CoreMessage {
46    pub(crate) fn new(inner: async_nats::Message) -> Self {
47        let headers = headers_from_nats(inner.headers.as_ref());
48        Self { inner, headers }
49    }
50}
51
52/// Wrapper around an `async_nats::jetstream::Message` with ack semantics.
53pub struct JetStreamMessage {
54    inner: async_nats::jetstream::Message,
55    headers: Headers,
56}
57
58impl std::fmt::Debug for JetStreamMessage {
59    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60        f.debug_struct("JetStreamMessage")
61            .field("subject", &self.inner.message.subject.as_str())
62            .field("payload_len", &self.inner.message.payload.len())
63            .finish_non_exhaustive()
64    }
65}
66
67impl JetStreamMessage {
68    pub(crate) fn new(inner: async_nats::jetstream::Message) -> Self {
69        let headers = headers_from_nats(inner.message.headers.as_ref());
70        Self { inner, headers }
71    }
72}
73
74fn empty_headers() -> &'static Headers {
75    static EMPTY: OnceLock<Headers> = OnceLock::new();
76    EMPTY.get_or_init(Headers::new)
77}
78
79impl IncomingMessage for NatsMessage {
80    fn payload(&self) -> &[u8] {
81        match self {
82            Self::Core(m) => &m.inner.payload,
83            Self::JetStream(m) => &m.inner.message.payload,
84        }
85    }
86
87    fn headers(&self) -> &Headers {
88        match self {
89            Self::Core(m) => &m.headers,
90            Self::JetStream(m) => &m.headers,
91        }
92    }
93
94    async fn ack(self) -> Result<(), AckError> {
95        match self {
96            Self::Core(_) => Err(AckError::Unsupported),
97            Self::JetStream(m) => m
98                .inner
99                .ack()
100                .await
101                .map_err(|err| AckError::Broker(format_err(err))),
102        }
103    }
104
105    async fn nack(self, requeue: bool) -> Result<(), AckError> {
106        match self {
107            Self::Core(_) => Err(AckError::Unsupported),
108            Self::JetStream(m) => {
109                use async_nats::jetstream::AckKind;
110                let kind = if requeue {
111                    AckKind::Nak(None)
112                } else {
113                    AckKind::Term
114                };
115                m.inner
116                    .ack_with(kind)
117                    .await
118                    .map_err(|err| AckError::Broker(format_err(err)))
119            }
120        }
121    }
122}
123
/// Turns any displayable error into a boxed, thread-safe `std::error::Error`.
///
/// Only the `Display` rendering of `err` is kept: the value is converted to a `String` and
/// that string is boxed as the error. The original error value is dropped.
fn format_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: std::fmt::Display + Send + Sync + 'static,
{
    err.to_string().into()
}
131
132#[allow(dead_code)]
133fn _empty_headers_keepalive() -> &'static Headers {
134    empty_headers()
135}