1use std::{str::Utf8Error, sync::Arc};
2
3use crate::{
4 durable::MessageDurableConfig, interest::Subject, topic::TopicCode, util::MaybeBase64Bytes,
5};
6use bytes::Bytes;
7use chrono::{DateTime, Utc};
8use serde::{de::DeserializeOwned, Deserialize, Serialize};
9use typeshare::typeshare;
10
11use super::endpoint::EndpointAddr;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
15#[typeshare]
16#[repr(u8)]
17pub enum MessageStatusKind {
18 Sending = 0xfe,
19 Unsent = 0xff,
20 Sent = 0x00,
21 Received = 0x01,
22 Processed = 0x02,
23 Failed = 0x80,
24 Unreachable = 0x81,
25}
26
27impl std::fmt::Display for MessageStatusKind {
28 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29 match self {
30 MessageStatusKind::Sending => write!(f, "Sending"),
31 MessageStatusKind::Unsent => write!(f, "Unsent"),
32 MessageStatusKind::Sent => write!(f, "Sent"),
33 MessageStatusKind::Received => write!(f, "Received"),
34 MessageStatusKind::Processed => write!(f, "Processed"),
35 MessageStatusKind::Failed => write!(f, "Failed"),
36 MessageStatusKind::Unreachable => write!(f, "Unreachable"),
37 }
38 }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
42#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
43#[typeshare]
44pub enum MessageAckExpectKind {
45 #[default]
46 Sent = 0x00,
47 Received = 0x01,
48 Processed = 0x02,
49}
50
51impl From<MessageAckExpectKind> for MessageStatusKind {
52 fn from(kind: MessageAckExpectKind) -> MessageStatusKind {
53 match kind {
54 MessageAckExpectKind::Sent => MessageStatusKind::Sent,
55 MessageAckExpectKind::Received => MessageStatusKind::Received,
56 MessageAckExpectKind::Processed => MessageStatusKind::Processed,
57 }
58 }
59}
60impl MessageAckExpectKind {
61 pub fn try_from_u8(v: u8) -> Option<Self> {
62 match v {
63 0x00 => Some(MessageAckExpectKind::Sent),
64 0x01 => Some(MessageAckExpectKind::Received),
65 0x02 => Some(MessageAckExpectKind::Processed),
66 _ => None,
67 }
68 }
69}
70
71impl std::fmt::Display for MessageAckExpectKind {
72 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73 match self {
74 MessageAckExpectKind::Sent => write!(f, "Sent"),
75 MessageAckExpectKind::Received => write!(f, "Received"),
76 MessageAckExpectKind::Processed => write!(f, "Processed"),
77 }
78 }
79}
80
81impl MessageStatusKind {
82 pub fn try_from_u8(v: u8) -> Option<Self> {
83 match v {
84 0xfe => Some(MessageStatusKind::Sending),
85 0xff => Some(MessageStatusKind::Unsent),
86 0x00 => Some(MessageStatusKind::Sent),
87 0x01 => Some(MessageStatusKind::Received),
88 0x02 => Some(MessageStatusKind::Processed),
89 0x80 => Some(MessageStatusKind::Failed),
90 0x81 => Some(MessageStatusKind::Unreachable),
91 _ => None,
92 }
93 }
94 #[inline(always)]
95 pub fn is_unsent(&self) -> bool {
96 *self == MessageStatusKind::Unsent
97 }
98 #[inline(always)]
99 pub fn is_failed_or_unreachable(&self) -> bool {
100 *self == MessageStatusKind::Failed || *self == MessageStatusKind::Unreachable
101 }
102 pub fn is_fulfilled(&self, condition: MessageAckExpectKind) -> bool {
103 match condition {
104 MessageAckExpectKind::Sent => {
105 *self == MessageStatusKind::Sent
106 || *self == MessageStatusKind::Received
107 || *self == MessageStatusKind::Processed
108 || *self == MessageStatusKind::Failed
109 }
110 MessageAckExpectKind::Received => {
111 *self == MessageStatusKind::Received
112 || *self == MessageStatusKind::Processed
113 || *self == MessageStatusKind::Failed
114 }
115 MessageAckExpectKind::Processed => *self == MessageStatusKind::Processed,
116 }
117 }
118 #[inline(always)]
120 pub fn is_resolved(&self, condition: MessageAckExpectKind) -> bool {
121 self.is_failed_or_unreachable() || self.is_fulfilled(condition)
122 }
123}
124#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
125#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
126#[typeshare(serialized_as = "String")]
127#[repr(transparent)]
128pub struct MessageId {
129 pub bytes: [u8; 16],
130}
131
132impl MessageId {
133 pub fn to_base64(&self) -> String {
134 use base64::Engine;
135 base64::engine::general_purpose::STANDARD.encode(self.bytes)
136 }
137 pub fn from_base64(s: &str) -> Result<Self, base64::DecodeError> {
138 use base64::Engine;
139 let bytes = base64::engine::general_purpose::STANDARD.decode(s.as_bytes())?;
140 if bytes.len() != 16 {
141 return Err(base64::DecodeError::InvalidLength(bytes.len()));
142 }
143 let mut addr = [0; 16];
144 addr.copy_from_slice(&bytes);
145 Ok(Self { bytes: addr })
146 }
147 pub fn to_u128(&self) -> u128 {
148 u128::from_be_bytes(self.bytes)
149 }
150 pub fn from_u128(v: u128) -> Self {
151 Self {
152 bytes: v.to_be_bytes(),
153 }
154 }
155}
156
157impl Serialize for MessageId {
158 fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
159 if serializer.is_human_readable() {
160 serializer.serialize_str(&self.to_base64())
161 } else {
162 <[u8; 16]>::serialize(&self.bytes, serializer)
163 }
164 }
165}
166
167impl<'de> Deserialize<'de> for MessageId {
168 fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
169 if deserializer.is_human_readable() {
170 use serde::de::Error;
171 let s = <&'de str>::deserialize(deserializer)?;
172 Self::from_base64(s).map_err(D::Error::custom)
173 } else {
174 Ok(Self {
175 bytes: <[u8; 16]>::deserialize(deserializer)?,
176 })
177 }
178 }
179}
180
181impl std::fmt::Debug for MessageId {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 f.debug_tuple("MessageId")
184 .field(&crate::util::dashed(&[
185 crate::util::hex(&self.bytes[0..4]),
186 crate::util::hex(&self.bytes[4..12]),
187 crate::util::hex(&self.bytes[12..16]),
188 ]))
189 .finish()
190 }
191}
192
193impl std::fmt::Display for MessageId {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 write!(
196 f,
197 "{}-{}-{}",
198 crate::util::hex(&self.bytes[0..4]),
199 crate::util::hex(&self.bytes[4..12]),
200 crate::util::hex(&self.bytes[12..16])
201 )
202 }
203}
204
205impl MessageId {
206 pub fn new_snowflake() -> Self {
207 thread_local! {
208 static COUNTER: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
209 }
210 let timestamp = crate::util::timestamp_sec();
211 let counter = COUNTER.with(|c| {
212 let v = c.get();
213 c.set(v.wrapping_add(1));
214 v
215 });
216 let eid = crate::util::executor_digest() as u32;
217 let mut bytes = [0; 16];
218 bytes[0..4].copy_from_slice(&eid.to_be_bytes());
219 bytes[4..12].copy_from_slice(×tamp.to_be_bytes());
220 bytes[12..16].copy_from_slice(&counter.to_be_bytes());
221 Self { bytes }
222 }
223}
224
225#[derive(Clone, Serialize, Deserialize)]
226#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
227#[typeshare]
228pub struct Message {
229 pub header: MessageHeader,
230 pub payload: MaybeBase64Bytes,
231}
232
233impl std::fmt::Debug for Message {
234 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235 const MAX_DEBUG_PAYLOAD_SIZE: usize = 256;
236 let size = self.payload.0.len();
237 let mut debug = f.debug_struct("Message");
238 debug.field("header", &self.header).field("size", &size);
239 if size < MAX_DEBUG_PAYLOAD_SIZE {
240 debug.field("payload", &self.payload);
241 debug.finish()
242 } else {
243 debug.finish_non_exhaustive()
244 }
245 }
246}
247
248impl Message {
249 pub fn new(header: MessageHeader, payload: impl Into<Bytes>) -> Self {
250 Self {
251 header,
252 payload: MaybeBase64Bytes::new(payload.into()),
253 }
254 }
255 pub fn id(&self) -> MessageId {
256 self.header.message_id
257 }
258 pub fn ack_kind(&self) -> MessageAckExpectKind {
259 self.header.ack_kind
260 }
261 pub fn subjects(&self) -> &[Subject] {
262 &self.header.subjects
263 }
264 pub fn json<T: DeserializeOwned>(&self) -> serde_json::Result<T> {
265 serde_json::from_slice(&self.payload.0)
266 }
267 pub fn text(&self) -> Result<&str, Utf8Error> {
268 std::str::from_utf8(&self.payload.0)
269 }
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
273#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
274#[typeshare]
275pub struct MessageHeader {
276 pub message_id: MessageId,
277 pub ack_kind: MessageAckExpectKind,
278 pub target_kind: MessageTargetKind,
279 pub durability: Option<MessageDurableConfig>,
280 pub subjects: Arc<[Subject]>,
281}
282
283impl MessageHeader {
284 #[inline(always)]
285 pub(crate) fn ack(
286 &self,
287 topic_code: TopicCode,
288 from: EndpointAddr,
289 kind: MessageStatusKind,
290 ) -> MessageAck {
291 MessageAck {
292 ack_to: self.message_id,
293 kind,
294 from,
295 topic_code,
296 }
297 }
298 #[inline(always)]
299 pub fn ack_received(&self, topic_code: TopicCode, from: EndpointAddr) -> MessageAck {
300 self.ack(topic_code, from, MessageStatusKind::Received)
301 }
302 #[inline(always)]
303 pub fn ack_processed(&self, topic_code: TopicCode, from: EndpointAddr) -> MessageAck {
304 self.ack(topic_code, from, MessageStatusKind::Processed)
305 }
306 #[inline(always)]
307 pub fn ack_failed(&self, topic_code: TopicCode, from: EndpointAddr) -> MessageAck {
308 self.ack(topic_code, from, MessageStatusKind::Failed)
309 }
310 pub fn is_durable(&self) -> bool {
311 self.target_kind == MessageTargetKind::Durable
312 }
313}
314
315pub struct MessageHeaderBuilder {
316 pub ack_kind: MessageAckExpectKind,
317 target_kind: MessageTargetKind,
318 durability: Option<MessageDurableConfig>,
319 pub subjects: Vec<Subject>,
320}
321
322impl MessageHeader {
323 pub fn builder<S: Into<Subject>>(
324 subjects: impl IntoIterator<Item = S>,
325 ) -> MessageHeaderBuilder {
326 MessageHeaderBuilder::new(subjects)
327 }
328}
329
330impl MessageHeaderBuilder {
331 #[inline(always)]
332 pub fn new<S: Into<Subject>>(subjects: impl IntoIterator<Item = S>) -> Self {
333 Self {
334 ack_kind: MessageAckExpectKind::default(),
335 target_kind: MessageTargetKind::default(),
336 durability: None,
337 subjects: subjects.into_iter().map(Into::into).collect(),
338 }
339 }
340 #[inline(always)]
341 pub fn ack_kind(mut self, ack_kind: MessageAckExpectKind) -> Self {
342 self.ack_kind = ack_kind;
343 self
344 }
345 pub fn mode_online(mut self) -> Self {
346 self.target_kind = MessageTargetKind::Online;
347 self
348 }
349 pub fn mode_durable(mut self, config: MessageDurableConfig) -> Self {
350 self.target_kind = MessageTargetKind::Durable;
351 self.durability = Some(config);
352 self
353 }
354 pub fn mode_pull(mut self, expire_at: DateTime<Utc>) -> Self {
355 self.target_kind = MessageTargetKind::Durable;
356 self.durability = Some(MessageDurableConfig::new_pull(expire_at));
357 self
358 }
359 pub fn mode_push(mut self) -> Self {
360 self.target_kind = MessageTargetKind::Push;
361 self
362 }
363 pub fn build(self) -> MessageHeader {
364 MessageHeader {
365 message_id: MessageId::new_snowflake(),
366 ack_kind: self.ack_kind,
367 target_kind: self.target_kind,
368 durability: self.durability,
369 subjects: self.subjects.into(),
370 }
371 }
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize)]
375
376pub struct MessageAck {
377 pub ack_to: MessageId,
378 pub topic_code: TopicCode,
379 pub from: EndpointAddr,
380 pub kind: MessageStatusKind,
381}
382
383#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
384#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
385#[repr(u8)]
386#[typeshare]
387pub enum MessageTargetKind {
388 Durable = 0,
389 Online = 1,
390 #[default]
394 Push = 3,
395}
396
397impl From<u8> for MessageTargetKind {
398 fn from(kind: u8) -> MessageTargetKind {
399 match kind {
400 0 => MessageTargetKind::Durable,
401 1 => MessageTargetKind::Online,
402 _ => MessageTargetKind::Push,
405 }
406 }
407}
408
409impl MessageTargetKind {
410 pub fn is_online(&self) -> bool {
411 *self == MessageTargetKind::Online
412 }
413 pub fn is_push(&self) -> bool {
414 *self == MessageTargetKind::Push
415 }
416 pub fn is_durable(&self) -> bool {
417 *self == MessageTargetKind::Durable
418 }
419}