use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use uuid::Uuid;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TracedEvent {
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub correlation_id: Uuid,
pub parent_id: Option<Uuid>,
pub category: EventCategory,
pub action: String,
pub fields: HashMap<String, serde_json::Value>,
pub node_id: Option<u64>,
pub duration_ms: Option<u64>,
pub level: EventLevel,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum EventCategory {
Job,
Allocation,
Task,
Node,
Scheduling,
Network,
Health,
Security,
Build,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Default)]
#[serde(rename_all = "snake_case")]
pub enum EventLevel {
#[default]
Info,
Warning,
Error,
Debug,
}
impl TracedEvent {
pub fn new(correlation_id: Uuid, category: EventCategory, action: impl Into<String>) -> Self {
Self {
id: Uuid::new_v4(),
timestamp: Utc::now(),
correlation_id,
parent_id: None,
category,
action: action.into(),
fields: HashMap::new(),
node_id: None,
duration_ms: None,
level: EventLevel::Info,
}
}
pub fn with_parent(mut self, parent_id: Uuid) -> Self {
self.parent_id = Some(parent_id);
self
}
pub fn with_node(mut self, node_id: u64) -> Self {
self.node_id = Some(node_id);
self
}
pub fn with_duration(mut self, ms: u64) -> Self {
self.duration_ms = Some(ms);
self
}
pub fn with_level(mut self, level: EventLevel) -> Self {
self.level = level;
self
}
pub fn field(mut self, key: impl Into<String>, value: impl Into<serde_json::Value>) -> Self {
self.fields.insert(key.into(), value.into());
self
}
pub fn allocation_phase(
correlation_id: Uuid,
alloc_id: Uuid,
from_phase: &str,
to_phase: &str,
) -> Self {
Self::new(
correlation_id,
EventCategory::Allocation,
"phase_transition",
)
.field("alloc_id", alloc_id.to_string())
.field("from_phase", from_phase)
.field("to_phase", to_phase)
}
pub fn scheduling_decision(
correlation_id: Uuid,
job_id: &str,
node_id: u64,
driver: &str,
) -> Self {
Self::new(
correlation_id,
EventCategory::Scheduling,
"allocation_placed",
)
.field("job_id", job_id)
.field("node_id", serde_json::Value::Number(node_id.into()))
.field("driver", driver)
}
pub fn health_check(
correlation_id: Uuid,
service_name: &str,
healthy: bool,
latency_ms: u64,
) -> Self {
Self::new(correlation_id, EventCategory::Health, "probe_result")
.field("service_name", service_name)
.field("healthy", healthy)
.with_duration(latency_ms)
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CorrelationContext {
pub correlation_id: Uuid,
pub job_id: String,
pub event_count: u64,
pub last_event_id: Option<Uuid>,
}
impl CorrelationContext {
pub fn new(job_id: &str) -> Self {
Self {
correlation_id: Uuid::new_v4(),
job_id: job_id.to_string(),
event_count: 0,
last_event_id: None,
}
}
pub fn emit(&mut self, category: EventCategory, action: impl Into<String>) -> TracedEvent {
let event = TracedEvent::new(self.correlation_id, category, action);
let event = if let Some(parent) = self.last_event_id {
event.with_parent(parent)
} else {
event
};
self.last_event_id = Some(event.id);
self.event_count += 1;
event
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_traced_event_creation() {
let corr_id = Uuid::new_v4();
let event = TracedEvent::new(corr_id, EventCategory::Job, "submitted")
.field("job_id", "web-service")
.with_node(42);
assert_eq!(event.correlation_id, corr_id);
assert_eq!(event.category, EventCategory::Job);
assert_eq!(event.action, "submitted");
assert_eq!(event.node_id, Some(42));
assert_eq!(event.fields["job_id"], "web-service");
}
#[test]
fn test_correlation_context() {
let mut ctx = CorrelationContext::new("web-service");
let e1 = ctx.emit(EventCategory::Job, "submitted");
assert!(e1.parent_id.is_none());
assert_eq!(ctx.event_count, 1);
let e2 = ctx.emit(EventCategory::Scheduling, "placed");
assert_eq!(e2.parent_id, Some(e1.id));
assert_eq!(ctx.event_count, 2);
let e3 = ctx.emit(EventCategory::Allocation, "warming");
assert_eq!(e3.parent_id, Some(e2.id));
assert_eq!(e3.correlation_id, e1.correlation_id);
}
#[test]
fn test_convenience_constructors() {
let corr = Uuid::new_v4();
let alloc = Uuid::new_v4();
let phase = TracedEvent::allocation_phase(corr, alloc, "warming", "executing");
assert_eq!(phase.fields["from_phase"], "warming");
assert_eq!(phase.fields["to_phase"], "executing");
let sched = TracedEvent::scheduling_decision(corr, "web", 42, "wasi");
assert_eq!(sched.fields["driver"], "wasi");
let health = TracedEvent::health_check(corr, "web", true, 5);
assert_eq!(health.duration_ms, Some(5));
}
#[test]
fn test_serde_roundtrip() {
let event = TracedEvent::new(Uuid::new_v4(), EventCategory::Network, "flow_detected")
.field("bytes", 1024)
.with_level(EventLevel::Debug);
let json = serde_json::to_string(&event).unwrap();
let back: TracedEvent = serde_json::from_str(&json).unwrap();
assert_eq!(back.category, EventCategory::Network);
assert_eq!(back.level, EventLevel::Debug);
}
}