// asteroid_mq_model/message.rs

use std::{str::Utf8Error, sync::Arc};

use crate::{
    durable::MessageDurableConfig, interest::Subject, topic::TopicCode, util::MaybeBase64Bytes,
};
use bytes::Bytes;
use chrono::{DateTime, Utc};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use typeshare::typeshare;

use super::endpoint::EndpointAddr;
12
13#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
14#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
15#[typeshare]
16#[repr(u8)]
17pub enum MessageStatusKind {
18    Sending = 0xfe,
19    Unsent = 0xff,
20    Sent = 0x00,
21    Received = 0x01,
22    Processed = 0x02,
23    Failed = 0x80,
24    Unreachable = 0x81,
25}
26
27impl std::fmt::Display for MessageStatusKind {
28    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
29        match self {
30            MessageStatusKind::Sending => write!(f, "Sending"),
31            MessageStatusKind::Unsent => write!(f, "Unsent"),
32            MessageStatusKind::Sent => write!(f, "Sent"),
33            MessageStatusKind::Received => write!(f, "Received"),
34            MessageStatusKind::Processed => write!(f, "Processed"),
35            MessageStatusKind::Failed => write!(f, "Failed"),
36            MessageStatusKind::Unreachable => write!(f, "Unreachable"),
37        }
38    }
39}
40
41#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
42#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
43#[typeshare]
44pub enum MessageAckExpectKind {
45    #[default]
46    Sent = 0x00,
47    Received = 0x01,
48    Processed = 0x02,
49}
50
51impl From<MessageAckExpectKind> for MessageStatusKind {
52    fn from(kind: MessageAckExpectKind) -> MessageStatusKind {
53        match kind {
54            MessageAckExpectKind::Sent => MessageStatusKind::Sent,
55            MessageAckExpectKind::Received => MessageStatusKind::Received,
56            MessageAckExpectKind::Processed => MessageStatusKind::Processed,
57        }
58    }
59}
60impl MessageAckExpectKind {
61    pub fn try_from_u8(v: u8) -> Option<Self> {
62        match v {
63            0x00 => Some(MessageAckExpectKind::Sent),
64            0x01 => Some(MessageAckExpectKind::Received),
65            0x02 => Some(MessageAckExpectKind::Processed),
66            _ => None,
67        }
68    }
69}
70
71impl std::fmt::Display for MessageAckExpectKind {
72    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
73        match self {
74            MessageAckExpectKind::Sent => write!(f, "Sent"),
75            MessageAckExpectKind::Received => write!(f, "Received"),
76            MessageAckExpectKind::Processed => write!(f, "Processed"),
77        }
78    }
79}
80
81impl MessageStatusKind {
82    pub fn try_from_u8(v: u8) -> Option<Self> {
83        match v {
84            0xfe => Some(MessageStatusKind::Sending),
85            0xff => Some(MessageStatusKind::Unsent),
86            0x00 => Some(MessageStatusKind::Sent),
87            0x01 => Some(MessageStatusKind::Received),
88            0x02 => Some(MessageStatusKind::Processed),
89            0x80 => Some(MessageStatusKind::Failed),
90            0x81 => Some(MessageStatusKind::Unreachable),
91            _ => None,
92        }
93    }
94    #[inline(always)]
95    pub fn is_unsent(&self) -> bool {
96        *self == MessageStatusKind::Unsent
97    }
98    #[inline(always)]
99    pub fn is_failed_or_unreachable(&self) -> bool {
100        *self == MessageStatusKind::Failed || *self == MessageStatusKind::Unreachable
101    }
102    pub fn is_fulfilled(&self, condition: MessageAckExpectKind) -> bool {
103        match condition {
104            MessageAckExpectKind::Sent => {
105                *self == MessageStatusKind::Sent
106                    || *self == MessageStatusKind::Received
107                    || *self == MessageStatusKind::Processed
108                    || *self == MessageStatusKind::Failed
109            }
110            MessageAckExpectKind::Received => {
111                *self == MessageStatusKind::Received
112                    || *self == MessageStatusKind::Processed
113                    || *self == MessageStatusKind::Failed
114            }
115            MessageAckExpectKind::Processed => *self == MessageStatusKind::Processed,
116        }
117    }
118    /// witch means this state is a acceptable state
119    #[inline(always)]
120    pub fn is_resolved(&self, condition: MessageAckExpectKind) -> bool {
121        self.is_failed_or_unreachable() || self.is_fulfilled(condition)
122    }
123}
124#[derive(Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
125#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
126#[typeshare(serialized_as = "String")]
127#[repr(transparent)]
128pub struct MessageId {
129    pub bytes: [u8; 16],
130}
131
132impl MessageId {
133    pub fn to_base64(&self) -> String {
134        use base64::Engine;
135        base64::engine::general_purpose::STANDARD.encode(self.bytes)
136    }
137    pub fn from_base64(s: &str) -> Result<Self, base64::DecodeError> {
138        use base64::Engine;
139        let bytes = base64::engine::general_purpose::STANDARD.decode(s.as_bytes())?;
140        if bytes.len() != 16 {
141            return Err(base64::DecodeError::InvalidLength(bytes.len()));
142        }
143        let mut addr = [0; 16];
144        addr.copy_from_slice(&bytes);
145        Ok(Self { bytes: addr })
146    }
147    pub fn to_u128(&self) -> u128 {
148        u128::from_be_bytes(self.bytes)
149    }
150    pub fn from_u128(v: u128) -> Self {
151        Self {
152            bytes: v.to_be_bytes(),
153        }
154    }
155}
156
157impl Serialize for MessageId {
158    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
159        if serializer.is_human_readable() {
160            serializer.serialize_str(&self.to_base64())
161        } else {
162            <[u8; 16]>::serialize(&self.bytes, serializer)
163        }
164    }
165}
166
167impl<'de> Deserialize<'de> for MessageId {
168    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
169        if deserializer.is_human_readable() {
170            use serde::de::Error;
171            let s = <&'de str>::deserialize(deserializer)?;
172            Self::from_base64(s).map_err(D::Error::custom)
173        } else {
174            Ok(Self {
175                bytes: <[u8; 16]>::deserialize(deserializer)?,
176            })
177        }
178    }
179}
180
181impl std::fmt::Debug for MessageId {
182    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183        f.debug_tuple("MessageId")
184            .field(&crate::util::dashed(&[
185                crate::util::hex(&self.bytes[0..4]),
186                crate::util::hex(&self.bytes[4..12]),
187                crate::util::hex(&self.bytes[12..16]),
188            ]))
189            .finish()
190    }
191}
192
193impl std::fmt::Display for MessageId {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        write!(
196            f,
197            "{}-{}-{}",
198            crate::util::hex(&self.bytes[0..4]),
199            crate::util::hex(&self.bytes[4..12]),
200            crate::util::hex(&self.bytes[12..16])
201        )
202    }
203}
204
205impl MessageId {
206    pub fn new_snowflake() -> Self {
207        thread_local! {
208            static COUNTER: std::cell::Cell<u32> = const { std::cell::Cell::new(0) };
209        }
210        let timestamp = crate::util::timestamp_sec();
211        let counter = COUNTER.with(|c| {
212            let v = c.get();
213            c.set(v.wrapping_add(1));
214            v
215        });
216        let eid = crate::util::executor_digest() as u32;
217        let mut bytes = [0; 16];
218        bytes[0..4].copy_from_slice(&eid.to_be_bytes());
219        bytes[4..12].copy_from_slice(&timestamp.to_be_bytes());
220        bytes[12..16].copy_from_slice(&counter.to_be_bytes());
221        Self { bytes }
222    }
223}
224
225#[derive(Clone, Serialize, Deserialize)]
226#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
227#[typeshare]
228pub struct Message {
229    pub header: MessageHeader,
230    pub payload: MaybeBase64Bytes,
231}
232
233impl std::fmt::Debug for Message {
234    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
235        const MAX_DEBUG_PAYLOAD_SIZE: usize = 256;
236        let size = self.payload.0.len();
237        let mut debug = f.debug_struct("Message");
238        debug.field("header", &self.header).field("size", &size);
239        if size < MAX_DEBUG_PAYLOAD_SIZE {
240            debug.field("payload", &self.payload);
241            debug.finish()
242        } else {
243            debug.finish_non_exhaustive()
244        }
245    }
246}
247
248impl Message {
249    pub fn new(header: MessageHeader, payload: impl Into<Bytes>) -> Self {
250        Self {
251            header,
252            payload: MaybeBase64Bytes::new(payload.into()),
253        }
254    }
255    pub fn id(&self) -> MessageId {
256        self.header.message_id
257    }
258    pub fn ack_kind(&self) -> MessageAckExpectKind {
259        self.header.ack_kind
260    }
261    pub fn subjects(&self) -> &[Subject] {
262        &self.header.subjects
263    }
264    pub fn json<T: DeserializeOwned>(&self) -> serde_json::Result<T> {
265        serde_json::from_slice(&self.payload.0)
266    }
267    pub fn text(&self) -> Result<&str, Utf8Error> {
268        std::str::from_utf8(&self.payload.0)
269    }
270}
271
272#[derive(Debug, Clone, Serialize, Deserialize)]
273#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
274#[typeshare]
275pub struct MessageHeader {
276    pub message_id: MessageId,
277    pub ack_kind: MessageAckExpectKind,
278    pub target_kind: MessageTargetKind,
279    pub durability: Option<MessageDurableConfig>,
280    pub subjects: Arc<[Subject]>,
281}
282
283impl MessageHeader {
284    #[inline(always)]
285    pub(crate) fn ack(
286        &self,
287        topic_code: TopicCode,
288        from: EndpointAddr,
289        kind: MessageStatusKind,
290    ) -> MessageAck {
291        MessageAck {
292            ack_to: self.message_id,
293            kind,
294            from,
295            topic_code,
296        }
297    }
298    #[inline(always)]
299    pub fn ack_received(&self, topic_code: TopicCode, from: EndpointAddr) -> MessageAck {
300        self.ack(topic_code, from, MessageStatusKind::Received)
301    }
302    #[inline(always)]
303    pub fn ack_processed(&self, topic_code: TopicCode, from: EndpointAddr) -> MessageAck {
304        self.ack(topic_code, from, MessageStatusKind::Processed)
305    }
306    #[inline(always)]
307    pub fn ack_failed(&self, topic_code: TopicCode, from: EndpointAddr) -> MessageAck {
308        self.ack(topic_code, from, MessageStatusKind::Failed)
309    }
310    pub fn is_durable(&self) -> bool {
311        self.target_kind == MessageTargetKind::Durable
312    }
313}
314
315pub struct MessageHeaderBuilder {
316    pub ack_kind: MessageAckExpectKind,
317    target_kind: MessageTargetKind,
318    durability: Option<MessageDurableConfig>,
319    pub subjects: Vec<Subject>,
320}
321
322impl MessageHeader {
323    pub fn builder<S: Into<Subject>>(
324        subjects: impl IntoIterator<Item = S>,
325    ) -> MessageHeaderBuilder {
326        MessageHeaderBuilder::new(subjects)
327    }
328}
329
330impl MessageHeaderBuilder {
331    #[inline(always)]
332    pub fn new<S: Into<Subject>>(subjects: impl IntoIterator<Item = S>) -> Self {
333        Self {
334            ack_kind: MessageAckExpectKind::default(),
335            target_kind: MessageTargetKind::default(),
336            durability: None,
337            subjects: subjects.into_iter().map(Into::into).collect(),
338        }
339    }
340    #[inline(always)]
341    pub fn ack_kind(mut self, ack_kind: MessageAckExpectKind) -> Self {
342        self.ack_kind = ack_kind;
343        self
344    }
345    pub fn mode_online(mut self) -> Self {
346        self.target_kind = MessageTargetKind::Online;
347        self
348    }
349    pub fn mode_durable(mut self, config: MessageDurableConfig) -> Self {
350        self.target_kind = MessageTargetKind::Durable;
351        self.durability = Some(config);
352        self
353    }
354    pub fn mode_pull(mut self, expire_at: DateTime<Utc>) -> Self {
355        self.target_kind = MessageTargetKind::Durable;
356        self.durability = Some(MessageDurableConfig::new_pull(expire_at));
357        self
358    }
359    pub fn mode_push(mut self) -> Self {
360        self.target_kind = MessageTargetKind::Push;
361        self
362    }
363    pub fn build(self) -> MessageHeader {
364        MessageHeader {
365            message_id: MessageId::new_snowflake(),
366            ack_kind: self.ack_kind,
367            target_kind: self.target_kind,
368            durability: self.durability,
369            subjects: self.subjects.into(),
370        }
371    }
372}
373
374#[derive(Debug, Clone, Serialize, Deserialize)]
375
376pub struct MessageAck {
377    pub ack_to: MessageId,
378    pub topic_code: TopicCode,
379    pub from: EndpointAddr,
380    pub kind: MessageStatusKind,
381}
382
383#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Default, Serialize, Deserialize)]
384#[cfg_attr(feature = "bincode", derive(bincode::Decode, bincode::Encode))]
385#[repr(u8)]
386#[typeshare]
387pub enum MessageTargetKind {
388    Durable = 0,
389    Online = 1,
390    // #[allow(deprecated)]
391    // #[deprecated(note = "not supported yet")]
392    // Available = 2,
393    #[default]
394    Push = 3,
395}
396
397impl From<u8> for MessageTargetKind {
398    fn from(kind: u8) -> MessageTargetKind {
399        match kind {
400            0 => MessageTargetKind::Durable,
401            1 => MessageTargetKind::Online,
402            // #[allow(deprecated)]
403            // 2 => MessageTargetKind::Available,
404            _ => MessageTargetKind::Push,
405        }
406    }
407}
408
409impl MessageTargetKind {
410    pub fn is_online(&self) -> bool {
411        *self == MessageTargetKind::Online
412    }
413    pub fn is_push(&self) -> bool {
414        *self == MessageTargetKind::Push
415    }
416    pub fn is_durable(&self) -> bool {
417        *self == MessageTargetKind::Durable
418    }
419}