Skip to main content

mq_bridge/
canonical_message.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6use bytes::Bytes;
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use uuid::Uuid;
11
12use crate::type_handler::KIND_KEY;
13
14#[derive(Debug, Serialize, Deserialize, Clone)]
15pub struct CanonicalMessage {
16    pub message_id: u128,
17    pub payload: Bytes,
18    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
19    pub metadata: HashMap<String, String>,
20}
21
22impl CanonicalMessage {
23    pub fn new(payload: Vec<u8>, message_id: Option<u128>) -> Self {
24        Self {
25            message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
26            payload: Bytes::from(payload),
27            metadata: HashMap::new(),
28        }
29    }
30
31    pub fn new_bytes(payload: Bytes, message_id: Option<u128>) -> Self {
32        Self {
33            message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
34            payload,
35            metadata: HashMap::new(),
36        }
37    }
38
39    pub fn from_type<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
40        let bytes = serde_json::to_vec(data)?;
41        Ok(Self::new(bytes, None))
42    }
43
44    pub fn from_vec(payload: impl Into<Vec<u8>>) -> Self {
45        Self::new(payload.into(), None)
46    }
47
48    pub fn set_id(&mut self, id: u128) {
49        self.message_id = id;
50    }
51
52    pub fn from_json(payload: serde_json::Value) -> Result<Self, serde_json::Error> {
53        let mut message_id = None;
54        if let Some(val) = payload
55            .get("message_id")
56            .or(payload.get("id"))
57            .or(payload.get("_id"))
58        {
59            if let Some(s) = val.as_str() {
60                if let Ok(uuid) = Uuid::parse_str(s) {
61                    message_id = Some(uuid.as_u128());
62                } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
63                    message_id = Some(n);
64                } else if let Ok(n) = s.parse::<u128>() {
65                    message_id = Some(n);
66                }
67            } else if let Some(n) = val.as_i64() {
68                message_id = Some(n as u128);
69            } else if let Some(n) = val.as_u64() {
70                message_id = Some(n as u128);
71            } else if let Some(oid) = val.get("$oid").and_then(|v| v.as_str()) {
72                if let Ok(n) = u128::from_str_radix(oid, 16) {
73                    message_id = Some(n);
74                }
75            }
76        }
77        let bytes = serde_json::to_vec(&payload)?;
78        Ok(Self::new(bytes, message_id))
79    }
80
81    pub fn parse<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
82        serde_json::from_slice(&self.payload)
83    }
84
85    /// Returns the payload as a UTF-8 lossy string.
86    pub fn get_payload_str(&self) -> std::borrow::Cow<'_, str> {
87        String::from_utf8_lossy(&self.payload)
88    }
89
90    /// Sets the payload of this message to the given string.
91    pub fn set_payload_str(&mut self, payload: impl Into<String>) {
92        self.payload = Bytes::from(payload.into());
93    }
94
95    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
96        self.metadata = metadata;
97        self
98    }
99
100    pub fn with_metadata_kv(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
101        self.metadata.insert(key.into(), value.into());
102        self
103    }
104
105    pub fn with_type_key(mut self, kind: impl Into<String>) -> Self {
106        self.metadata.insert(KIND_KEY.into(), kind.into());
107        self
108    }
109
110    pub fn with_raw_format(mut self) -> Self {
111        self.metadata
112            .insert("mq_bridge.original_format".to_string(), "raw".to_string());
113        self
114    }
115}
116
117impl From<&str> for CanonicalMessage {
118    fn from(s: &str) -> Self {
119        Self::new(s.as_bytes().into(), None)
120    }
121}
122
123impl From<String> for CanonicalMessage {
124    fn from(s: String) -> Self {
125        Self::new(s.into_bytes(), None)
126    }
127}
128
129impl From<Vec<u8>> for CanonicalMessage {
130    fn from(v: Vec<u8>) -> Self {
131        Self::new(v, None)
132    }
133}
134
135impl From<serde_json::Value> for CanonicalMessage {
136    fn from(v: serde_json::Value) -> Self {
137        Self::from_json(v).expect("Failed to serialize JSON value")
138    }
139}
140
141/// A context object that holds metadata and identification for a message,
142/// separated from the payload. Useful for typed handlers.
143#[derive(Debug, Clone)]
144pub struct MessageContext {
145    pub message_id: u128,
146    pub metadata: HashMap<String, String>,
147}
148
149impl From<CanonicalMessage> for MessageContext {
150    fn from(msg: CanonicalMessage) -> Self {
151        Self {
152            message_id: msg.message_id,
153            metadata: msg.metadata,
154        }
155    }
156}
157
158#[doc(hidden)]
159pub mod tracing_support {
160    use super::CanonicalMessage;
161
162    /// A helper struct to lazily format a slice of message IDs for tracing.
163    /// The collection and formatting only occurs if the trace is enabled.
164    pub struct LazyMessageIds<'a>(pub &'a [CanonicalMessage]);
165
166    impl<'a> std::fmt::Debug for LazyMessageIds<'a> {
167        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168            let ids: Vec<String> = self
169                .0
170                .iter()
171                .map(|m| format!("{:032x}", m.message_id))
172                .collect();
173            f.debug_list().entries(ids).finish()
174        }
175    }
176}
177
178#[doc(hidden)]
179pub mod macro_support {
180    use super::CanonicalMessage;
181    use serde::Serialize;
182
183    pub trait Fallback {
184        fn convert(&self) -> CanonicalMessage;
185    }
186
187    impl<T: Serialize> Fallback for Wrap<T> {
188        fn convert(&self) -> CanonicalMessage {
189            CanonicalMessage::from_type(&self.0).expect("Serialization failed in msg! macro")
190        }
191    }
192
193    pub struct Wrap<T>(pub T);
194
195    impl<T> Wrap<T>
196    where
197        T: Into<CanonicalMessage> + Clone,
198    {
199        pub fn convert(&self) -> CanonicalMessage {
200            self.0.clone().into()
201        }
202    }
203}
204
205/// A macro to create a `CanonicalMessage` easily.
206///
207/// Examples:
208/// ```rust
209/// use mq_bridge::msg;
210///
211/// let m1 = msg!("hello");
212/// let m2 = msg!("hello", "greeting");
213/// let m3 = msg!("hello", "kind" => "greeting");
214///
215/// #[derive(serde::Serialize, Clone)]
216/// struct MyData { val: i32 }
217/// let m4 = msg!(MyData { val: 42 }, "my_type");
218/// ```
219#[macro_export]
220macro_rules! msg {
221    ($payload:expr $(, $key:expr => $val:expr)* $(,)?) => {
222        {
223            #[allow(unused_imports)]
224            use $crate::canonical_message::macro_support::{Wrap, Fallback};
225            #[allow(unused_mut)]
226            let mut message = Wrap($payload).convert();
227            $(
228                message = message.with_metadata_kv($key, $val);
229            )*
230            message
231        }
232    };
233    ($payload:expr, $kind:expr $(,)?) => {
234        {
235            #[allow(unused_imports)]
236            use $crate::canonical_message::macro_support::{Wrap, Fallback};
237            let mut message = Wrap($payload).convert();
238            message = message.with_type_key($kind);
239            message
240        }
241    };
242}