use crate::{ExecutionResult, NodeId, NodeKind, NodeMetrics, WorkflowId, WorkflowMetadata};
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
#[cfg(feature = "openapi")]
use utoipa::ToSchema;
/// Identifier type used for `ExecutionEvent::id` (an alias for `uuid::Uuid`).
pub type EventId = uuid::Uuid;
/// Identifier type used for `ExecutionEvent::execution_id` (an alias for `uuid::Uuid`).
pub type ExecutionId = uuid::Uuid;
/// A single timestamped record describing something that happened during an execution.
///
/// `event_type` is a payload-free discriminant and `details` carries the
/// variant-specific data. The constructors on this type always set the two to
/// matching variants; nothing in the struct itself enforces that pairing.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct ExecutionEvent {
/// Identifier of this event; the constructors generate a random v4 UUID.
#[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
pub id: EventId,
/// Identifier of the execution the event is attached to.
#[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
pub execution_id: ExecutionId,
/// Identifier of the workflow the event is attached to.
#[cfg_attr(feature = "openapi", schema(value_type = uuid::Uuid))]
pub workflow_id: WorkflowId,
/// Node the event refers to, if any. The workflow-level, checkpoint and
/// resume constructors set this to `None`.
#[cfg_attr(feature = "openapi", schema(value_type = Option<uuid::Uuid>))]
pub node_id: Option<NodeId>,
/// Time associated with the event; the constructors use `Utc::now()` at
/// construction time.
pub timestamp: DateTime<Utc>,
/// Kind of event, usable for filtering without inspecting `details`.
pub event_type: EventType,
/// Variant-specific payload.
pub details: EventDetails,
}
/// Payload-free discriminant for an [`ExecutionEvent`].
///
/// Each variant has a same-named variant in [`EventDetails`] that carries the
/// associated data.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub enum EventType {
// Workflow-level events.
WorkflowStarted,
WorkflowCompleted,
WorkflowFailed,
WorkflowCancelled,
// Node-level events.
NodeStarted,
NodeCompleted,
NodeFailed,
NodeSkipped,
// Events that may or may not be tied to a node (see the matching constructors).
VariableChanged,
ErrorOccurred,
// Checkpoint / resume events.
CheckpointCreated,
ExecutionResumed,
}
/// Variant-specific payload of an [`ExecutionEvent`].
///
/// Serialized in serde's internally tagged form: the variant name is written
/// into a `"type"` field alongside the variant's own fields.
///
/// Fields marked `#[serde(default)]` may be absent on input and then
/// deserialize to an empty map. `Option` fields marked `skip_serializing_if`
/// are omitted from the output when they are `None`.
///
/// NOTE(review): `duration_ms` fields are presumably milliseconds, as the name
/// suggests; the unit is not enforced here.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
#[serde(tag = "type")]
pub enum EventDetails {
/// Payload for [`EventType::WorkflowStarted`].
WorkflowStarted {
metadata: WorkflowMetadata,
#[serde(default)]
input: HashMap<String, Value>,
},
/// Payload for [`EventType::WorkflowCompleted`].
WorkflowCompleted {
duration_ms: u64,
result: ExecutionResult,
},
/// Payload for [`EventType::WorkflowFailed`].
WorkflowFailed {
error: String,
duration_ms: u64,
#[serde(skip_serializing_if = "Option::is_none")]
stack_trace: Option<String>,
},
/// Payload for [`EventType::WorkflowCancelled`].
WorkflowCancelled {
reason: String,
duration_ms: u64,
},
/// Payload for [`EventType::NodeStarted`].
NodeStarted {
node_kind: NodeKind,
#[serde(default)]
input: HashMap<String, Value>,
},
/// Payload for [`EventType::NodeCompleted`].
NodeCompleted {
node_kind: NodeKind,
duration_ms: u64,
metrics: NodeMetrics,
#[serde(default)]
output: HashMap<String, Value>,
},
/// Payload for [`EventType::NodeFailed`].
NodeFailed {
node_kind: NodeKind,
error: String,
#[serde(skip_serializing_if = "Option::is_none")]
stack_trace: Option<String>,
// NOTE(review): whether the first attempt is numbered 0 or 1 is not
// defined in this file; the tests here pass 0.
retry_attempt: u32,
},
/// Payload for [`EventType::NodeSkipped`].
NodeSkipped {
node_kind: NodeKind,
reason: String,
},
/// Payload for [`EventType::VariableChanged`].
VariableChanged {
variable_name: String,
// `None` is omitted from serialized output.
#[serde(skip_serializing_if = "Option::is_none")]
old_value: Option<Value>,
new_value: Value,
// Free-form string naming what made the change; the tests in this file
// pass a node id rendered as a string.
source: String,
},
/// Payload for [`EventType::ErrorOccurred`].
ErrorOccurred {
error: String,
#[serde(skip_serializing_if = "Option::is_none")]
stack_trace: Option<String>,
#[serde(default)]
context: HashMap<String, Value>,
},
/// Payload for [`EventType::CheckpointCreated`].
CheckpointCreated {
checkpoint_id: String,
nodes_completed: usize,
// Carried as an opaque string; its format is not defined in this file.
state: String,
},
/// Payload for [`EventType::ExecutionResumed`].
ExecutionResumed {
checkpoint_id: String,
nodes_to_skip: usize,
},
}
impl ExecutionEvent {
pub fn workflow_started(
execution_id: ExecutionId,
workflow_id: WorkflowId,
metadata: WorkflowMetadata,
input: HashMap<String, Value>,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: None,
timestamp: Utc::now(),
event_type: EventType::WorkflowStarted,
details: EventDetails::WorkflowStarted { metadata, input },
}
}
pub fn workflow_completed(
execution_id: ExecutionId,
workflow_id: WorkflowId,
duration_ms: u64,
result: ExecutionResult,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: None,
timestamp: Utc::now(),
event_type: EventType::WorkflowCompleted,
details: EventDetails::WorkflowCompleted {
duration_ms,
result,
},
}
}
pub fn workflow_failed(
execution_id: ExecutionId,
workflow_id: WorkflowId,
duration_ms: u64,
error: String,
stack_trace: Option<String>,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: None,
timestamp: Utc::now(),
event_type: EventType::WorkflowFailed,
details: EventDetails::WorkflowFailed {
error,
duration_ms,
stack_trace,
},
}
}
pub fn workflow_cancelled(
execution_id: ExecutionId,
workflow_id: WorkflowId,
duration_ms: u64,
reason: String,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: None,
timestamp: Utc::now(),
event_type: EventType::WorkflowCancelled,
details: EventDetails::WorkflowCancelled {
reason,
duration_ms,
},
}
}
pub fn node_started(
execution_id: ExecutionId,
workflow_id: WorkflowId,
node_id: NodeId,
node_kind: NodeKind,
input: HashMap<String, Value>,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: Some(node_id),
timestamp: Utc::now(),
event_type: EventType::NodeStarted,
details: EventDetails::NodeStarted { node_kind, input },
}
}
pub fn node_completed(
execution_id: ExecutionId,
workflow_id: WorkflowId,
node_id: NodeId,
node_kind: NodeKind,
duration_ms: u64,
metrics: NodeMetrics,
output: HashMap<String, Value>,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: Some(node_id),
timestamp: Utc::now(),
event_type: EventType::NodeCompleted,
details: EventDetails::NodeCompleted {
node_kind,
duration_ms,
metrics,
output,
},
}
}
pub fn node_failed(
execution_id: ExecutionId,
workflow_id: WorkflowId,
node_id: NodeId,
node_kind: NodeKind,
error: String,
stack_trace: Option<String>,
retry_attempt: u32,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: Some(node_id),
timestamp: Utc::now(),
event_type: EventType::NodeFailed,
details: EventDetails::NodeFailed {
node_kind,
error,
stack_trace,
retry_attempt,
},
}
}
pub fn node_skipped(
execution_id: ExecutionId,
workflow_id: WorkflowId,
node_id: NodeId,
node_kind: NodeKind,
reason: String,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: Some(node_id),
timestamp: Utc::now(),
event_type: EventType::NodeSkipped,
details: EventDetails::NodeSkipped { node_kind, reason },
}
}
pub fn variable_changed(
execution_id: ExecutionId,
workflow_id: WorkflowId,
node_id: Option<NodeId>,
variable_name: String,
old_value: Option<Value>,
new_value: Value,
source: String,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id,
timestamp: Utc::now(),
event_type: EventType::VariableChanged,
details: EventDetails::VariableChanged {
variable_name,
old_value,
new_value,
source,
},
}
}
pub fn error_occurred(
execution_id: ExecutionId,
workflow_id: WorkflowId,
node_id: Option<NodeId>,
error: String,
stack_trace: Option<String>,
context: HashMap<String, Value>,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id,
timestamp: Utc::now(),
event_type: EventType::ErrorOccurred,
details: EventDetails::ErrorOccurred {
error,
stack_trace,
context,
},
}
}
pub fn checkpoint_created(
execution_id: ExecutionId,
workflow_id: WorkflowId,
checkpoint_id: String,
nodes_completed: usize,
state: String,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: None,
timestamp: Utc::now(),
event_type: EventType::CheckpointCreated,
details: EventDetails::CheckpointCreated {
checkpoint_id,
nodes_completed,
state,
},
}
}
pub fn execution_resumed(
execution_id: ExecutionId,
workflow_id: WorkflowId,
checkpoint_id: String,
nodes_to_skip: usize,
) -> Self {
Self {
id: EventId::new_v4(),
execution_id,
workflow_id,
node_id: None,
timestamp: Utc::now(),
event_type: EventType::ExecutionResumed,
details: EventDetails::ExecutionResumed {
checkpoint_id,
nodes_to_skip,
},
}
}
}
/// A list of [`ExecutionEvent`]s with helper queries.
///
/// Events are kept in the order they were pushed; the container does not sort
/// them by timestamp.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(ToSchema))]
pub struct EventTimeline {
/// The stored events, in insertion order.
pub events: Vec<ExecutionEvent>,
}
impl EventTimeline {
    /// Creates an empty timeline.
    pub fn new() -> Self {
        Self { events: Vec::new() }
    }

