mq-bridge 0.2.14

An asynchronous message bridging library connecting Kafka, MQTT, AMQP, NATS, MongoDB, HTTP, and more.
Documentation
//  mq-bridge
//  © Copyright 2025, by Marco Mengelkoch
//  Licensed under MIT License, see License file for more details
//  git clone https://github.com/marcomq/mq-bridge

use bytes::Bytes;
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;

use crate::type_handler::KIND_KEY;

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct CanonicalMessage {
    #[serde(serialize_with = "print_uuidv7", deserialize_with = "deserialize_u128")]
    pub message_id: u128,
    pub payload: Bytes,
    #[serde(default, skip_serializing_if = "HashMap::is_empty")]
    pub metadata: HashMap<String, String>,
}

pub fn print_uuidv7<S>(value: &u128, serializer: S) -> Result<S::Ok, S::Error>
where
    S: serde::Serializer,
{
    serializer.serialize_str(fast_uuid_v7::format_uuid(*value).as_ref())
}

/// Custom deserializer for u128 that handles UUID strings, hex, and numeric formats.
pub fn deserialize_u128<'de, D>(deserializer: D) -> Result<u128, D::Error>
where
    D: serde::Deserializer<'de>,
{
    let val = serde_json::Value::deserialize(deserializer)?;
    u128_from_json(&val).map_err(serde::de::Error::custom)
}

pub(crate) fn u128_from_json(val: &serde_json::Value) -> Result<u128, String> {
    if let Some(s) = val.as_str() {
        if let Ok(uuid) = Uuid::parse_str(s) {
            return Ok(uuid.as_u128());
        } else if s.starts_with("0x") || s.starts_with("0X") {
            if let Ok(n) =
                u128::from_str_radix(s.trim_start_matches("0x").trim_start_matches("0X"), 16)
            {
                return Ok(n);
            }
        } else if let Ok(n) = s.parse::<u128>() {
            return Ok(n);
        }
    } else if let Some(n) = val.as_u64() {
        return Ok(n as u128);
    } else if let Some(n) = val.as_i64() {
        if n < 0 {
            return Err("message_id cannot be negative".to_string());
        }
        return Ok(n as u128);
    } else if val.is_number() {
        // Fallback for large numeric literals that don't fit in u64/i64
        if let Ok(n) = serde_json::from_value::<u128>(val.clone()) {
            return Ok(n);
        }
    } else if let Some(oid) = val.get("$oid").and_then(|v| v.as_str()) {
        if let Ok(n) = u128::from_str_radix(oid, 16) {
            return Ok(n);
        }
    }
    Err("Invalid u128 format".to_string())
}

impl CanonicalMessage {
    pub fn new(payload: Vec<u8>, message_id: Option<u128>) -> Self {
        Self {
            message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
            payload: Bytes::from(payload),
            metadata: HashMap::new(),
        }
    }

    pub fn new_bytes(payload: Bytes, message_id: Option<u128>) -> Self {
        Self {
            message_id: message_id.unwrap_or_else(fast_uuid_v7::gen_id),
            payload,
            metadata: HashMap::new(),
        }
    }

    pub fn from_type<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
        let bytes = serde_json::to_vec(data)?;
        Ok(Self::new(bytes, None))
    }

    pub fn from_vec(payload: impl Into<Vec<u8>>) -> Self {
        Self::new(payload.into(), None)
    }

    pub fn set_id(&mut self, id: u128) {
        self.message_id = id;
    }

    pub fn from_json(payload: serde_json::Value) -> Result<Self, serde_json::Error> {
        #[derive(Deserialize)]
        struct IdExtractor {
            #[serde(deserialize_with = "deserialize_u128")]
            id: u128,
        }

        let mut message_id = None;
        for key in ["message_id", "id", "_id"] {
            if let Some(v) = payload.get(key) {
                // Use from_value with a helper struct to leverage deserialize_u128
                // and produce a proper serde_json::Error on failure.
                let mut map = serde_json::Map::new();
                map.insert("id".to_string(), v.clone());
                let extractor: IdExtractor =
                    serde_json::from_value(serde_json::Value::Object(map))?;
                message_id = Some(extractor.id);
                break;
            }
        }

        let bytes = serde_json::to_vec(&payload)?;
        Ok(Self::new(bytes, message_id))
    }

    pub fn parse<T: DeserializeOwned>(&self) -> Result<T, serde_json::Error> {
        serde_json::from_slice(&self.payload)
    }

    /// Returns the payload as a UTF-8 lossy string.
    pub fn get_payload_str(&self) -> std::borrow::Cow<'_, str> {
        String::from_utf8_lossy(&self.payload)
    }

    /// Sets the payload of this message to the given string.
    pub fn set_payload_str(&mut self, payload: impl Into<String>) {
        self.payload = Bytes::from(payload.into());
    }

    pub fn with_metadata(mut self, metadata: HashMap<String, String>) -> Self {
        self.metadata = metadata;
        self
    }

    pub fn with_metadata_kv(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.metadata.insert(key.into(), value.into());
        self
    }

    pub fn with_type_key(mut self, kind: impl Into<String>) -> Self {
        self.metadata.insert(KIND_KEY.into(), kind.into());
        self
    }

    pub fn with_raw_format(mut self) -> Self {
        self.metadata
            .insert("mq_bridge.original_format".to_string(), "raw".to_string());
        self
    }
}

