ruststream_nats/
message.rs1use std::fmt::{Debug, Formatter};
4use std::sync::OnceLock;
5
6use async_nats::jetstream::AckKind;
7use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
8
9use crate::convert::headers_from_nats;
10
11pub enum NatsMessage {
15 Core(Box<CoreMessage>),
17 JetStream(Box<JetStreamMessage>),
19}
20
21impl Debug for NatsMessage {
22 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
23 match self {
24 Self::Core(_) => f.debug_struct("NatsMessage::Core").finish_non_exhaustive(),
25 Self::JetStream(_) => f
26 .debug_struct("NatsMessage::JetStream")
27 .finish_non_exhaustive(),
28 }
29 }
30}
31
32pub struct CoreMessage {
34 inner: async_nats::Message,
35 headers: Headers,
36}
37
38impl Debug for CoreMessage {
39 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("CoreMessage")
41 .field("subject", &self.inner.subject.as_str())
42 .field("payload_len", &self.inner.payload.len())
43 .finish_non_exhaustive()
44 }
45}
46
47impl CoreMessage {
48 pub(crate) fn new(inner: async_nats::Message) -> Self {
49 let mut headers = headers_from_nats(inner.headers.as_ref());
50 if let Some(reply) = inner.reply.as_ref() {
56 headers.insert("reply-to", reply.as_str().to_owned());
57 }
58 Self { inner, headers }
59 }
60}
61
62pub struct JetStreamMessage {
64 inner: async_nats::jetstream::Message,
65 headers: Headers,
66}
67
68impl Debug for JetStreamMessage {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("JetStreamMessage")
71 .field("subject", &self.inner.message.subject.as_str())
72 .field("payload_len", &self.inner.message.payload.len())
73 .finish_non_exhaustive()
74 }
75}
76
77impl JetStreamMessage {
78 pub(crate) fn new(inner: async_nats::jetstream::Message) -> Self {
79 let headers = headers_from_nats(inner.message.headers.as_ref());
80 Self { inner, headers }
81 }
82
83 pub(crate) fn info(&self) -> Option<async_nats::jetstream::message::Info<'_>> {
90 self.inner.info().ok()
91 }
92}
93
94fn empty_headers() -> &'static Headers {
95 static EMPTY: OnceLock<Headers> = OnceLock::new();
96 EMPTY.get_or_init(Headers::new)
97}
98
99impl IncomingMessage for NatsMessage {
100 fn payload(&self) -> &[u8] {
101 match self {
102 Self::Core(m) => &m.inner.payload,
103 Self::JetStream(m) => &m.inner.message.payload,
104 }
105 }
106
107 fn headers(&self) -> &Headers {
108 match self {
109 Self::Core(m) => &m.headers,
110 Self::JetStream(m) => &m.headers,
111 }
112 }
113
114 async fn ack(self) -> Result<(), AckError> {
115 match self {
116 Self::Core(_) => Err(AckError::Unsupported),
117 Self::JetStream(m) => m
118 .inner
119 .ack()
120 .await
121 .map_err(|err| AckError::Broker(format_err(err))),
122 }
123 }
124
125 async fn nack(self, requeue: bool) -> Result<(), AckError> {
126 match self {
127 Self::Core(_) => Err(AckError::Unsupported),
128 Self::JetStream(m) => {
129 let kind = if requeue {
130 AckKind::Nak(None)
131 } else {
132 AckKind::Term
133 };
134 m.inner
135 .ack_with(kind)
136 .await
137 .map_err(|err| AckError::Broker(format_err(err)))
138 }
139 }
140 }
141}
142
/// Name of the header consulted by the [`Partitioned`] implementation for
/// [`NatsMessage`] when looking up a message's partition key.
pub const PARTITION_KEY_HEADER: &str = "nats-partition-key";
149
150impl Partitioned for NatsMessage {
154 fn partition_key(&self) -> Option<&[u8]> {
155 self.headers().get(PARTITION_KEY_HEADER)
156 }
157}
158
/// Converts any displayable error into a boxed, thread-safe error whose
/// message is the original's `Display` text.
///
/// Only the rendered text survives; the original error value is consumed.
fn format_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: std::fmt::Display + Send + Sync + 'static,
{
    // `String` converts into `Box<dyn Error + Send + Sync>` via the std `From` impl.
    err.to_string().into()
}
166
167#[allow(dead_code)]
168fn _empty_headers_keepalive() -> &'static Headers {
169 empty_headers()
170}
171
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `Core`-variant [`NatsMessage`] on subject `"subj"` with a
    /// one-byte payload, no NATS headers, and the given optional reply subject.
    fn core_message(reply: Option<&str>) -> NatsMessage {
        let raw = async_nats::Message {
            subject: "subj".into(),
            reply: reply.map(|inbox| inbox.into()),
            payload: bytes::Bytes::from_static(b"x"),
            headers: None,
            status: None,
            description: None,
            length: 1,
        };
        NatsMessage::Core(Box::new(CoreMessage::new(raw)))
    }

    /// A reply subject on a core message must be readable via `reply_to()`.
    #[test]
    fn core_reply_inbox_surfaces_as_reply_to_header() {
        let with_reply = core_message(Some("_INBOX.42"));
        let observed = with_reply.headers().reply_to();
        assert_eq!(observed, Some("_INBOX.42"));
    }

    /// Without a reply subject, `reply_to()` must report nothing.
    #[test]
    fn core_message_without_reply_has_no_reply_to() {
        let without_reply = core_message(None);
        let observed = without_reply.headers().reply_to();
        assert_eq!(observed, None);
    }
}