1use std::sync::Arc;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use crate::cancellation_reason::CancellationReason;
5
6#[derive(Debug, Clone)]
8pub struct EngineEventData {
9 pub timestamp: u64,
11 pub run_id: String,
13 pub event: EngineEvent,
15}
16
17impl EngineEventData {
18 pub fn new(run_id: String, event: EngineEvent) -> Self {
19 let timestamp = SystemTime::now()
20 .duration_since(UNIX_EPOCH)
21 .unwrap_or_default()
22 .as_secs();
23 Self {
24 timestamp,
25 run_id,
26 event,
27 }
28 }
29}
30
31#[non_exhaustive]
36#[derive(Debug, Clone)]
37pub enum EngineEvent {
38 RunStarted {
40 workflow_name: String,
41 },
42 RunCompleted {
43 succeeded: bool,
44 },
45 RunResumed {
46 workflow_name: String,
47 },
48 RunCancelled {
49 reason: CancellationReason,
50 },
51 StepStarted {
53 step_name: String,
54 },
55 StepCompleted {
56 step_name: String,
57 succeeded: bool,
58 },
59 StepRetrying {
60 step_name: String,
61 attempt: u32,
62 },
63 GateWaiting {
65 gate_name: String,
66 },
67 GateResolved {
68 gate_name: String,
69 approved: bool,
70 },
71 FanOutItemsCollected {
73 count: usize,
74 },
75 FanOutItemStarted {
76 item_id: String,
77 },
78 FanOutItemCompleted {
79 item_id: String,
80 succeeded: bool,
81 },
82 MetricsUpdated {
84 total_cost: f64,
85 total_turns: i64,
86 total_duration_ms: i64,
87 },
88 Panicked {
90 message: String,
91 backtrace: String,
92 },
93}
94
95pub trait EventSink: Send + Sync + 'static {
105 fn emit(&self, event: &EngineEventData);
106}
107
108pub fn emit_to_sinks(run_id: &str, event: EngineEvent, sinks: &[Arc<dyn EventSink>]) {
113 if sinks.is_empty() {
114 return;
115 }
116 let data = EngineEventData::new(run_id.to_string(), event);
117 for sink in sinks.iter() {
118 if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
119 sink.emit(&data);
120 }))
121 .is_err()
122 {
123 tracing::warn!(
124 run_id = %run_id,
125 "EventSink::emit panicked — continuing with remaining sinks"
126 );
127 }
128 }
129}