pipeflow 0.0.4

A lightweight, configuration-driven data pipeline framework
Documentation
//! Common types shared across pipeline components
//!
//! This module contains data structures used by multiple modules (sources, sinks, engine)
//! to avoid circular dependencies.

use std::collections::HashMap;

use serde::{Deserialize, Serialize};

// ============================================================================
// Severity - unified severity level for events and notifications
// ============================================================================

/// Severity level for events and notifications
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    /// Debug level
    Debug,
    /// Informational level
    #[default]
    Info,
    /// Warning level
    Warning,
    /// Error level
    Error,
    /// Critical level requiring immediate attention
    Critical,
}

// ============================================================================
// Event data structure
// ============================================================================

/// System event data
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
    /// Event name/type
    pub name: String,
    /// Event severity level
    #[serde(default)]
    pub severity: Severity,
    /// Event payload
    pub payload: serde_json::Value,
    /// Event labels/tags
    #[serde(default)]
    pub labels: HashMap<String, String>,
    /// Timestamp in milliseconds
    pub timestamp: i64,
}

impl Event {
    /// Create a new event with default Info severity
    pub fn new(name: impl Into<String>, payload: serde_json::Value) -> Self {
        Self {
            name: name.into(),
            severity: Severity::Info,
            payload,
            labels: HashMap::new(),
            timestamp: chrono::Utc::now().timestamp_millis(),
        }
    }

    /// Set the event severity
    pub fn with_severity(mut self, severity: Severity) -> Self {
        self.severity = severity;
        self
    }

    /// Add a label to the event
    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }
}

// ============================================================================
// Audit data structure
// ============================================================================

/// Audit record status
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AuditStatus {
    /// Operation completed successfully
    Success,
    /// Operation failed
    Failure,
}

/// Audit record for tracking pipeline operations
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Audit {
    /// Node ID that performed the operation
    pub node_id: String,
    /// Node type ("source", "transform", "sink")
    pub node_type: String,
    /// Operation type ("emit", "process", "write")
    pub operation: String,
    /// Operation status
    pub status: AuditStatus,
    /// Associated message ID (if applicable)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub message_id: Option<String>,
    /// Operation duration in milliseconds
    pub duration_ms: u64,
    /// Error message (if status is Failure)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub error: Option<String>,
    /// Additional labels/tags
    #[serde(default)]
    pub labels: HashMap<String, String>,
    /// Timestamp in milliseconds
    pub timestamp: i64,
}

impl Audit {
    /// Create a new audit record
    pub fn new(
        node_id: impl Into<String>,
        node_type: impl Into<String>,
        operation: impl Into<String>,
    ) -> Self {
        Self {
            node_id: node_id.into(),
            node_type: node_type.into(),
            operation: operation.into(),
            status: AuditStatus::Success,
            message_id: None,
            duration_ms: 0,
            error: None,
            labels: HashMap::new(),
            timestamp: chrono::Utc::now().timestamp_millis(),
        }
    }

    /// Mark as successful with duration
    pub fn success(mut self, duration_ms: u64) -> Self {
        self.status = AuditStatus::Success;
        self.duration_ms = duration_ms;
        self
    }

    /// Mark as failed with duration and error message
    pub fn failure(mut self, duration_ms: u64, error: impl Into<String>) -> Self {
        self.status = AuditStatus::Failure;
        self.duration_ms = duration_ms;
        self.error = Some(error.into());
        self
    }

    /// Set the associated message ID
    pub fn with_message_id(mut self, id: impl Into<String>) -> Self {
        self.message_id = Some(id.into());
        self
    }

    /// Add a label to the audit record
    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }
}

// ============================================================================
// Notify data structure (used by sink::notify for message parsing)
// ============================================================================

/// Notification message structure for sink::notify
///
/// This structure is used by the notify sink to parse incoming messages
/// and extract notification details.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notify {
    /// Notification name/type
    pub name: String,
    /// Notification severity
    pub severity: Severity,
    /// Notification message
    pub message: String,
    /// Notification labels/tags
    #[serde(default)]
    pub labels: HashMap<String, String>,
    /// Timestamp in milliseconds
    pub timestamp: i64,
}

