ruststream_nats/
message.rs1use std::fmt::{Debug, Formatter};
4use std::sync::OnceLock;
5
6use async_nats::jetstream::AckKind;
7use ruststream::{AckError, Headers, IncomingMessage, Partitioned};
8
9use crate::convert::headers_from_nats;
10
/// A message received from NATS, in one of two flavours.
///
/// Both variants box their payload struct, so the enum itself stays
/// pointer-sized regardless of how large the wrapped client types are.
pub enum NatsMessage {
    /// A message wrapping a plain `async_nats::Message` (see [`CoreMessage`]).
    /// Acknowledging this variant reports `AckError::Unsupported`.
    Core(Box<CoreMessage>),
    /// A message wrapping an `async_nats::jetstream::Message`
    /// (see [`JetStreamMessage`]); ack/nack are forwarded to it.
    JetStream(Box<JetStreamMessage>),
}
20
21impl Debug for NatsMessage {
22 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
23 match self {
24 Self::Core(_) => f.debug_struct("NatsMessage::Core").finish_non_exhaustive(),
25 Self::JetStream(_) => f
26 .debug_struct("NatsMessage::JetStream")
27 .finish_non_exhaustive(),
28 }
29 }
30}
31
/// An `async_nats::Message` paired with the `Headers` derived from it.
///
/// Instances are built by `CoreMessage::new`, which converts the NATS headers
/// and records the reply subject (when present) under the `reply-to` key.
pub struct CoreMessage {
    /// The message exactly as delivered by the NATS client.
    inner: async_nats::Message,
    /// Headers converted from `inner` at construction time.
    headers: Headers,
}
37
38impl Debug for CoreMessage {
39 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
40 f.debug_struct("CoreMessage")
41 .field("subject", &self.inner.subject.as_str())
42 .field("payload_len", &self.inner.payload.len())
43 .finish_non_exhaustive()
44 }
45}
46
47impl CoreMessage {
48 pub(crate) fn new(inner: async_nats::Message) -> Self {
49 let mut headers = headers_from_nats(inner.headers.as_ref());
50 if let Some(reply) = inner.reply.as_ref() {
56 headers.insert("reply-to", reply.as_str().to_owned());
57 }
58 Self { inner, headers }
59 }
60}
61
/// An `async_nats::jetstream::Message` paired with the `Headers` derived
/// from it.
///
/// Instances are built by `JetStreamMessage::new`.
pub struct JetStreamMessage {
    /// The JetStream message as delivered by the NATS client; ack and nack
    /// are issued through it.
    inner: async_nats::jetstream::Message,
    /// Headers converted from `inner.message` at construction time.
    headers: Headers,
}
67
68impl Debug for JetStreamMessage {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 f.debug_struct("JetStreamMessage")
71 .field("subject", &self.inner.message.subject.as_str())
72 .field("payload_len", &self.inner.message.payload.len())
73 .finish_non_exhaustive()
74 }
75}
76
77impl JetStreamMessage {
78 pub(crate) fn new(inner: async_nats::jetstream::Message) -> Self {
79 let headers = headers_from_nats(inner.message.headers.as_ref());
80 Self { inner, headers }
81 }
82}
83
84fn empty_headers() -> &'static Headers {
85 static EMPTY: OnceLock<Headers> = OnceLock::new();
86 EMPTY.get_or_init(Headers::new)
87}
88
89impl IncomingMessage for NatsMessage {
90 fn payload(&self) -> &[u8] {
91 match self {
92 Self::Core(m) => &m.inner.payload,
93 Self::JetStream(m) => &m.inner.message.payload,
94 }
95 }
96
97 fn headers(&self) -> &Headers {
98 match self {
99 Self::Core(m) => &m.headers,
100 Self::JetStream(m) => &m.headers,
101 }
102 }
103
104 async fn ack(self) -> Result<(), AckError> {
105 match self {
106 Self::Core(_) => Err(AckError::Unsupported),
107 Self::JetStream(m) => m
108 .inner
109 .ack()
110 .await
111 .map_err(|err| AckError::Broker(format_err(err))),
112 }
113 }
114
115 async fn nack(self, requeue: bool) -> Result<(), AckError> {
116 match self {
117 Self::Core(_) => Err(AckError::Unsupported),
118 Self::JetStream(m) => {
119 let kind = if requeue {
120 AckKind::Nak(None)
121 } else {
122 AckKind::Term
123 };
124 m.inner
125 .ack_with(kind)
126 .await
127 .map_err(|err| AckError::Broker(format_err(err)))
128 }
129 }
130 }
131}
132
/// Name of the header that `NatsMessage::partition_key` looks up to obtain a
/// message's partition key.
pub const PARTITION_KEY_HEADER: &str = "nats-partition-key";
139
140impl Partitioned for NatsMessage {
144 fn partition_key(&self) -> Option<&[u8]> {
145 self.headers().get(PARTITION_KEY_HEADER)
146 }
147}
148
/// Converts any displayable error into a boxed `std::error::Error`.
///
/// Only the `Display` text of `err` is kept: the returned box wraps that
/// string, so the original error value (and its type) is not retained.
fn format_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
where
    E: std::fmt::Display + Send + Sync + 'static,
{
    // `String` converts into `Box<dyn Error + Send + Sync>` via the std `From` impl.
    err.to_string().into()
}
156
/// Forwards to `empty_headers` and returns its result unchanged.
///
/// NOTE(review): in the visible part of this file this is the only caller of
/// `empty_headers`, and it is itself exempted from the unused-code lint.
/// Presumably it exists solely to keep that helper referenced — confirm, and
/// consider removing both if nothing else needs them.
#[allow(dead_code)]
fn _empty_headers_keepalive() -> &'static Headers {
    empty_headers()
}
161
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `NatsMessage::Core` on subject `subj` with the one-byte
    /// payload `x`, no NATS headers, and the given optional reply subject.
    fn core_message(reply: Option<&str>) -> NatsMessage {
        let raw = async_nats::Message {
            subject: "subj".into(),
            reply: reply.map(Into::into),
            payload: bytes::Bytes::from_static(b"x"),
            headers: None,
            status: None,
            description: None,
            length: 1,
        };
        NatsMessage::Core(Box::new(CoreMessage::new(raw)))
    }

    #[test]
    fn core_reply_inbox_surfaces_as_reply_to_header() {
        let inbox = "_INBOX.42";
        let msg = core_message(Some(inbox));
        assert_eq!(msg.headers().reply_to(), Some(inbox));
    }

    #[test]
    fn core_message_without_reply_has_no_reply_to() {
        let msg = core_message(None);
        assert_eq!(msg.headers().reply_to(), None);
    }
}