use std::collections::HashMap;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
Debug,
#[default]
Info,
Warning,
Error,
Critical,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Event {
pub name: String,
#[serde(default)]
pub severity: Severity,
pub payload: serde_json::Value,
#[serde(default)]
pub labels: HashMap<String, String>,
pub timestamp: i64,
}
impl Event {
pub fn new(name: impl Into<String>, payload: serde_json::Value) -> Self {
Self {
name: name.into(),
severity: Severity::Info,
payload,
labels: HashMap::new(),
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn with_severity(mut self, severity: Severity) -> Self {
self.severity = severity;
self
}
pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.labels.insert(key.into(), value.into());
self
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "lowercase")]
pub enum AuditStatus {
Success,
Failure,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Audit {
pub node_id: String,
pub node_type: String,
pub operation: String,
pub status: AuditStatus,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub message_id: Option<String>,
pub duration_ms: u64,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub error: Option<String>,
#[serde(default)]
pub labels: HashMap<String, String>,
pub timestamp: i64,
}
impl Audit {
pub fn new(
node_id: impl Into<String>,
node_type: impl Into<String>,
operation: impl Into<String>,
) -> Self {
Self {
node_id: node_id.into(),
node_type: node_type.into(),
operation: operation.into(),
status: AuditStatus::Success,
message_id: None,
duration_ms: 0,
error: None,
labels: HashMap::new(),
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn success(mut self, duration_ms: u64) -> Self {
self.status = AuditStatus::Success;
self.duration_ms = duration_ms;
self
}
pub fn failure(mut self, duration_ms: u64, error: impl Into<String>) -> Self {
self.status = AuditStatus::Failure;
self.duration_ms = duration_ms;
self.error = Some(error.into());
self
}
pub fn with_message_id(mut self, id: impl Into<String>) -> Self {
self.message_id = Some(id.into());
self
}
pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.labels.insert(key.into(), value.into());
self
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Notify {
pub name: String,
pub severity: Severity,
pub message: String,
#[serde(default)]
pub labels: HashMap<String, String>,
pub timestamp: i64,
}
impl Notify {
pub fn new(name: impl Into<String>, severity: Severity, message: impl Into<String>) -> Self {
Self {
name: name.into(),
severity,
message: message.into(),
labels: HashMap::new(),
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn with_label(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.labels.insert(key.into(), value.into());
self
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_severity_serde() {
let json = serde_json::to_string(&Severity::Critical).unwrap();
assert_eq!(json, "\"critical\"");
let severity: Severity = serde_json::from_str("\"warning\"").unwrap();
assert_eq!(severity, Severity::Warning);
let debug: Severity = serde_json::from_str("\"debug\"").unwrap();
assert_eq!(debug, Severity::Debug);
}
#[test]
fn test_severity_default() {
assert_eq!(Severity::default(), Severity::Info);
}
#[test]
fn test_event_new() {
let event = Event::new("test_event", serde_json::json!({"value": 42}));
assert_eq!(event.name, "test_event");
assert_eq!(event.severity, Severity::Info);
assert_eq!(event.payload["value"], 42);
assert!(event.labels.is_empty());
assert!(event.timestamp > 0);
}
#[test]
fn test_event_with_severity() {
let event = Event::new("alert", serde_json::json!({})).with_severity(Severity::Critical);
assert_eq!(event.severity, Severity::Critical);
}
#[test]
fn test_event_with_label() {
let event = Event::new("evt", serde_json::json!({}))
.with_label("key1", "val1")
.with_label("key2", "val2");
assert_eq!(event.labels.get("key1"), Some(&"val1".to_string()));
assert_eq!(event.labels.get("key2"), Some(&"val2".to_string()));
}
#[test]
fn test_event_serialization() {
let event = Event::new("test", serde_json::json!({"x": 1}))
.with_severity(Severity::Warning)
.with_label("env", "prod");
let json = serde_json::to_string(&event).unwrap();
let parsed: Event = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.name, "test");
assert_eq!(parsed.severity, Severity::Warning);
assert_eq!(parsed.labels.get("env"), Some(&"prod".to_string()));
}
#[test]
fn test_audit_new() {
let audit = Audit::new("my_sink", "sink", "write");
assert_eq!(audit.node_id, "my_sink");
assert_eq!(audit.node_type, "sink");
assert_eq!(audit.operation, "write");
assert_eq!(audit.status, AuditStatus::Success);
assert!(audit.message_id.is_none());
assert_eq!(audit.duration_ms, 0);
assert!(audit.error.is_none());
assert!(audit.timestamp > 0);
}
#[test]
fn test_audit_success() {
let audit = Audit::new("transform_1", "transform", "process").success(150);
assert_eq!(audit.status, AuditStatus::Success);
assert_eq!(audit.duration_ms, 150);
assert!(audit.error.is_none());
}
#[test]
fn test_audit_failure() {
let audit = Audit::new("sink_db", "sink", "write").failure(50, "Connection timeout");
assert_eq!(audit.status, AuditStatus::Failure);
assert_eq!(audit.duration_ms, 50);
assert_eq!(audit.error, Some("Connection timeout".to_string()));
}
#[test]
fn test_audit_with_message_id() {
let audit = Audit::new("source_1", "source", "emit")
.with_message_id("msg-123")
.success(10);
assert_eq!(audit.message_id, Some("msg-123".to_string()));
}
#[test]
fn test_audit_serialization() {
let audit = Audit::new("test_sink", "sink", "write")
.with_message_id("msg-456")
.with_label("batch", "1")
.success(100);
let json = serde_json::to_string(&audit).unwrap();
let parsed: Audit = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.node_id, "test_sink");
assert_eq!(parsed.status, AuditStatus::Success);
assert_eq!(parsed.message_id, Some("msg-456".to_string()));
assert_eq!(parsed.labels.get("batch"), Some(&"1".to_string()));
}
#[test]
fn test_audit_status_serde() {
let json = serde_json::to_string(&AuditStatus::Success).unwrap();
assert_eq!(json, "\"success\"");
let status: AuditStatus = serde_json::from_str("\"failure\"").unwrap();
assert_eq!(status, AuditStatus::Failure);
}
#[test]
fn test_notify_new() {
let notify = Notify::new("alert", Severity::Warning, "Test message");
assert_eq!(notify.name, "alert");
assert_eq!(notify.severity, Severity::Warning);
assert_eq!(notify.message, "Test message");
assert!(notify.labels.is_empty());
assert!(notify.timestamp > 0);
}
#[test]
fn test_notify_with_label() {
let notify = Notify::new("alert", Severity::Error, "Error occurred")
.with_label("service", "api")
.with_label("env", "prod");
assert_eq!(notify.labels.get("service"), Some(&"api".to_string()));
assert_eq!(notify.labels.get("env"), Some(&"prod".to_string()));
}
#[test]
fn test_notify_serialization() {
let notify = Notify::new("test_notify", Severity::Critical, "Critical alert")
.with_label("region", "us-east");
let json = serde_json::to_string(¬ify).unwrap();
let parsed: Notify = serde_json::from_str(&json).unwrap();
assert_eq!(parsed.name, "test_notify");
assert_eq!(parsed.severity, Severity::Critical);
assert_eq!(parsed.message, "Critical alert");
assert_eq!(parsed.labels.get("region"), Some(&"us-east".to_string()));
}
}