Skip to main content

ruststream_nats/
message.rs

1//! Delivered-message wrapper that implements [`IncomingMessage`].
2
3use std::fmt::{Debug, Formatter};
4use std::sync::OnceLock;
5
6use async_nats::jetstream::AckKind;
7use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
8
9use crate::convert::headers_from_nats;
10
/// A NATS delivery. Two flavours: core NATS (no ack) and `JetStream` (real ack/nack/redelivery).
///
/// Both variants are boxed to keep the enum compact; the wrapped `async_nats` messages are large.
///
/// Payload and headers are exposed uniformly through the [`IncomingMessage`] impl; only the
/// `JetStream` variant can be acknowledged.
pub enum NatsMessage {
    /// A core NATS subject delivery. Acknowledgement is not supported: `ack` and `nack` return
    /// [`AckError::Unsupported`] for this variant.
    Core(Box<CoreMessage>),
    /// A `JetStream` pull-consumer delivery with full ack support.
    JetStream(Box<JetStreamMessage>),
}
20
21impl Debug for NatsMessage {
22    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
23        match self {
24            Self::Core(_) => f.debug_struct("NatsMessage::Core").finish_non_exhaustive(),
25            Self::JetStream(_) => f
26                .debug_struct("NatsMessage::JetStream")
27                .finish_non_exhaustive(),
28        }
29    }
30}
31
/// Wrapper around an `async_nats::Message` from a core (non-JetStream) subscription.
///
/// Headers are converted once, in `CoreMessage::new`, and stored next to the raw message.
pub struct CoreMessage {
    /// The raw message as received from `async_nats`.
    inner: async_nats::Message,
    /// Headers converted from `inner.headers`, plus a `reply-to` entry when the message carried
    /// a reply subject.
    headers: Headers,
}
37
38impl Debug for CoreMessage {
39    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40        f.debug_struct("CoreMessage")
41            .field("subject", &self.inner.subject.as_str())
42            .field("payload_len", &self.inner.payload.len())
43            .finish_non_exhaustive()
44    }
45}
46
47impl CoreMessage {
48    pub(crate) fn new(inner: async_nats::Message) -> Self {
49        let mut headers = headers_from_nats(inner.headers.as_ref());
50        // NATS carries the request inbox in the wire-level `reply` field, not in a header.
51        // Surface it as the well-known `reply-to` header so framework handlers can respond
52        // (the in-memory testing broker already exposes it this way). The wire field is
53        // authoritative: it overrides a literal `reply-to` header if both are present.
54        // JetStream deliveries are excluded on purpose - there `reply` is the ack inbox.
55        if let Some(reply) = inner.reply.as_ref() {
56            headers.insert("reply-to", reply.as_str().to_owned());
57        }
58        Self { inner, headers }
59    }
60}
61
/// Wrapper around an `async_nats::jetstream::Message` with ack semantics.
///
/// Headers are converted once, in `JetStreamMessage::new`, and stored next to the raw message.
pub struct JetStreamMessage {
    /// The raw `JetStream` delivery; `ack` / `ack_with` are called on it to acknowledge.
    inner: async_nats::jetstream::Message,
    /// Headers converted from `inner.message.headers`.
    headers: Headers,
}
67
68impl Debug for JetStreamMessage {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        f.debug_struct("JetStreamMessage")
71            .field("subject", &self.inner.message.subject.as_str())
72            .field("payload_len", &self.inner.message.payload.len())
73            .finish_non_exhaustive()
74    }
75}
76
77impl JetStreamMessage {
78    pub(crate) fn new(inner: async_nats::jetstream::Message) -> Self {
79        let headers = headers_from_nats(inner.message.headers.as_ref());
80        Self { inner, headers }
81    }
82}
83
84fn empty_headers() -> &'static Headers {
85    static EMPTY: OnceLock<Headers> = OnceLock::new();
86    EMPTY.get_or_init(Headers::new)
87}
88
89impl IncomingMessage for NatsMessage {
90    fn payload(&self) -> &[u8] {
91        match self {
92            Self::Core(m) => &m.inner.payload,
93            Self::JetStream(m) => &m.inner.message.payload,
94        }
95    }
96
97    fn headers(&self) -> &Headers {
98        match self {
99            Self::Core(m) => &m.headers,
100            Self::JetStream(m) => &m.headers,
101        }
102    }
103
104    async fn ack(self) -> Result<(), AckError> {
105        match self {
106            Self::Core(_) => Err(AckError::Unsupported),
107            Self::JetStream(m) => m
108                .inner
109                .ack()
110                .await
111                .map_err(|err| AckError::Broker(format_err(err))),
112        }
113    }
114
115    async fn nack(self, requeue: bool) -> Result<(), AckError> {
116        match self {
117            Self::Core(_) => Err(AckError::Unsupported),
118            Self::JetStream(m) => {
119                let kind = if requeue {
120                    AckKind::Nak(None)
121                } else {
122                    AckKind::Term
123                };
124                m.inner
125                    .ack_with(kind)
126                    .await
127                    .map_err(|err| AckError::Broker(format_err(err)))
128            }
129        }
130    }
131}
132
/// The well-known header key for per-message routing / partitioning.
///
/// Set this header on outgoing messages to control key-based fan-out when the runtime is
/// configured with `workers(N, by_key)`. The value is opaque bytes; the runtime hashes it to
/// assign a dispatch lane.
///
/// On the receiving side, `NatsMessage`'s `Partitioned::partition_key` looks this header up
/// in the delivery's headers and returns the result unchanged.
pub const PARTITION_KEY_HEADER: &str = "nats-partition-key";
139
140/// `Partitioned` lets the `workers(N, by_key)` runtime feature assign a dispatch lane based on
141/// a well-known message header. NATS has no native partition concept, so the key travels as the
142/// [`PARTITION_KEY_HEADER`] header value and the sender is responsible for setting it.
143impl Partitioned for NatsMessage {
144    fn partition_key(&self) -> Option<&[u8]> {
145        self.headers().get(PARTITION_KEY_HEADER)
146    }
147}
148
/// Flattens any displayable error into a boxed, string-backed error.
///
/// Only the `Display` text of `err` is kept; the original error value is dropped.
fn format_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: std::fmt::Display + Send + Sync + 'static,
{
    // `String` converts into `Box<dyn Error + Send + Sync>` through the standard `From` impl.
    err.to_string().into()
}
156
// Keeps `empty_headers` referenced so it does not trip the `dead_code` lint on its own; this
// wrapper carries the `allow` instead.
// NOTE(review): no other caller of `empty_headers` is visible in this module — if none is
// planned, consider removing both functions rather than keeping this shim.
#[allow(dead_code)]
fn _empty_headers_keepalive() -> &'static Headers {
    empty_headers()
}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a core delivery on subject `subj` with a one-byte payload, no headers, and the
    /// given reply inbox.
    fn core_message(reply: Option<&str>) -> NatsMessage {
        let raw = async_nats::Message {
            subject: "subj".into(),
            reply: reply.map(Into::into),
            payload: bytes::Bytes::from_static(b"x"),
            headers: None,
            status: None,
            description: None,
            length: 1,
        };
        NatsMessage::Core(Box::new(CoreMessage::new(raw)))
    }

    #[test]
    fn core_reply_inbox_surfaces_as_reply_to_header() {
        let delivery = core_message(Some("_INBOX.42"));
        let headers = delivery.headers();
        assert_eq!(headers.reply_to(), Some("_INBOX.42"));
    }

    #[test]
    fn core_message_without_reply_has_no_reply_to() {
        let delivery = core_message(None);
        assert_eq!(delivery.headers().reply_to(), None);
    }
}