// pipeflow 0.0.4
//
// A lightweight, configuration-driven data pipeline framework.
// Documentation
//! Message types for the pipeline

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use uuid::Uuid;

/// Shared message wrapper for efficient passing through broadcast channels.
///
/// This is an alias for `Arc<Message>`. Cloning a `SharedMessage` clones the
/// `Arc` handle instead of the underlying [`Message`] (which may contain large
/// payloads), so broadcasting to multiple subscribers avoids copying the
/// message itself.
pub type SharedMessage = Arc<Message>;

/// Message flowing through the pipeline.
///
/// Pairs a [`MessageMeta`] header with an arbitrary JSON payload. The type
/// derives `Serialize`/`Deserialize`, so it can be round-tripped through
/// serde-supported formats, and `Clone` for callers that need an owned copy.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    /// Message metadata (identifier, timestamp, origin, correlation, tags)
    pub meta: MessageMeta,
    /// Message payload as JSON
    pub payload: serde_json::Value,
}

/// Metadata attached to every message.
///
/// Fresh metadata is produced by `MessageMeta::new`; `Message::derive` builds
/// the metadata of a follow-up message and links it to its parent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMeta {
    /// Unique message identifier (UUIDv7 for time-ordering)
    pub id: Uuid,
    /// Unix timestamp in milliseconds when message was created
    pub timestamp: i64,
    /// Source node ID that produced this message
    pub source_node: String,
    /// Optional correlation ID for tracing message chains.
    ///
    /// `None` for metadata created with `MessageMeta::new`; `Message::derive`
    /// sets it to the `id` of the message it was derived from.
    pub correlation_id: Option<Uuid>,
    /// Chain depth to prevent infinite loops in system routing.
    ///
    /// Starts at 0 and is incremented by one (saturating at `u8::MAX`) on each
    /// `Message::derive`.
    pub chain_depth: u8,
    /// Custom key-value tags.
    ///
    /// Because of `#[serde(default)]`, input that omits `tags` deserializes to
    /// an empty map.
    #[serde(default)]
    pub tags: HashMap<String, String>,
}

impl Message {
    /// Build a brand-new message emitted by `source_node` carrying `payload`.
    ///
    /// The metadata comes from [`MessageMeta::new`]: a fresh id and timestamp,
    /// no correlation id, chain depth 0 and no tags.
    pub fn new(source_node: impl Into<String>, payload: serde_json::Value) -> Self {
        let meta = MessageMeta::new(source_node);
        Self { meta, payload }
    }

    /// Build a follow-up message that is correlated to `self`.
    ///
    /// The new message gets its own id and timestamp, records `self`'s id as
    /// its `correlation_id`, and has a chain depth one greater than `self`'s
    /// (saturating at `u8::MAX`). Tags are *not* copied from `self`.
    pub fn derive(&self, source_node: impl Into<String>, payload: serde_json::Value) -> Self {
        let parent = &self.meta;
        // Start from fresh metadata (new id/timestamp, empty tags) and override
        // only the two fields that link the child to its parent.
        let meta = MessageMeta {
            correlation_id: Some(parent.id),
            chain_depth: parent.chain_depth.saturating_add(1),
            ..MessageMeta::new(source_node)
        };
        Self { meta, payload }
    }

    /// Attach a tag to this message's metadata and hand the message back
    /// (builder style). An existing tag with the same key is overwritten.
    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.meta.tags.insert(key.into(), value.into());
        self
    }
}

impl MessageMeta {
    /// Produce metadata for a message originating at `source_node`.
    ///
    /// Assigns a newly generated UUIDv7 id and the current UTC time in
    /// milliseconds; the correlation id is `None`, the chain depth is 0 and
    /// the tag map is empty.
    pub fn new(source_node: impl Into<String>) -> Self {
        let id = Uuid::now_v7();
        let timestamp = chrono::Utc::now().timestamp_millis();
        Self {
            id,
            timestamp,
            source_node: source_node.into(),
            correlation_id: None,
            chain_depth: 0,
            tags: HashMap::new(),
        }
    }

