use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use crate::cancellation_reason::CancellationReason;
#[derive(Debug, Clone)]
pub struct EngineEventData {
pub timestamp: u64,
pub run_id: String,
pub event: EngineEvent,
}
impl EngineEventData {
pub fn new(run_id: String, event: EngineEvent) -> Self {
let timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
Self {
timestamp,
run_id,
event,
}
}
}
#[non_exhaustive]
#[derive(Debug, Clone)]
pub enum EngineEvent {
RunStarted {
workflow_name: String,
},
RunCompleted {
succeeded: bool,
},
RunResumed {
workflow_name: String,
},
RunCancelled {
reason: CancellationReason,
},
StepStarted {
step_name: String,
},
StepCompleted {
step_name: String,
succeeded: bool,
},
StepRetrying {
step_name: String,
attempt: u32,
},
GateWaiting {
gate_name: String,
},
GateResolved {
gate_name: String,
approved: bool,
},
FanOutItemsCollected {
count: usize,
},
FanOutItemStarted {
item_id: String,
},
FanOutItemCompleted {
item_id: String,
succeeded: bool,
},
MetricsUpdated {
total_cost: f64,
total_turns: i64,
total_duration_ms: i64,
},
Panicked {
message: String,
backtrace: String,
},
}
pub trait EventSink: Send + Sync + 'static {
fn emit(&self, event: &EngineEventData);
}
pub fn emit_to_sinks(run_id: &str, event: EngineEvent, sinks: &[Arc<dyn EventSink>]) {
if sinks.is_empty() {
return;
}
let data = EngineEventData::new(run_id.to_string(), event);
for sink in sinks.iter() {
if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
sink.emit(&data);
}))
.is_err()
{
tracing::warn!(
run_id = %run_id,
"EventSink::emit panicked — continuing with remaining sinks"
);
}
}
}