use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;
/// A [`Message`] behind an atomically reference-counted pointer ([`Arc`]), so the
/// same message can be held by several owners without copying its contents.
pub type SharedMessage = Arc<Message>;
/// A message: bookkeeping metadata plus a free-form JSON payload.
///
/// With the derived serde impls this is (de)serialized as an object with the
/// two keys `meta` and `payload`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
/// Identity, origin and lineage information for this message.
pub meta: MessageMeta,
/// Message content; any JSON value is accepted, including `null`.
pub payload: serde_json::Value,
}
/// Metadata carried by every [`Message`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMeta {
/// Identifier of the message. The constructors in this file generate it with
/// `Uuid::now_v7()`.
pub id: Uuid,
/// Creation time. The constructors in this file store
/// `chrono::Utc::now().timestamp_millis()`, i.e. milliseconds since the Unix epoch.
pub timestamp: i64,
/// Name of the node the message is attributed to, as passed to the constructors.
pub source_node: String,
/// Id of the message this one was derived from: `Message::derive` stores the
/// parent's `id` here, while `new` leaves it as `None`.
pub correlation_id: Option<Uuid>,
/// Length of the derivation chain leading to this message: `0` for messages built
/// with `new`; `Message::derive` stores the parent's depth plus one, saturating at
/// `u8::MAX`.
pub chain_depth: u8,
/// Free-form string key/value annotations. `#[serde(default)]` makes the field
/// optional on input: a missing `tags` key deserializes to an empty map.
#[serde(default)]
pub tags: HashMap<String, String>,
}
impl Message {
    /// Creates a root message attributed to `source_node` and carrying `payload`.
    ///
    /// The metadata comes from [`MessageMeta::new`]: a freshly generated id, the
    /// current UTC time in milliseconds, no correlation id, a chain depth of `0`
    /// and an empty tag map.
    #[must_use]
    pub fn new(source_node: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            meta: MessageMeta::new(source_node),
            payload,
        }
    }

    /// Creates a follow-up message that points back at `self`.
    ///
    /// The returned message has its own id and timestamp and is attributed to
    /// `source_node`. Its `correlation_id` is the id of `self` (the direct
    /// parent, not the root of the chain) and its `chain_depth` is the parent's
    /// depth plus one, saturating at `u8::MAX` instead of overflowing.
    ///
    /// Tags are **not** inherited: the derived message starts with an empty tag
    /// map. `self` is left untouched.
    #[must_use]
    pub fn derive(&self, source_node: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            meta: MessageMeta {
                correlation_id: Some(self.meta.id),
                chain_depth: self.meta.chain_depth.saturating_add(1),
                // Fresh id, current timestamp, the source node and an empty tag
                // map all come from the regular constructor, so the two code
                // paths cannot drift apart.
                ..MessageMeta::new(source_node)
            },
            payload,
        }
    }

    /// Returns the message with the tag `key` set to `value`.
    ///
    /// Consumes `self` so calls can be chained. An existing tag with the same
    /// key is overwritten. Delegates to [`MessageMeta::with_tag`].
    #[must_use = "with_tag consumes the message and returns the tagged one"]
    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.meta = self.meta.with_tag(key, value);
        self
    }
}
impl MessageMeta {
    /// Builds the metadata of a root message attributed to `source_node`.
    ///
    /// The id is generated with `Uuid::now_v7()` and the timestamp is the
    /// current UTC time in milliseconds since the Unix epoch. The result has no
    /// correlation id, a chain depth of `0` and no tags.
    #[must_use]
    pub fn new(source_node: impl Into<String>) -> Self {
        Self {
            id: Uuid::now_v7(),
            timestamp: chrono::Utc::now().timestamp_millis(),
            source_node: source_node.into(),
            correlation_id: None,
            chain_depth: 0,
            tags: HashMap::new(),
        }
    }

