Skip to main content

runkon_flow/
events.rs

1use std::sync::Arc;
2use std::time::{SystemTime, UNIX_EPOCH};
3
4use crate::cancellation_reason::CancellationReason;
5
6/// A single workflow engine event with timestamp and run identity.
7#[derive(Debug, Clone)]
8pub struct EngineEventData {
9    /// Unix timestamp (seconds) when the event was emitted.
10    pub timestamp: u64,
11    /// The workflow run ID this event belongs to.
12    pub run_id: String,
13    /// The event payload.
14    pub event: EngineEvent,
15}
16
17impl EngineEventData {
18    pub fn new(run_id: String, event: EngineEvent) -> Self {
19        let timestamp = SystemTime::now()
20            .duration_since(UNIX_EPOCH)
21            .unwrap_or_default()
22            .as_secs();
23        Self {
24            timestamp,
25            run_id,
26            event,
27        }
28    }
29}
30
31/// Workflow engine event variants emitted after each DB-write state transition.
32///
33/// Marked `#[non_exhaustive]` so downstream crates must handle an `_` arm —
34/// future variants can be added without breaking existing sinks.
35#[non_exhaustive]
36#[derive(Debug, Clone)]
37pub enum EngineEvent {
38    // Run lifecycle
39    RunStarted {
40        workflow_name: String,
41    },
42    RunCompleted {
43        succeeded: bool,
44    },
45    RunResumed {
46        workflow_name: String,
47    },
48    RunCancelled {
49        reason: CancellationReason,
50    },
51    // Step lifecycle
52    StepStarted {
53        step_name: String,
54    },
55    StepCompleted {
56        step_name: String,
57        succeeded: bool,
58    },
59    StepRetrying {
60        step_name: String,
61        attempt: u32,
62    },
63    // Gate
64    GateWaiting {
65        gate_name: String,
66    },
67    GateResolved {
68        gate_name: String,
69        approved: bool,
70    },
71    // Fan-out
72    FanOutItemsCollected {
73        count: usize,
74    },
75    FanOutItemStarted {
76        item_id: String,
77    },
78    FanOutItemCompleted {
79        item_id: String,
80        succeeded: bool,
81    },
82    // Metrics
83    MetricsUpdated {
84        total_cost: f64,
85        total_turns: i64,
86        total_duration_ms: i64,
87    },
88    // Panic capture
89    Panicked {
90        message: String,
91        backtrace: String,
92    },
93}
94
95/// Observability sink that receives engine events after each DB-write state transition.
96///
97/// # Contract
98///
99/// - **DB writes happen before emit**: subscribers never observe pre-persistence state.
100/// - **Slow sinks block the engine**: sinks that need async offload must implement it
101///   internally (e.g. send over a channel, not await a future).
102/// - **Panics are caught**: the engine wraps each `emit` call in `catch_unwind` and
103///   logs panics; they do not abort the run.
104pub trait EventSink: Send + Sync + 'static {
105    fn emit(&self, event: &EngineEventData);
106}
107
108/// Emit an event to all sinks, catching and logging any panics.
109///
110/// Panics are caught per-sink so one bad sink cannot abort the run or silence
111/// subsequent sinks. The `run_id` is included in the warning for debuggability.
112pub fn emit_to_sinks(run_id: &str, event: EngineEvent, sinks: &[Arc<dyn EventSink>]) {
113    if sinks.is_empty() {
114        return;
115    }
116    let data = EngineEventData::new(run_id.to_string(), event);
117    for sink in sinks.iter() {
118        if std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
119            sink.emit(&data);
120        }))
121        .is_err()
122        {
123            tracing::warn!(
124                run_id = %run_id,
125                "EventSink::emit panicked — continuing with remaining sinks"
126            );
127        }
128    }
129}