ruststream_nats/
message.rs1use std::sync::OnceLock;
4
5use ruststream::{AckError, Headers, IncomingMessage};
6
7use crate::convert::headers_from_nats;
8
9pub enum NatsMessage {
13 Core(Box<CoreMessage>),
15 JetStream(Box<JetStreamMessage>),
17}
18
19impl std::fmt::Debug for NatsMessage {
20 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
21 match self {
22 Self::Core(_) => f.debug_struct("NatsMessage::Core").finish_non_exhaustive(),
23 Self::JetStream(_) => f
24 .debug_struct("NatsMessage::JetStream")
25 .finish_non_exhaustive(),
26 }
27 }
28}
29
30pub struct CoreMessage {
32 inner: async_nats::Message,
33 headers: Headers,
34}
35
36impl std::fmt::Debug for CoreMessage {
37 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 f.debug_struct("CoreMessage")
39 .field("subject", &self.inner.subject.as_str())
40 .field("payload_len", &self.inner.payload.len())
41 .finish_non_exhaustive()
42 }
43}
44
45impl CoreMessage {
46 pub(crate) fn new(inner: async_nats::Message) -> Self {
47 let headers = headers_from_nats(inner.headers.as_ref());
48 Self { inner, headers }
49 }
50}
51
52pub struct JetStreamMessage {
54 inner: async_nats::jetstream::Message,
55 headers: Headers,
56}
57
58impl std::fmt::Debug for JetStreamMessage {
59 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
60 f.debug_struct("JetStreamMessage")
61 .field("subject", &self.inner.message.subject.as_str())
62 .field("payload_len", &self.inner.message.payload.len())
63 .finish_non_exhaustive()
64 }
65}
66
67impl JetStreamMessage {
68 pub(crate) fn new(inner: async_nats::jetstream::Message) -> Self {
69 let headers = headers_from_nats(inner.message.headers.as_ref());
70 Self { inner, headers }
71 }
72}
73
74fn empty_headers() -> &'static Headers {
75 static EMPTY: OnceLock<Headers> = OnceLock::new();
76 EMPTY.get_or_init(Headers::new)
77}
78
79impl IncomingMessage for NatsMessage {
80 fn payload(&self) -> &[u8] {
81 match self {
82 Self::Core(m) => &m.inner.payload,
83 Self::JetStream(m) => &m.inner.message.payload,
84 }
85 }
86
87 fn headers(&self) -> &Headers {
88 match self {
89 Self::Core(m) => &m.headers,
90 Self::JetStream(m) => &m.headers,
91 }
92 }
93
94 async fn ack(self) -> Result<(), AckError> {
95 match self {
96 Self::Core(_) => Err(AckError::Unsupported),
97 Self::JetStream(m) => m
98 .inner
99 .ack()
100 .await
101 .map_err(|err| AckError::Broker(format_err(err))),
102 }
103 }
104
105 async fn nack(self, requeue: bool) -> Result<(), AckError> {
106 match self {
107 Self::Core(_) => Err(AckError::Unsupported),
108 Self::JetStream(m) => {
109 use async_nats::jetstream::AckKind;
110 let kind = if requeue {
111 AckKind::Nak(None)
112 } else {
113 AckKind::Term
114 };
115 m.inner
116 .ack_with(kind)
117 .await
118 .map_err(|err| AckError::Broker(format_err(err)))
119 }
120 }
121 }
122}
123
124fn format_err<E>(err: E) -> Box<dyn std::error::Error + Send + Sync>
125where
126 E: std::fmt::Display + Send + Sync + 'static,
127{
128 let msg = err.to_string();
129 Box::<dyn std::error::Error + Send + Sync>::from(msg)
130}
131
132#[allow(dead_code)]
133fn _empty_headers_keepalive() -> &'static Headers {
134 empty_headers()
135}