use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
use crate::type_handler::KIND_KEY;
/// A message made of a 128-bit id, an opaque byte payload and string metadata.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CanonicalMessage {
/// 128-bit message id.
///
/// Serialized as a string by `print_uuidv7` and read back by `deserialize_u128`,
/// which accepts every input form that `u128_from_json` understands.
#[serde(serialize_with = "print_uuidv7", deserialize_with = "deserialize_u128")]
pub message_id: u128,
/// Raw message body.
pub payload: Bytes,
/// String key/value metadata. A missing field deserializes to an empty map, and an
/// empty map is left out when serializing.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub metadata: HashMap<String, String>,
}
/// Serde `serialize_with` helper that writes a `u128` id as a string.
///
/// The text is whatever `fast_uuid_v7::format_uuid` produces for the value.
pub fn print_uuidv7<S>(value: &u128, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    let formatted = fast_uuid_v7::format_uuid(*value);
    serializer.serialize_str(formatted.as_ref())
}
pub fn deserialize_u128<'de, D>(deserializer: D) -> Result<u128, D::Error>
where
D: serde::Deserializer<'de>,
{
let val = serde_json::Value::deserialize(deserializer)?;
u128_from_json(&val).map_err(serde::de::Error::custom)
}
/// Interprets a JSON value as a `u128` id.
///
/// Accepted inputs, checked in this order:
///
/// 1. A string that `Uuid::parse_str` accepts; the UUID's 128-bit value is returned.
/// 2. A string starting with `0x` or `0X`, followed by hexadecimal digits.
/// 3. Any other string that parses as a decimal `u128`.
/// 4. A JSON number representable as `u64`.
/// 5. Any other JSON number that `serde_json` can convert to `u128`.
/// 6. An object with a string field `$oid` holding hexadecimal digits.
///
/// # Errors
///
/// Returns `"message_id cannot be negative"` for negative integers and
/// `"Invalid u128 format"` for everything else that cannot be interpreted.
pub(crate) fn u128_from_json(val: &serde_json::Value) -> Result<u128, String> {
    const INVALID: &str = "Invalid u128 format";
    let invalid = || INVALID.to_string();

    if let Some(text) = val.as_str() {
        if let Ok(uuid) = Uuid::parse_str(text) {
            return Ok(uuid.as_u128());
        }
        // Remove exactly one radix prefix. `trim_start_matches` strips the pattern
        // repeatedly, which let malformed inputs such as "0x0x10" or "0x0Xff" through.
        if let Some(digits) = text.strip_prefix("0x").or_else(|| text.strip_prefix("0X")) {
            return u128::from_str_radix(digits, 16).map_err(|_| invalid());
        }
        return text.parse::<u128>().map_err(|_| invalid());
    }

    if let Some(n) = val.as_u64() {
        return Ok(u128::from(n));
    }

    if let Some(n) = val.as_i64() {
        // Non-negative integers were handled by the `as_u64` branch above, so the
        // conversion only fails for negative values.
        return u128::try_from(n).map_err(|_| "message_id cannot be negative".to_string());
    }

    if val.is_number() {
        // Remaining numbers (e.g. floats) are left to serde_json to accept or reject.
        return serde_json::from_value::<u128>(val.clone()).map_err(|_| invalid());
    }

    if let Some(oid) = val.get("$oid").and_then(|v| v.as_str()) {
        return u128::from_str_radix(oid, 16).map_err(|_| invalid());
    }

    Err(invalid())
}
impl CanonicalMessage {
pub fn new(payload: Vec<u8>, message_id: Option<u128>) -> Self {
Self {
message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
payload: Bytes::from(payload),
metadata: HashMap::new(),
}
}
pub fn new_bytes(payload: Bytes, message_id: Option<u128>) -> Self {
Self {
message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
payload,
metadata: HashMap::new(),
}
}
pub fn from_type<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
let bytes = serde_json::to_vec(data)?;
Ok(Self::new(bytes, None))
}
pub fn from_vec(payload: impl Into<Vec<u8>>) -> Self {
Self::new(payload.into(), None)
}
pub fn set_id(&mut self, id: u128) {
self.message_id = id;
}
pub fn from_json(payload: serde_json::Value) -> Result<Self, serde_json::Error> {
#[derive(Deserialize)]
struct IdExtractor {
#[serde(deserialize_with = "deserialize_u128")]
id: u128,
}
let mut message_id = None;
for key in ["message_id", "id", "_id"] {
if let Some(v) = payload.get(key) {
let mut map = serde_json::Map::new();
map.insert("id".to_string(), v.clone());
let extractor: IdExtractor =
serde_json::from_value(serde_json::Value::Object(map))?;
message_id = Some(extractor.id);
break;
}
}
let bytes = serde_json::to_vec(&payload)?;
Ok(Self::new(bytes, message_id))
}
pub fn parse<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
serde_json::from_slice(&self.payload)
}
pub fn get_payload_str(&self) -> std::borrow::Cow<'_, str> {
String::from_utf8_lossy(&self.payload)
}
pub fn set_payload_str(&mut self, payload: impl Into<String>) {
self.payload = Bytes::from(payload.into());
}
pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
self.metadata = metadata;
self
}
pub fn with_metadata_kv(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.metadata.insert(key.into(), value.into());
self
}
pub fn with_type_key(mut self, kind: impl Into<String>) -> Self {
self.metadata.insert(KIND_KEY.into(), kind.into());
self
}
pub fn with_raw_format(mut self) -> Self {
self.metadata
.insert("mq_bridge.original_format".to_string(), "raw".to_string());
self
}
}
impl From<&str> for CanonicalMessage {
    /// Copies the string's UTF-8 bytes into the payload of a new message with a
    /// generated id.
    fn from(s: &str) -> Self {
        let bytes: Vec<u8> = s.as_bytes().to_vec();
        Self::new(bytes, None)
    }
}
impl From<String> for CanonicalMessage {
    /// Reuses the string's buffer as the payload of a new message with a generated id.
    fn from(s: String) -> Self {
        let bytes = Vec::<u8>::from(s);
        Self::new(bytes, None)
    }
}
impl From<Vec<u8>> for CanonicalMessage {
    /// Wraps the bytes as the payload of a new message with a generated id.
    fn from(v: Vec<u8>) -> Self {
        let generated_id: Option<u128> = None;
        Self::new(v, generated_id)
    }
}
impl From<serde_json::Value> for CanonicalMessage {
    /// Builds a message through `CanonicalMessage::from_json`.
    ///
    /// # Panics
    ///
    /// Panics when `from_json` returns an error. That happens when the value has a
    /// `message_id`, `id` or `_id` field that cannot be interpreted as an id, or when
    /// serialization fails. Call `from_json` directly when the input is not trusted.
    fn from(v: serde_json::Value) -> Self {
        // The previous message blamed serialization only, which hid the far more
        // likely cause: an id field that cannot be parsed.
        Self::from_json(v).unwrap_or_else(|err| {
            panic!(
                "Failed to build CanonicalMessage from JSON value (invalid id field or serialization error): {}",
                err
            )
        })
    }
}
/// The id and metadata of a message, without its payload.
#[derive(Debug, Clone)]
pub struct MessageContext {
/// Id taken from the originating message.
pub message_id: u128,
/// Metadata taken from the originating message.
pub metadata: HashMap<String, String>,
}
impl From<CanonicalMessage> for MessageContext {
fn from(msg: CanonicalMessage) -> Self {
Self {
message_id: msg.message_id,
metadata: msg.metadata,
}
}
}
#[doc(hidden)]
pub mod tracing_support {
    use super::CanonicalMessage;

