mq_bridge/
canonical_message.rs1use bytes::Bytes;
7use serde::de::DeserializeOwned;
8use serde::{Deserialize, Serialize};
9use std::collections::HashMap;
10use uuid::Uuid;
11
12use crate::type_handler::KIND_KEY;
13
14#[derive(Debug, Serialize, Deserialize, Clone)]
15pub struct CanonicalMessage {
16 pub message_id: u128,
17 pub payload: Bytes,
18 #[serde(default, skip_serializing_if = "HashMap::is_empty")]
19 pub metadata: HashMap<String, String>,
20}
21
22impl CanonicalMessage {
23 pub fn new(payload: Vec<u8>, message_id: Option<u128>) -> Self {
24 Self {
25 message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
26 payload: Bytes::from(payload),
27 metadata: HashMap::new(),
28 }
29 }
30
31 pub fn new_bytes(payload: Bytes, message_id: Option<u128>) -> Self {
32 Self {
33 message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
34 payload,
35 metadata: HashMap::new(),
36 }
37 }
38
39 pub fn from_type<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
40 let bytes = serde_json::to_vec(data)?;
41 Ok(Self::new(bytes, None))
42 }
43
44 pub fn from_vec(payload: impl Into<Vec<u8>>) -> Self {
45 Self::new(payload.into(), None)
46 }
47
48 pub fn set_id(&mut self, id: u128) {
49 self.message_id = id;
50 }
51
52 pub fn from_json(payload: serde_json::Value) -> Result<Self, serde_json::Error> {
53 let mut message_id = None;
54 if let Some(val) = payload
55 .get("message_id")
56 .or(payload.get("id"))
57 .or(payload.get("_id"))
58 {
59 if let Some(s) = val.as_str() {
60 if let Ok(uuid) = Uuid::parse_str(s) {
61 message_id = Some(uuid.as_u128());
62 } else if let Ok(n) = u128::from_str_radix(s.trim_start_matches("0x"), 16) {
63 message_id = Some(n);
64 } else if let Ok(n) = s.parse::<u128>() {
65 message_id = Some(n);
66 }
67 } else if let Some(n) = val.as_i64() {
68 message_id = Some(n as u128);
69 } else if let Some(n) = val.as_u64() {
70 message_id = Some(n as u128);
71 } else if let Some(oid) = val.get("$oid").and_then(|v| v.as_str()) {
72 if let Ok(n) = u128::from_str_radix(oid, 16) {
73 message_id = Some(n);
74 }
75 }
76 }
77 let bytes = serde_json::to_vec(&payload)?;
78 Ok(Self::new(bytes, message_id))
79 }
80
81 pub fn parse<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
82 serde_json::from_slice(&self.payload)
83 }
84
85 pub fn get_payload_str(&self) -> std::borrow::Cow<'_, str> {
87 String::from_utf8_lossy(&self.payload)
88 }
89
90 pub fn set_payload_str(&mut self, payload: impl Into<String>) {
92 self.payload = Bytes::from(payload.into());
93 }
94
95 pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
96 self.metadata = metadata;
97 self
98 }
99
100 pub fn with_metadata_kv(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
101 self.metadata.insert(key.into(), value.into());
102 self
103 }
104
105 pub fn with_type_key(mut self, kind: impl Into<String>) -> Self {
106 self.metadata.insert(KIND_KEY.into(), kind.into());
107 self
108 }
109
110 pub fn with_raw_format(mut self) -> Self {
111 self.metadata
112 .insert("mq_bridge.original_format".to_string(), "raw".to_string());
113 self
114 }
115}
116
117impl From<&str> for CanonicalMessage {
118 fn from(s: &str) -> Self {
119 Self::new(s.as_bytes().into(), None)
120 }
121}
122
123impl From<String> for CanonicalMessage {
124 fn from(s: String) -> Self {
125 Self::new(s.into_bytes(), None)
126 }
127}
128
129impl From<Vec<u8>> for CanonicalMessage {
130 fn from(v: Vec<u8>) -> Self {
131 Self::new(v, None)
132 }
133}
134
135impl From<serde_json::Value> for CanonicalMessage {
136 fn from(v: serde_json::Value) -> Self {
137 Self::from_json(v).expect("Failed to serialize JSON value")
138 }
139}
140
141#[derive(Debug, Clone)]
144pub struct MessageContext {
145 pub message_id: u128,
146 pub metadata: HashMap<String, String>,
147}
148
149impl From<CanonicalMessage> for MessageContext {
150 fn from(msg: CanonicalMessage) -> Self {
151 Self {
152 message_id: msg.message_id,
153 metadata: msg.metadata,
154 }
155 }
156}
157
158#[doc(hidden)]
159pub mod tracing_support {
160 use super::CanonicalMessage;
161
162 pub struct LazyMessageIds<'a>(pub &'a [CanonicalMessage]);
165
166 impl<'a> std::fmt::Debug for LazyMessageIds<'a> {
167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
168 let ids: Vec<String> = self
169 .0
170 .iter()
171 .map(|m| format!("{:032x}", m.message_id))
172 .collect();
173 f.debug_list().entries(ids).finish()
174 }
175 }
176}
177
178#[doc(hidden)]
179pub mod macro_support {
180 use super::CanonicalMessage;
181 use serde::Serialize;
182
183 pub trait Fallback {
184 fn convert(&self) -> CanonicalMessage;
185 }
186
187 impl<T: Serialize> Fallback for Wrap<T> {
188 fn convert(&self) -> CanonicalMessage {
189 CanonicalMessage::from_type(&self.0).expect("Serialization failed in msg! macro")
190 }
191 }
192
193 pub struct Wrap<T>(pub T);
194
195 impl<T> Wrap<T>
196 where
197 T: Into<CanonicalMessage> + Clone,
198 {
199 pub fn convert(&self) -> CanonicalMessage {
200 self.0.clone().into()
201 }
202 }
203}
204
205#[macro_export]
220macro_rules! msg {
221 ($payload:expr $(, $key:expr => $val:expr)* $(,)?) => {
222 {
223 #[allow(unused_imports)]
224 use $crate::canonical_message::macro_support::{Wrap, Fallback};
225 #[allow(unused_mut)]
226 let mut message = Wrap($payload).convert();
227 $(
228 message = message.with_metadata_kv($key, $val);
229 )*
230 message
231 }
232 };
233 ($payload:expr, $kind:expr $(,)?) => {
234 {
235 #[allow(unused_imports)]
236 use $crate::canonical_message::macro_support::{Wrap, Fallback};
237 let mut message = Wrap($payload).convert();
238 message = message.with_type_key($kind);
239 message
240 }
241 };
242}