    /// Insert a tag and return the metadata (builder style).
    ///
    /// An existing tag with the same key is overwritten.
    pub fn with_tag(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        let (tag_key, tag_value) = (key.into(), value.into());
        self.tags.insert(tag_key, tag_value);
        self
    }
}

/// Node type for error reporting.
///
/// Serialized in `snake_case` (per `#[serde(rename_all = "snake_case")]`), so
/// the variants appear as `"source"`, `"transform"` and `"sink"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum NodeType {
    /// Source node
    Source,
    /// Transform node
    Transform,
    /// Sink node
    Sink,
}

#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh message records its source and starts an uncorrelated chain at depth 0.
    #[test]
    fn test_message_new() {
        let Message { meta, payload } =
            Message::new("test_source", serde_json::json!({"key": "value"}));

        assert_eq!(meta.source_node, "test_source");
        assert_eq!(meta.chain_depth, 0);
        assert!(meta.correlation_id.is_none());
        assert_eq!(payload["key"], "value");
    }

    /// Deriving links the child to its parent, bumps the depth and drops tags.
    #[test]
    fn test_message_derive() {
        let mut parent = Message::new("src1", serde_json::json!({"data": 1}));
        // Tag the parent so we can verify that derive() does not copy tags over.
        parent
            .meta
            .tags
            .insert(String::from("key"), String::from("val"));
        let parent_id = parent.meta.id;

        // First generation.
        let child = parent.derive("src2", serde_json::json!({"data": 2}));
        let child_meta = &child.meta;

        assert_eq!(child_meta.source_node, "src2");
        assert_eq!(child_meta.chain_depth, 1);
        assert_eq!(child_meta.correlation_id, Some(parent_id));
        assert_eq!(child.payload["data"], 2);
        assert!(
            child_meta.tags.is_empty(),
            "Tags should not be inherited by default in derive()"
        );

        // Second generation: depth keeps growing and correlation points at the
        // immediate parent.
        let grandchild = child.derive("src3", serde_json::json!({}));
        assert_eq!(grandchild.meta.chain_depth, 2);
        assert_eq!(grandchild.meta.correlation_id, Some(child.meta.id));
    }

    /// Chained `with_tag` calls accumulate entries in the tag map.
    #[test]
    fn test_message_meta_with_tag() {
        let tags = MessageMeta::new("source")
            .with_tag("env", "prod")
            .with_tag("region", "us-east")
            .tags;

        assert_eq!(tags.get("env").map(String::as_str), Some("prod"));
        assert_eq!(tags.get("region").map(String::as_str), Some("us-east"));
    }

    /// `NodeType` uses snake_case names on the wire in both directions.
    #[test]
    fn test_node_type_serde() {
        let encoded = serde_json::to_string(&NodeType::Source).unwrap();
        assert_eq!(encoded, "\"source\"");

        let decoded = serde_json::from_str::<NodeType>("\"transform\"").unwrap();
        assert_eq!(decoded, NodeType::Transform);
    }

    /// A message survives a JSON round trip with source and payload intact.
    #[test]
    fn test_message_serialization() {
        let original = Message::new("test", serde_json::json!({"x": 123}));
        let wire = serde_json::to_string(&original).unwrap();
        let restored = serde_json::from_str::<Message>(&wire).unwrap();

        assert_eq!(restored.meta.source_node, "test");
        assert_eq!(restored.payload["x"], 123);
    }

    /// Input that omits `tags` and `correlation_id` still deserializes:
    /// `tags` falls back to an empty map via #[serde(default)] and the
    /// correlation id comes back as `None`.
    #[test]
    fn test_message_deserialization_defaults() {
        let raw = r#"{
            "meta": {
                "id": "018d9ba3-8c43-7f28-8000-000000000000",
                "timestamp": 1709300000000,
                "source_node": "test_src",
                "chain_depth": 0
            },
            "payload": null
        }"#;

        let parsed = serde_json::from_str::<Message>(raw)
            .expect("Should deserialize with default tags");
        let meta = &parsed.meta;
        assert!(meta.tags.is_empty());
        assert!(meta.correlation_id.is_none());
    }
}