impl From<&str> for CanonicalMessage {
    fn from(s: &str) -> Self {
        Self::new(s.as_bytes().into(), None)
    }
}

impl From<String> for CanonicalMessage {
    fn from(s: String) -> Self {
        Self::new(s.into_bytes(), None)
    }
}

impl From<Vec<u8>> for CanonicalMessage {
    fn from(v: Vec<u8>) -> Self {
        Self::new(v, None)
    }
}

impl From<serde_json::Value> for CanonicalMessage {
    fn from(v: serde_json::Value) -> Self {
        Self::from_json(v).expect("Failed to serialize JSON value")
    }
}

/// A context object that holds metadata and identification for a message,
/// separated from the payload. Useful for typed handlers.
#[derive(Debug, Clone)]
pub struct MessageContext {
    pub message_id: u128,
    pub metadata: HashMap<String, String>,
}

impl From<CanonicalMessage> for MessageContext {
    fn from(msg: CanonicalMessage) -> Self {
        Self {
            message_id: msg.message_id,
            metadata: msg.metadata,
        }
    }
}

#[doc(hidden)]
pub mod tracing_support {
    use super::CanonicalMessage;

    /// A helper struct to lazily format a slice of message IDs for tracing.
    /// The collection and formatting only occurs if the trace is enabled.
    pub struct LazyMessageIds<'a>(pub &'a [CanonicalMessage]);

    impl<'a> std::fmt::Debug for LazyMessageIds<'a> {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            let ids: Vec<String> = self
                .0
                .iter()
                .map(|m| format!("{:032x}", m.message_id))
                .collect();
            f.debug_list().entries(ids).finish()
        }
    }
}

#[doc(hidden)]
pub mod macro_support {
    use super::CanonicalMessage;
    use serde::Serialize;

    pub trait Fallback {
        fn convert(&self) -> CanonicalMessage;
    }

    impl<T: Serialize> Fallback for Wrap<T> {
        fn convert(&self) -> CanonicalMessage {
            CanonicalMessage::from_type(&self.0).expect("Serialization failed in msg! macro")
        }
    }

    pub struct Wrap<T>(pub T);

    impl<T> Wrap<T>
    where
        T: Into<CanonicalMessage> + Clone,
    {
        pub fn convert(&self) -> CanonicalMessage {
            self.0.clone().into()
        }
    }
}

/// A macro to create a `CanonicalMessage` easily.
///
/// Examples:
/// ```rust
/// use mq_bridge::msg;
///
/// let m1 = msg!("hello");
/// let m2 = msg!("hello", "greeting");
/// let m3 = msg!("hello", "kind" => "greeting");
///
/// #[derive(serde::Serialize, Clone)]
/// struct MyData { val: i32 }
/// let m4 = msg!(MyData { val: 42 }, "my_type");
/// ```
#[macro_export]
macro_rules! msg {
    ($payload:expr $(, $key:expr => $val:expr)* $(,)?) => {
        {
            #[allow(unused_imports)]
            use $crate::canonical_message::macro_support::{Wrap, Fallback};
            #[allow(unused_mut)]
            let mut message = Wrap($payload).convert();
            $(
                message = message.with_metadata_kv($key, $val);
            )*
            message
        }
    };
    ($payload:expr, $kind:expr $(,)?) => {
        {
            #[allow(unused_imports)]
            use $crate::canonical_message::macro_support::{Wrap, Fallback};
            let mut message = Wrap($payload).convert();
            message = message.with_type_key($kind);
            message
        }
    };
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    #[test]
    fn test_message_id_parsing() {
        // String UUID
        let uuid = "550e8400-e29b-41d4-a716-446655440000";
        let msg = CanonicalMessage::from_json(json!({ "id": uuid })).unwrap();
        assert_eq!(msg.message_id, 113059749145936325402354257176981405696);

        // Hex string
        let msg = CanonicalMessage::from_json(json!({ "id": "0xFF" })).unwrap();
        assert_eq!(msg.message_id, 255);

        // Numeric
        let msg = CanonicalMessage::from_json(json!({ "id": 100 })).unwrap();
        assert_eq!(msg.message_id, 100);

        // Negative numeric
        let msg_err = CanonicalMessage::from_json(json!({ "id": -1 }));
        assert!(msg_err.is_err());

        // Mongo OID
        let oid = "507f1f77bcf86cd799439011";
        let msg = CanonicalMessage::from_json(json!({ "_id": { "$oid": oid } })).unwrap();
        let expected = u128::from_str_radix(oid, 16).unwrap();
        assert_eq!(msg.message_id, expected);
    }

    #[test]
    fn test_metadata_builder() {
        let msg = CanonicalMessage::new(b"payload".to_vec(), None)
            .with_metadata_kv("key1", "val1")
            .with_type_key("my_type");

        assert_eq!(msg.metadata.get("key1").map(|s| s.as_str()), Some("val1"));
        assert_eq!(
            msg.metadata.get("kind").map(|s| s.as_str()),
            Some("my_type")
        );
    }
}