Skip to main content

mq_bridge/
canonical_message.rs

1//  mq-bridge
2//  © Copyright 2025, by Marco Mengelkoch
3//  Licensed under MIT License, see License file for more details
4//  git clone https://github.com/marcomq/mq-bridge
5
6use bytes::Bytes;
7use serde::de::{DeserializeOwned, Error};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use uuid::Uuid;
11
12use crate::type_handler::KIND_KEY;
13
14#[derive(Debug, Serialize, Deserialize, Clone)]
15pub struct CanonicalMessage {
16    pub message_id: u128,
17    pub payload: Bytes,
18    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
19    pub metadata: HashMap<String, String>,
20}
21
22impl CanonicalMessage {
23    pub fn new(payload: Vec<u8>, message_id: Option<u128>) -> Self {
24        Self {
25            message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
26            payload: Bytes::from(payload),
27            metadata: HashMap::new(),
28        }
29    }
30
31    pub fn new_bytes(payload: Bytes, message_id: Option<u128>) -> Self {
32        Self {
33            message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
34            payload,
35            metadata: HashMap::new(),
36        }
37    }
38
39    pub fn from_type<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
40        let bytes = serde_json::to_vec(data)?;
41        Ok(Self::new(bytes, None))
42    }
43
44    pub fn from_vec(payload: impl Into<Vec<u8>>) -> Self {
45        Self::new(payload.into(), None)
46    }
47
48    pub fn set_id(&mut self, id: u128) {
49        self.message_id = id;
50    }
51
52    pub fn from_json(payload: serde_json::Value) -> Result<Self, serde_json::Error> {
53        let mut message_id = None;
54        if let Some(val) = payload
55            .get("message_id")
56            .or(payload.get("id"))
57            .or(payload.get("_id"))
58        {
59            if let Some(s) = val.as_str() {
60                if let Ok(uuid) = Uuid::parse_str(s) {
61                    message_id = Some(uuid.as_u128());
62                } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
63                    message_id = Some(n);
64                } else if let Ok(n) = s.parse::<u128>() {
65                    message_id = Some(n);
66                }
67            } else if let Some(n) = val.as_i64() {
68                if n < 0 {
69                    return Err(Error::custom("message_id cannot be negative"));
70                }
71                message_id = Some(n as u128);
72            } else if let Some(n) = val.as_u64() {
73                message_id = Some(n as u128);
74            } else if let Some(oid) = val.get("$oid").and_then(|v| v.as_str()) {
75                if let Ok(n) = u128::from_str_radix(oid, 16) {
76                    message_id = Some(n);
77                }
78            }
79        }
80        let bytes = serde_json::to_vec(&payload)?;
81        Ok(Self::new(bytes, message_id))
82    }
83
84    pub fn parse<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
85        serde_json::from_slice(&self.payload)
86    }
87
88    /// Returns the payload as a UTF-8 lossy string.
89    pub fn get_payload_str(&self) -> std::borrow::Cow<'_, str> {
90        String::from_utf8_lossy(&self.payload)
91    }
92
93    /// Sets the payload of this message to the given string.
94    pub fn set_payload_str(&mut self, payload: impl Into<String>) {
95        self.payload = Bytes::from(payload.into());
96    }
97
98    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
99        self.metadata = metadata;
100        self
101    }
102
103    pub fn with_metadata_kv(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
104        self.metadata.insert(key.into(), value.into());
105        self
106    }
107
108    pub fn with_type_key(mut self, kind: impl Into<String>) -> Self {
109        self.metadata.insert(KIND_KEY.into(), kind.into());
110        self
111    }
112
113    pub fn with_raw_format(mut self) -> Self {
114        self.metadata
115            .insert("mq_bridge.original_format".to_string(), "raw".to_string());
116        self
117    }
118}
119
120impl From<&str> for CanonicalMessage {
121    fn from(s: &str) -> Self {
122        Self::new(s.as_bytes().into(), None)
123    }
124}
125
126impl From<String> for CanonicalMessage {
127    fn from(s: String) -> Self {
128        Self::new(s.into_bytes(), None)
129    }
130}
131
132impl From<Vec<u8>> for CanonicalMessage {
133    fn from(v: Vec<u8>) -> Self {
134        Self::new(v, None)
135    }
136}
137
138impl From<serde_json::Value> for CanonicalMessage {
139    fn from(v: serde_json::Value) -> Self {
140        Self::from_json(v).expect("Failed to serialize JSON value")
141    }
142}
143
144/// A context object that holds metadata and identification for a message,
145/// separated from the payload. Useful for typed handlers.
146#[derive(Debug, Clone)]
147pub struct MessageContext {
148    pub message_id: u128,
149    pub metadata: HashMap<String, String>,
150}
151
152impl From<CanonicalMessage> for MessageContext {
153    fn from(msg: CanonicalMessage) -> Self {
154        Self {
155            message_id: msg.message_id,
156            metadata: msg.metadata,
157        }
158    }
159}
160
161#[doc(hidden)]
162pub mod tracing_support {
163    use super::CanonicalMessage;
164
165    /// A helper struct to lazily format a slice of message IDs for tracing.
166    /// The collection and formatting only occurs if the trace is enabled.
167    pub struct LazyMessageIds<'a>(pub &'a [CanonicalMessage]);
168
169    impl<'a> std::fmt::Debug for LazyMessageIds<'a> {
170        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171            let ids: Vec<String> = self
172                .0
173                .iter()
174                .map(|m| format!("{:032x}", m.message_id))
175                .collect();
176            f.debug_list().entries(ids).finish()
177        }
178    }
179}
180
181#[doc(hidden)]
182pub mod macro_support {
183    use super::CanonicalMessage;
184    use serde::Serialize;
185
186    pub trait Fallback {
187        fn convert(&self) -> CanonicalMessage;
188    }
189
190    impl<T: Serialize> Fallback for Wrap<T> {
191        fn convert(&self) -> CanonicalMessage {
192            CanonicalMessage::from_type(&self.0).expect("Serialization failed in msg! macro")
193        }
194    }
195
196    pub struct Wrap<T>(pub T);
197
198    impl<T> Wrap<T>
199    where
200        T: Into<CanonicalMessage> + Clone,
201    {
202        pub fn convert(&self) -> CanonicalMessage {
203            self.0.clone().into()
204        }
205    }
206}
207
208/// A macro to create a `CanonicalMessage` easily.
209///
210/// Examples:
211/// ```rust
212/// use mq_bridge::msg;
213///
214/// let m1 = msg!("hello");
215/// let m2 = msg!("hello", "greeting");
216/// let m3 = msg!("hello", "kind" => "greeting");
217///
218/// #[derive(serde::Serialize, Clone)]
219/// struct MyData { val: i32 }
220/// let m4 = msg!(MyData { val: 42 }, "my_type");
221/// ```
222#[macro_export]
223macro_rules! msg {
224    ($payload:expr $(, $key:expr => $val:expr)* $(,)?) => {
225        {
226            #[allow(unused_imports)]
227            use $crate::canonical_message::macro_support::{Wrap, Fallback};
228            #[allow(unused_mut)]
229            let mut message = Wrap($payload).convert();
230            $(
231                message = message.with_metadata_kv($key, $val);
232            )*
233            message
234        }
235    };
236    ($payload:expr, $kind:expr $(,)?) => {
237        {
238            #[allow(unused_imports)]
239            use $crate::canonical_message::macro_support::{Wrap, Fallback};
240            let mut message = Wrap($payload).convert();
241            message = message.with_type_key($kind);
242            message
243        }
244    };
245}
246
247#[cfg(test)]
248mod tests {
249    use super::*;
250    use serde_json::json;
251
252    #[test]
253    fn test_message_id_parsing() {
254        // String UUID
255        let uuid = "550e8400-e29b-41d4-a716-446655440000";
256        let msg = CanonicalMessage::from_json(json!({ "id": uuid })).unwrap();
257        assert_eq!(msg.message_id, 113059749145936325402354257176981405696);
258
259        // Hex string
260        let msg = CanonicalMessage::from_json(json!({ "id": "0xFF" })).unwrap();
261        assert_eq!(msg.message_id, 255);
262
263        // Numeric
264        let msg = CanonicalMessage::from_json(json!({ "id": 100 })).unwrap();
265        assert_eq!(msg.message_id, 100);
266
267        // Negative numeric
268        let msg_err = CanonicalMessage::from_json(json!({ "id": -1 }));
269        assert!(msg_err.is_err());
270
271        // Mongo OID
272        let oid = "507f1f77bcf86cd799439011";
273        let msg = CanonicalMessage::from_json(json!({ "_id": { "$oid": oid } })).unwrap();
274        let expected = u128::from_str_radix(oid, 16).unwrap();
275        assert_eq!(msg.message_id, expected);
276    }
277
278    #[test]
279    fn test_metadata_builder() {
280        let msg = CanonicalMessage::new(b"payload".to_vec(), None)
281            .with_metadata_kv("key1", "val1")
282            .with_type_key("my_type");
283
284        assert_eq!(msg.metadata.get("key1").map(|s| s.as_str()), Some("val1"));
285        assert_eq!(
286            msg.metadata.get("kind").map(|s| s.as_str()),
287            Some("my_type")
288        );
289    }
290}