impl Notify {
    /// Create a new notification
    pub fn new(name: impl Into<String>, severity: Severity, message: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            severity,
            message: message.into(),
            labels: HashMap::new(),
            timestamp: chrono::Utc::now().timestamp_millis(),
        }
    }

    /// Add a label to the notification
    pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
        self.labels.insert(key.into(), value.into());
        self
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_severity_serde() {
        let json = serde_json::to_string(&Severity::Critical).unwrap();
        assert_eq!(json, "\"critical\"");

        let severity: Severity = serde_json::from_str("\"warning\"").unwrap();
        assert_eq!(severity, Severity::Warning);

        let debug: Severity = serde_json::from_str("\"debug\"").unwrap();
        assert_eq!(debug, Severity::Debug);
    }

    #[test]
    fn test_severity_default() {
        assert_eq!(Severity::default(), Severity::Info);
    }

    #[test]
    fn test_event_new() {
        let event = Event::new("test_event", serde_json::json!({"value": 42}));

        assert_eq!(event.name, "test_event");
        assert_eq!(event.severity, Severity::Info);
        assert_eq!(event.payload["value"], 42);
        assert!(event.labels.is_empty());
        assert!(event.timestamp > 0);
    }

    #[test]
    fn test_event_with_severity() {
        let event = Event::new("alert", serde_json::json!({})).with_severity(Severity::Critical);

        assert_eq!(event.severity, Severity::Critical);
    }

    #[test]
    fn test_event_with_label() {
        let event = Event::new("evt", serde_json::json!({}))
            .with_label("key1", "val1")
            .with_label("key2", "val2");

        assert_eq!(event.labels.get("key1"), Some(&"val1".to_string()));
        assert_eq!(event.labels.get("key2"), Some(&"val2".to_string()));
    }

    #[test]
    fn test_event_serialization() {
        let event = Event::new("test", serde_json::json!({"x": 1}))
            .with_severity(Severity::Warning)
            .with_label("env", "prod");

        let json = serde_json::to_string(&event).unwrap();
        let parsed: Event = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.name, "test");
        assert_eq!(parsed.severity, Severity::Warning);
        assert_eq!(parsed.labels.get("env"), Some(&"prod".to_string()));
    }

    #[test]
    fn test_audit_new() {
        let audit = Audit::new("my_sink", "sink", "write");

        assert_eq!(audit.node_id, "my_sink");
        assert_eq!(audit.node_type, "sink");
        assert_eq!(audit.operation, "write");
        assert_eq!(audit.status, AuditStatus::Success);
        assert!(audit.message_id.is_none());
        assert_eq!(audit.duration_ms, 0);
        assert!(audit.error.is_none());
        assert!(audit.timestamp > 0);
    }

    #[test]
    fn test_audit_success() {
        let audit = Audit::new("transform_1", "transform", "process").success(150);

        assert_eq!(audit.status, AuditStatus::Success);
        assert_eq!(audit.duration_ms, 150);
        assert!(audit.error.is_none());
    }

    #[test]
    fn test_audit_failure() {
        let audit = Audit::new("sink_db", "sink", "write").failure(50, "Connection timeout");

        assert_eq!(audit.status, AuditStatus::Failure);
        assert_eq!(audit.duration_ms, 50);
        assert_eq!(audit.error, Some("Connection timeout".to_string()));
    }

    #[test]
    fn test_audit_with_message_id() {
        let audit = Audit::new("source_1", "source", "emit")
            .with_message_id("msg-123")
            .success(10);

        assert_eq!(audit.message_id, Some("msg-123".to_string()));
    }

    #[test]
    fn test_audit_serialization() {
        let audit = Audit::new("test_sink", "sink", "write")
            .with_message_id("msg-456")
            .with_label("batch", "1")
            .success(100);

        let json = serde_json::to_string(&audit).unwrap();
        let parsed: Audit = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.node_id, "test_sink");
        assert_eq!(parsed.status, AuditStatus::Success);
        assert_eq!(parsed.message_id, Some("msg-456".to_string()));
        assert_eq!(parsed.labels.get("batch"), Some(&"1".to_string()));
    }

    #[test]
    fn test_audit_status_serde() {
        let json = serde_json::to_string(&AuditStatus::Success).unwrap();
        assert_eq!(json, "\"success\"");

        let status: AuditStatus = serde_json::from_str("\"failure\"").unwrap();
        assert_eq!(status, AuditStatus::Failure);
    }

    #[test]
    fn test_notify_new() {
        let notify = Notify::new("alert", Severity::Warning, "Test message");

        assert_eq!(notify.name, "alert");
        assert_eq!(notify.severity, Severity::Warning);
        assert_eq!(notify.message, "Test message");
        assert!(notify.labels.is_empty());
        assert!(notify.timestamp > 0);
    }

    #[test]
    fn test_notify_with_label() {
        let notify = Notify::new("alert", Severity::Error, "Error occurred")
            .with_label("service", "api")
            .with_label("env", "prod");

        assert_eq!(notify.labels.get("service"), Some(&"api".to_string()));
        assert_eq!(notify.labels.get("env"), Some(&"prod".to_string()));
    }

    #[test]
    fn test_notify_serialization() {
        let notify = Notify::new("test_notify", Severity::Critical, "Critical alert")
            .with_label("region", "us-east");

        let json = serde_json::to_string(&notify).unwrap();
        let parsed: Notify = serde_json::from_str(&json).unwrap();

        assert_eq!(parsed.name, "test_notify");
        assert_eq!(parsed.severity, Severity::Critical);
        assert_eq!(parsed.message, "Critical alert");
        assert_eq!(parsed.labels.get("region"), Some(&"us-east".to_string()));
    }
}