    /// Appends `event` to the end of the timeline.
    pub fn push(&mut self, event: ExecutionEvent) {
        self.events.push(event);
    }

    /// Returns references to every event whose `event_type` equals
    /// `event_type`, in stored order.
    pub fn filter_by_type(&self, event_type: EventType) -> Vec<&ExecutionEvent> {
        self.events
            .iter()
            .filter(|e| e.event_type == event_type)
            .collect()
    }

    /// Returns references to every event whose `node_id` is `Some(node_id)`,
    /// in stored order. Events without a node id never match.
    pub fn filter_by_node(&self, node_id: NodeId) -> Vec<&ExecutionEvent> {
        self.events
            .iter()
            .filter(|e| e.node_id == Some(node_id))
            .collect()
    }

    /// Returns references to every event whose timestamp lies in the closed
    /// interval `[start, end]` (both bounds inclusive), in stored order.
    pub fn filter_by_time_range(
        &self,
        start: DateTime<Utc>,
        end: DateTime<Utc>,
    ) -> Vec<&ExecutionEvent> {
        self.events
            .iter()
            .filter(|e| e.timestamp >= start && e.timestamp <= end)
            .collect()
    }

    /// Returns the number of milliseconds between the first and the last
    /// stored event, or `None` when the timeline is empty.
    ///
    /// "First" and "last" refer to position in `events`, not to timestamp
    /// order. If the last event's timestamp precedes the first one's, the
    /// result is `Some(0)`.
    pub fn total_duration_ms(&self) -> Option<u64> {
        let start = self.events.first()?.timestamp;
        let end = self.events.last()?.timestamp;
        let elapsed_ms = (end - start).num_milliseconds();
        // `events` is public and deserializable, so chronological order is not
        // guaranteed. A negative span cast directly with `as u64` would wrap to
        // an enormous value; clamping to zero first makes the cast lossless.
        Some(elapsed_ms.max(0) as u64)
    }