    /// Borrowed view over a slice of messages whose `Debug` output is the list of
    /// their ids. The ids are only formatted when the value is actually printed.
    pub struct LazyMessageIds<'a>(pub &'a [CanonicalMessage]);

    impl<'a> std::fmt::Debug for LazyMessageIds<'a> {
        /// Writes every message id as a zero-padded, 32-digit lowercase hex string
        /// inside a debug list.
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let mut list = f.debug_list();
            for message in self.0 {
                let hex_id: String = format!("{:032x}", message.message_id);
                list.entry(&hex_id);
            }
            list.finish()
        }
    }
}
#[doc(hidden)]
pub mod macro_support {
    use super::CanonicalMessage;
    use serde::Serialize;

    /// Wrapper the `msg!` macro puts around its payload expression so that
    /// `convert()` can be resolved either to the inherent method below or to
    /// [`Fallback::convert`].
    pub struct Wrap<T>(pub T);

    /// Conversion used for payloads handled through `Serialize`.
    pub trait Fallback {
        fn convert(&self) -> CanonicalMessage;
    }

    impl<T> Fallback for Wrap<T>
    where
        T: Serialize,
    {
        /// Serializes the wrapped value to JSON via `CanonicalMessage::from_type`.
        ///
        /// Panics if serialization fails.
        fn convert(&self) -> CanonicalMessage {
            let serialized = CanonicalMessage::from_type(&self.0);
            serialized.expect("Serialization failed in msg! macro")
        }
    }

