mq_bridge/
canonical_message.rs1use bytes::Bytes;
7use serde::de::{DeserializeOwned, Error};
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use uuid::Uuid;
11
12use crate::type_handler::KIND_KEY;
13
14#[derive(Debug, Serialize, Deserialize, Clone)]
15pub struct CanonicalMessage {
16 pub message_id: u128,
17 pub payload: Bytes,
18 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
19 pub metadata: HashMap<String, String>,
20}
21
22impl CanonicalMessage {
23 pub fn new(payload: Vec<u8>, message_id: Option<u128>) -> Self {
24 Self {
25 message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
26 payload: Bytes::from(payload),
27 metadata: HashMap::new(),
28 }
29 }
30
31 pub fn new_bytes(payload: Bytes, message_id: Option<u128>) -> Self {
32 Self {
33 message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
34 payload,
35 metadata: HashMap::new(),
36 }
37 }
38
39 pub fn from_type<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
40 let bytes = serde_json::to_vec(data)?;
41 Ok(Self::new(bytes, None))
42 }
43
44 pub fn from_vec(payload: impl Into<Vec<u8>>) -> Self {
45 Self::new(payload.into(), None)
46 }
47
48 pub fn set_id(&mut self, id: u128) {
49 self.message_id = id;
50 }
51
52 pub fn from_json(payload: serde_json::Value) -> Result<Self, serde_json::Error> {
53 let mut message_id = None;
54 if let Some(val) = payload
55 .get("message_id")
56 .or(payload.get("id"))
57 .or(payload.get("_id"))
58 {
59 if let Some(s) = val.as_str() {
60 if let Ok(uuid) = Uuid::parse_str(s) {
61 message_id = Some(uuid.as_u128());
62 } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
63 message_id = Some(n);
64 } else if let Ok(n) = s.parse::<u128>() {
65 message_id = Some(n);
66 }
67 } else if let Some(n) = val.as_i64() {
68 if n < 0 {
69 return Err(Error::custom("message_id cannot be negative"));
70 }
71 message_id = Some(n as u128);
72 } else if let Some(n) = val.as_u64() {
73 message_id = Some(n as u128);
74 } else if let Some(oid) = val.get("$oid").and_then(|v| v.as_str()) {
75 if let Ok(n) = u128::from_str_radix(oid, 16) {
76 message_id = Some(n);
77 }
78 }
79 }
80 let bytes = serde_json::to_vec(&payload)?;
81 Ok(Self::new(bytes, message_id))
82 }
83
84 pub fn parse<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
85 serde_json::from_slice(&self.payload)
86 }
87
88 pub fn get_payload_str(&self) -> std::borrow::Cow<'_, str> {
90 String::from_utf8_lossy(&self.payload)
91 }
92
93 pub fn set_payload_str(&mut self, payload: impl Into<String>) {
95 self.payload = Bytes::from(payload.into());
96 }
97
98 pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
99 self.metadata = metadata;
100 self
101 }
102
103 pub fn with_metadata_kv(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
104 self.metadata.insert(key.into(), value.into());
105 self
106 }
107
108 pub fn with_type_key(mut self, kind: impl Into<String>) -> Self {
109 self.metadata.insert(KIND_KEY.into(), kind.into());
110 self
111 }
112
113 pub fn with_raw_format(mut self) -> Self {
114 self.metadata
115 .insert("mq_bridge.original_format".to_string(), "raw".to_string());
116 self
117 }
118}
119
120impl From<&str> for CanonicalMessage {
121 fn from(s: &str) -> Self {
122 Self::new(s.as_bytes().into(), None)
123 }
124}
125
126impl From<String> for CanonicalMessage {
127 fn from(s: String) -> Self {
128 Self::new(s.into_bytes(), None)
129 }
130}
131
132impl From<Vec<u8>> for CanonicalMessage {
133 fn from(v: Vec<u8>) -> Self {
134 Self::new(v, None)
135 }
136}
137
138impl From<serde_json::Value> for CanonicalMessage {
139 fn from(v: serde_json::Value) -> Self {
140 Self::from_json(v).expect("Failed to serialize JSON value")
141 }
142}
143
/// The id and metadata of a message, without its payload.
#[derive(Clone, Debug)]
pub struct MessageContext {
    /// 128-bit identifier of the message this context came from.
    pub message_id: u128,
    /// String key/value metadata of that message.
    pub metadata: HashMap<String, String>,
}
151
152impl From<CanonicalMessage> for MessageContext {
153 fn from(msg: CanonicalMessage) -> Self {
154 Self {
155 message_id: msg.message_id,
156 metadata: msg.metadata,
157 }
158 }
159}
160
161#[doc(hidden)]
162pub mod tracing_support {
163 use super::CanonicalMessage;
164
165 pub struct LazyMessageIds<'a>(pub &'a [CanonicalMessage]);
168
169 impl<'a> std::fmt::Debug for LazyMessageIds<'a> {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 let ids: Vec<String> = self
172 .0
173 .iter()
174 .map(|m| format!("{:032x}", m.message_id))
175 .collect();
176 f.debug_list().entries(ids).finish()
177 }
178 }
179}
180
181#[doc(hidden)]
182pub mod macro_support {
183 use super::CanonicalMessage;
184 use serde::Serialize;
185
186 pub trait Fallback {
187 fn convert(&self) -> CanonicalMessage;
188 }
189
190 impl<T: Serialize> Fallback for Wrap<T> {
191 fn convert(&self) -> CanonicalMessage {
192 CanonicalMessage::from_type(&self.0).expect("Serialization failed in msg! macro")
193 }
194 }
195
196 pub struct Wrap<T>(pub T);
197
198 impl<T> Wrap<T>
199 where
200 T: Into<CanonicalMessage> + Clone,
201 {
202 pub fn convert(&self) -> CanonicalMessage {
203 self.0.clone().into()
204 }
205 }
206}
207
/// Builds a `CanonicalMessage` from a payload expression.
///
/// Two forms are accepted:
///
/// * `msg!(payload)` or `msg!(payload, key => value, ...)`: converts the
///   payload and adds each `key => value` pair as a metadata entry.
/// * `msg!(payload, kind)`: converts the payload and stores `kind` through
///   `with_type_key`.
///
/// The payload is converted through `macro_support::Wrap`, so it may be any
/// type accepted by one of the `convert` methods defined there.
#[macro_export]
macro_rules! msg {
    ($payload:expr $(, $key:expr => $val:expr)* $(,)?) => {
        {
            #[allow(unused_imports)]
            use $crate::canonical_message::macro_support::{Wrap, Fallback};
            let built = Wrap($payload).convert();
            // One statement per pair, so temporaries created by each
            // key/value expression are dropped before the next pair runs.
            $(
                let built = built.with_metadata_kv($key, $val);
            )*
            built
        }
    };
    ($payload:expr, $kind:expr $(,)?) => {
        {
            #[allow(unused_imports)]
            use $crate::canonical_message::macro_support::{Wrap, Fallback};
            let built = Wrap($payload).convert();
            let built = built.with_type_key($kind);
            built
        }
    };
}
246
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Checks every id representation accepted by `from_json`, plus the
    /// rejection of negative integers.
    #[test]
    fn test_message_id_parsing() {
        // Shared shortcut: build the message and keep only its id.
        let id_of = |value: serde_json::Value| {
            CanonicalMessage::from_json(value).map(|message| message.message_id)
        };

        // UUID string.
        let uuid = "550e8400-e29b-41d4-a716-446655440000";
        assert_eq!(
            id_of(json!({ "id": uuid })).unwrap(),
            113059749145936325402354257176981405696
        );

        // Hexadecimal string with a `0x` prefix.
        assert_eq!(id_of(json!({ "id": "0xFF" })).unwrap(), 255);

        // Plain non-negative integer.
        assert_eq!(id_of(json!({ "id": 100 })).unwrap(), 100);

        // Negative integers are rejected.
        assert!(id_of(json!({ "id": -1 })).is_err());

        // Object carrying a hexadecimal `$oid` string.
        let oid = "507f1f77bcf86cd799439011";
        let expected = u128::from_str_radix(oid, 16).unwrap();
        assert_eq!(id_of(json!({ "_id": { "$oid": oid } })).unwrap(), expected);
    }

    /// Checks that the builder methods store their entries in the metadata.
    #[test]
    fn test_metadata_builder() {
        let built = CanonicalMessage::new(b"payload".to_vec(), None)
            .with_metadata_kv("key1", "val1")
            .with_type_key("my_type");

        let lookup = |key: &str| built.metadata.get(key).map(String::as_str);

        assert_eq!(lookup("key1"), Some("val1"));
        assert_eq!(lookup("kind"), Some("my_type"));
    }
}