Skip to main content

ruststream_nats/
message.rs

1//! Delivered-message wrapper that implements [`IncomingMessage`].
2
3use std::fmt::{Debug, Formatter};
4use std::sync::OnceLock;
5
6use async_nats::jetstream::AckKind;
7use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
8
9use crate::convert::headers_from_nats;
10
11/// A NATS delivery. Two flavours: core NATS (no ack) and `JetStream` (real ack/nack/redelivery).
12///
13/// Both variants are boxed to keep the enum compact; the wrapped `async_nats` messages are large.
14pub enum NatsMessage {
15    /// A core NATS subject delivery. Acknowledgement is not supported.
16    Core(Box<CoreMessage>),
17    /// A `JetStream` pull-consumer delivery with full ack support.
18    JetStream(Box<JetStreamMessage>),
19}
20
21impl Debug for NatsMessage {
22    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
23        match self {
24            Self::Core(_) => f.debug_struct("NatsMessage::Core").finish_non_exhaustive(),
25            Self::JetStream(_) => f
26                .debug_struct("NatsMessage::JetStream")
27                .finish_non_exhaustive(),
28        }
29    }
30}
31
32/// Wrapper around an `async_nats::Message` from a core (non-JetStream) subscription.
33pub struct CoreMessage {
34    inner: async_nats::Message,
35    headers: Headers,
36}
37
38impl Debug for CoreMessage {
39    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("CoreMessage")
41            .field("subject", &self.inner.subject.as_str())
42            .field("payload_len", &self.inner.payload.len())
43            .finish_non_exhaustive()
44    }
45}
46
47impl CoreMessage {
48    pub(crate) fn new(inner: async_nats::Message) -> Self {
49        let mut headers = headers_from_nats(inner.headers.as_ref());
50        // NATS carries the request inbox in the wire-level `reply` field, not in a header.
51        // Surface it as the well-known `reply-to` header so framework handlers can respond
52        // (the in-memory testing broker already exposes it this way). The wire field is
53        // authoritative: it overrides a literal `reply-to` header if both are present.
54        // JetStream deliveries are excluded on purpose - there `reply` is the ack inbox.
55        if let Some(reply) = inner.reply.as_ref() {
56            headers.insert("reply-to", reply.as_str().to_owned());
57        }
58        Self { inner, headers }
59    }
60}
61
62/// Wrapper around an `async_nats::jetstream::Message` with ack semantics.
63pub struct JetStreamMessage {
64    inner: async_nats::jetstream::Message,
65    headers: Headers,
66}
67
68impl Debug for JetStreamMessage {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("JetStreamMessage")
71            .field("subject", &self.inner.message.subject.as_str())
72            .field("payload_len", &self.inner.message.payload.len())
73            .finish_non_exhaustive()
74    }
75}
76
77impl JetStreamMessage {
78    pub(crate) fn new(inner: async_nats::jetstream::Message) -> Self {
79        let headers = headers_from_nats(inner.message.headers.as_ref());
80        Self { inner, headers }
81    }
82
83    /// The native `JetStream` delivery metadata (stream/consumer name and sequences, redelivery
84    /// count, pending count), parsed from the `$JS.ACK` reply subject.
85    ///
86    /// Returns `None` when the reply subject is absent or malformed - i.e. the underlying
87    /// `async_nats` parse failed - so a caller building a context can fall back to "no metadata"
88    /// rather than surfacing an error on the per-delivery hot path.
89    pub(crate) fn info(&self) -> Option<async_nats::jetstream::message::Info<'_>> {
90        self.inner.info().ok()
91    }
92}
93
94fn empty_headers() -> &'static Headers {
95    static EMPTY: OnceLock<Headers> = OnceLock::new();
96    EMPTY.get_or_init(Headers::new)
97}
98
99impl IncomingMessage for NatsMessage {
100    fn payload(&self) -> &[u8] {
101        match self {
102            Self::Core(m) => &m.inner.payload,
103            Self::JetStream(m) => &m.inner.message.payload,
104        }
105    }
106
107    fn headers(&self) -> &Headers {
108        match self {
109            Self::Core(m) => &m.headers,
110            Self::JetStream(m) => &m.headers,
111        }
112    }
113
114    async fn ack(self) -> Result<(), AckError> {
115        match self {
116            Self::Core(_) => Err(AckError::Unsupported),
117            Self::JetStream(m) => m
118                .inner
119                .ack()
120                .await
121                .map_err(|err| AckError::Broker(format_err(err))),
122        }
123    }
124
125    async fn nack(self, requeue: bool) -> Result<(), AckError> {
126        match self {
127            Self::Core(_) => Err(AckError::Unsupported),
128            Self::JetStream(m) => {
129                let kind = if requeue {
130                    AckKind::Nak(None)
131                } else {
132                    AckKind::Term
133                };
134                m.inner
135                    .ack_with(kind)
136                    .await
137                    .map_err(|err| AckError::Broker(format_err(err)))
138            }
139        }
140    }
141}
142
143/// The well-known header key for per-message routing / partitioning.
144///
145/// Set this header on outgoing messages to control key-based fan-out when the runtime is
146/// configured with `workers(N, by_key)`. The value is opaque bytes; the runtime hashes it to
147/// assign a dispatch lane.
148pub const PARTITION_KEY_HEADER: &str = "nats-partition-key";
149
150/// `Partitioned` lets the `workers(N, by_key)` runtime feature assign a dispatch lane based on
151/// a well-known message header. NATS has no native partition concept, so the key travels as the
152/// [`PARTITION_KEY_HEADER`] header value and the sender is responsible for setting it.
153impl Partitioned for NatsMessage {
154    fn partition_key(&self) -> Option<&[u8]> {
155        self.headers().get(PARTITION_KEY_HEADER)
156    }
157}
158
159fn format_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
160where
161    E: std::fmt::Display + Send + Sync + 'static,
162{
163    let msg = err.to_string();
164    Box::<dyn std::error::Error + Send + Sync>::from(msg)
165}
166
167#[allow(dead_code)]
168fn _empty_headers_keepalive() -> &'static Headers {
169    empty_headers()
170}
171
172#[cfg(test)]
173mod tests {
174    use super::*;
175
176    fn core_message(reply: Option<&str>) -> NatsMessage {
177        NatsMessage::Core(Box::new(CoreMessage::new(async_nats::Message {
178            subject: "subj".into(),
179            reply: reply.map(Into::into),
180            payload: bytes::Bytes::from_static(b"x"),
181            headers: None,
182            status: None,
183            description: None,
184            length: 1,
185        })))
186    }
187
188    #[test]
189    fn core_reply_inbox_surfaces_as_reply_to_header() {
190        let msg = core_message(Some("_INBOX.42"));
191        assert_eq!(msg.headers().reply_to(), Some("_INBOX.42"));
192    }
193
194    #[test]
195    fn core_message_without_reply_has_no_reply_to() {
196        assert_eq!(core_message(None).headers().reply_to(), None);
197    }
198}