    /// Returns how many stored events have the given `event_type`.
    pub fn count_by_type(&self, event_type: EventType) -> usize {
        self.events
            .iter()
            .filter(|e| e.event_type == event_type)
            .count()
    }

    /// Returns references to every event of type `NodeFailed`,
    /// `WorkflowFailed` or `ErrorOccurred`, in stored order.
    pub fn errors(&self) -> Vec<&ExecutionEvent> {
        self.events
            .iter()
            .filter(|e| {
                matches!(
                    e.event_type,
                    EventType::NodeFailed | EventType::WorkflowFailed | EventType::ErrorOccurred
                )
            })
            .collect()
    }

    /// Returns `true` if the timeline contains at least one
    /// `WorkflowCompleted` event.
    ///
    /// Only the event type is checked; the `ExecutionResult` carried in the
    /// event's details is not inspected.
    pub fn is_successful(&self) -> bool {
        self.events
            .iter()
            .any(|e| e.event_type == EventType::WorkflowCompleted)
    }

    /// Returns `true` if the timeline contains at least one `WorkflowFailed`
    /// event.
    pub fn is_failed(&self) -> bool {
        self.events
            .iter()
            .any(|e| e.event_type == EventType::WorkflowFailed)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Generates a fresh (execution id, workflow id) pair.
    fn fresh_ids() -> (ExecutionId, WorkflowId) {
        let execution_id = ExecutionId::new_v4();
        let workflow_id = WorkflowId::new_v4();
        (execution_id, workflow_id)
    }

    /// Builds a `WorkflowStarted` event with empty input for a workflow called `name`.
    fn started_event(
        execution_id: ExecutionId,
        workflow_id: WorkflowId,
        name: &str,
    ) -> ExecutionEvent {
        ExecutionEvent::workflow_started(
            execution_id,
            workflow_id,
            WorkflowMetadata::new(name.to_string()),
            HashMap::new(),
        )
    }

    #[test]
    fn test_workflow_started_event() {
        let (execution_id, workflow_id) = fresh_ids();

        let event = started_event(execution_id, workflow_id, "test-workflow");

        assert_eq!(event.execution_id, execution_id);
        assert_eq!(event.workflow_id, workflow_id);
        assert_eq!(event.event_type, EventType::WorkflowStarted);
        assert!(event.node_id.is_none());
    }

    #[test]
    fn test_node_events() {
        let (execution_id, workflow_id) = fresh_ids();
        let node_id = NodeId::new_v4();

        let started = ExecutionEvent::node_started(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            HashMap::new(),
        );
        assert_eq!(started.event_type, EventType::NodeStarted);
        assert_eq!(started.node_id, Some(node_id));

        let completed = ExecutionEvent::node_completed(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            100,
            NodeMetrics::default(),
            HashMap::new(),
        );
        assert_eq!(completed.event_type, EventType::NodeCompleted);
    }

    #[test]
    fn test_event_timeline() {
        let mut timeline = EventTimeline::new();
        let (execution_id, workflow_id) = fresh_ids();

        timeline.push(started_event(execution_id, workflow_id, "test"));

        assert_eq!(timeline.events.len(), 1);
        assert_eq!(timeline.count_by_type(EventType::WorkflowStarted), 1);
    }

    #[test]
    fn test_timeline_filtering() {
        let mut timeline = EventTimeline::new();
        let (execution_id, workflow_id) = fresh_ids();
        let node_id = NodeId::new_v4();

        let started = ExecutionEvent::node_started(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            HashMap::new(),
        );
        timeline.push(started);

        let failed = ExecutionEvent::node_failed(
            execution_id,
            workflow_id,
            node_id,
            NodeKind::Start,
            "Test error".to_string(),
            None,
            0,
        );
        timeline.push(failed);

        // Both events belong to the node; only the failure counts as an error.
        assert_eq!(timeline.filter_by_node(node_id).len(), 2);
        assert_eq!(timeline.errors().len(), 1);
    }

    #[test]
    fn test_variable_changed_event() {
        let (execution_id, workflow_id) = fresh_ids();
        let node_id = NodeId::new_v4();

        let event = ExecutionEvent::variable_changed(
            execution_id,
            workflow_id,
            Some(node_id),
            "counter".to_string(),
            Some(Value::from(0)),
            Value::from(1),
            node_id.to_string(),
        );

        assert_eq!(event.event_type, EventType::VariableChanged);
        match &event.details {
            EventDetails::VariableChanged { variable_name, .. } => {
                assert_eq!(variable_name, "counter");
            }
            _ => panic!("Expected VariableChanged event details"),
        }
    }

    #[test]
    fn test_timeline_success_check() {
        let mut timeline = EventTimeline::new();
        let (execution_id, workflow_id) = fresh_ids();

        // A started-only timeline is neither successful nor failed.
        timeline.push(started_event(execution_id, workflow_id, "test"));
        assert!(!timeline.is_successful());
        assert!(!timeline.is_failed());

        // Adding a completion event flips only the success flag.
        timeline.push(ExecutionEvent::workflow_completed(
            execution_id,
            workflow_id,
            1000,
            ExecutionResult::Success(Value::Null),
        ));
        assert!(timeline.is_successful());
        assert!(!timeline.is_failed());
    }
}