    impl<T: Into<CanonicalMessage> + Clone> Wrap<T> {
        /// Clones the wrapped value and converts the clone with its `Into` impl.
        pub fn convert(&self) -> CanonicalMessage {
            let owned: T = self.0.clone();
            owned.into()
        }
    }
}
/// Builds a `CanonicalMessage` from a payload expression.
///
/// Two call shapes are supported:
///
/// * `msg!(payload, key => value, ...)` converts the payload and adds each
///   `key => value` pair with `with_metadata_kv`. The pairs are optional, so
///   `msg!(payload)` is valid too.
/// * `msg!(payload, kind)` converts the payload and records `kind` with
///   `with_type_key`.
///
/// The payload is converted with `Wrap(payload).convert()` from `macro_support`.
/// A trailing comma is accepted in both shapes.
#[macro_export]
macro_rules! msg {
// Payload followed by zero or more `key => value` metadata pairs.
($payload:expr $(, $key:expr => $val:expr)* $(,)?) => {
{
// `Fallback` is only needed for some payload types, hence the allow.
#[allow(unused_imports)]
use $crate::canonical_message::macro_support::{Wrap, Fallback};
// With zero metadata pairs the binding is never reassigned.
#[allow(unused_mut)]
let mut message = Wrap($payload).convert();
$(
message = message.with_metadata_kv($key, $val);
)*
message
}
};
// Payload followed by a single expression (no `=>`): the expression is the type key.
($payload:expr, $kind:expr $(,)?) => {
{
#[allow(unused_imports)]
use $crate::canonical_message::macro_support::{Wrap, Fallback};
let mut message = Wrap($payload).convert();
message = message.with_type_key($kind);
message
}
};
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Every supported id encoding inside a JSON payload ends up in `message_id`.
    #[test]
    fn test_message_id_parsing() {
        // Hyphenated UUID string under "id".
        let uuid_text = "550e8400-e29b-41d4-a716-446655440000";
        let parsed = CanonicalMessage::from_json(json!({ "id": uuid_text })).unwrap();
        assert_eq!(parsed.message_id, 113059749145936325402354257176981405696);

        // Hex string with a "0x" prefix.
        let parsed = CanonicalMessage::from_json(json!({ "id": "0xFF" })).unwrap();
        assert_eq!(parsed.message_id, 255);

        // Plain JSON integer.
        let parsed = CanonicalMessage::from_json(json!({ "id": 100 })).unwrap();
        assert_eq!(parsed.message_id, 100);

        // Negative integers are rejected.
        let rejected = CanonicalMessage::from_json(json!({ "id": -1 }));
        assert!(rejected.is_err());

        // Hex id wrapped in a `{"$oid": ...}` object under "_id".
        let oid_hex = "507f1f77bcf86cd799439011";
        let parsed = CanonicalMessage::from_json(json!({ "_id": { "$oid": oid_hex } })).unwrap();
        let want = u128::from_str_radix(oid_hex, 16).unwrap();
        assert_eq!(parsed.message_id, want);
    }

    /// The builder methods store their entries in the metadata map.
    #[test]
    fn test_metadata_builder() {
        let built = CanonicalMessage::new(b"payload".to_vec(), None)
            .with_metadata_kv("key1", "val1")
            .with_type_key("my_type");

        let lookup = |key: &str| built.metadata.get(key).map(String::as_str);
        assert_eq!(lookup("key1"), Some("val1"));
        assert_eq!(lookup("kind"), Some("my_type"));
    }
}