    /// Returns the metadata with the tag `key` set to `value`.
    ///
    /// Consumes `self` so calls can be chained. An existing tag with the same
    /// key is overwritten.
    #[must_use = "with_tag consumes the metadata and returns the tagged one"]
    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.tags.insert(key.into(), value.into());
        self
    }
}
/// Kind of node.
///
/// Because of `rename_all = "snake_case"`, serde represents the variants as the
/// strings `"source"`, `"transform"` and `"sink"`.
// NOTE(review): the per-variant descriptions below are inferred from the names only;
// nothing in this file shows how the variants are used — confirm against the callers.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NodeType {
/// Presumably a node that originates messages.
Source,
/// Presumably a node that turns incoming messages into new ones.
Transform,
/// Presumably a node that consumes messages.
Sink,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A message built with `new` is a chain root: depth 0 and no correlation id.
    #[test]
    fn test_message_new() {
        let message = Message::new("test_source", serde_json::json!({"key": "value"}));
        let meta = &message.meta;

        assert_eq!(meta.source_node, "test_source");
        assert_eq!(meta.chain_depth, 0);
        assert!(meta.correlation_id.is_none());
        assert_eq!(message.payload["key"], "value");
    }

    /// `derive` links to its direct parent, bumps the depth and drops the tags.
    #[test]
    fn test_message_derive() {
        let parent =
            Message::new("src1", serde_json::json!({"data": 1})).with_tag("key", "val");
        let parent_id = parent.meta.id;

        let child = parent.derive("src2", serde_json::json!({"data": 2}));
        assert_eq!(child.meta.source_node, "src2");
        assert_eq!(child.meta.chain_depth, 1);
        assert_eq!(child.meta.correlation_id, Some(parent_id));
        assert_eq!(child.payload["data"], 2);
        assert!(
            child.meta.tags.is_empty(),
            "Tags should not be inherited by default in derive()"
        );

        // A second hop correlates with the first derived message, not with the root.
        let grandchild = child.derive("src3", serde_json::json!({}));
        assert_eq!(grandchild.meta.chain_depth, 2);
        assert_eq!(grandchild.meta.correlation_id, Some(child.meta.id));
    }

    /// Chained `with_tag` calls accumulate entries in the tag map.
    #[test]
    fn test_message_meta_with_tag() {
        let tagged = MessageMeta::new("source")
            .with_tag("env", "prod")
            .with_tag("region", "us-east");
        let tag = |name: &str| tagged.tags.get(name).map(String::as_str);

        assert_eq!(tag("env"), Some("prod"));
        assert_eq!(tag("region"), Some("us-east"));
    }

    /// `NodeType` travels over the wire as a snake_case string.
    #[test]
    fn test_node_type_serde() {
        let encoded = serde_json::to_string(&NodeType::Source).unwrap();
        assert_eq!(encoded, "\"source\"");

        let decoded = serde_json::from_str::<NodeType>("\"transform\"").unwrap();
        assert_eq!(decoded, NodeType::Transform);
    }

    /// A message survives a JSON round trip.
    #[test]
    fn test_message_serialization() {
        let before = Message::new("test", serde_json::json!({"x": 123}));
        let wire = serde_json::to_string(&before).unwrap();
        let after = serde_json::from_str::<Message>(&wire).unwrap();

        assert_eq!(after.meta.source_node, "test");
        assert_eq!(after.payload["x"], 123);
    }

    /// `tags` and `correlation_id` may be omitted from the input document.
    #[test]
    fn test_message_deserialization_defaults() {
        let document = r#"{
"meta": {
"id": "018d9ba3-8c43-7f28-8000-000000000000",
"timestamp": 1709300000000,
"source_node": "test_src",
"chain_depth": 0
},
"payload": null
}"#;
        let parsed = serde_json::from_str::<Message>(document)
            .expect("Should deserialize with default tags");

        assert!(parsed.meta.tags.is_empty());
        assert!(parsed.meta.correlation_id.is_none());
    }
}