Skip to main content

a3s_flow/
event.rs

1//! [`EventEmitter`] — node and flow lifecycle event extension point.
2//!
3//! Implement [`EventEmitter`] to react to workflow execution events — e.g. to
4//! stream progress to a UI, collect metrics, or integrate with `a3s-event`.
5//!
6//! Register a custom emitter via
7//! [`FlowEngine::with_event_emitter`](crate::engine::FlowEngine::with_event_emitter) or
8//! [`FlowRunner::with_event_emitter`](crate::runner::FlowRunner::with_event_emitter).
9//! The built-in [`NoopEventEmitter`] is used when no custom emitter is set.
10//!
11//! For pull-based event consumption, use
12//! [`FlowEngine::start_streaming`](crate::engine::FlowEngine::start_streaming)
13//! which returns a [`tokio::sync::broadcast::Receiver<FlowEvent>`].
14
15use std::sync::Arc;
16
17use async_trait::async_trait;
18use serde_json::Value;
19use tokio::sync::broadcast;
20use uuid::Uuid;
21
22use crate::result::FlowResult;
23
/// A snapshot of a single lifecycle event emitted during flow execution.
///
/// Produced by [`FlowEngine::start_streaming`](crate::engine::FlowEngine::start_streaming)
/// via a [`tokio::sync::broadcast`] channel. All variants are `Clone` so they
/// can be forwarded to multiple subscribers.
///
/// Each variant corresponds to one [`EventEmitter`] callback and carries that
/// callback's arguments as owned values.
#[derive(Debug, Clone)]
pub enum FlowEvent {
    /// A flow execution started.
    FlowStarted { execution_id: Uuid },
    /// A flow execution completed successfully.
    FlowCompleted {
        /// Identifier of the execution that finished.
        execution_id: Uuid,
        /// Owned copy of the result reported for the execution.
        result: FlowResult,
    },
    /// A flow execution failed (node error or internal error).
    FlowFailed { execution_id: Uuid, reason: String },
    /// A flow execution was terminated externally.
    FlowTerminated { execution_id: Uuid },
    /// A node is about to execute.
    NodeStarted {
        /// Identifier of the execution the node belongs to.
        execution_id: Uuid,
        /// Identifier of the node within the flow.
        node_id: String,
        /// Type name of the node.
        node_type: String,
    },
    /// A node completed successfully.
    NodeCompleted {
        /// Identifier of the execution the node belongs to.
        execution_id: Uuid,
        /// Identifier of the node within the flow.
        node_id: String,
        /// Owned copy of the value the node produced.
        output: Value,
    },
    /// A node was skipped because its `run_if` guard evaluated to false.
    NodeSkipped { execution_id: Uuid, node_id: String },
    /// A node failed (all retry attempts exhausted).
    NodeFailed {
        /// Identifier of the execution the node belongs to.
        execution_id: Uuid,
        /// Identifier of the node within the flow.
        node_id: String,
        /// Failure description reported for the node.
        reason: String,
    },
}
63
/// An [`EventEmitter`] that forwards all events into a broadcast channel.
///
/// Created internally by [`FlowEngine::start_streaming`](crate::engine::FlowEngine::start_streaming).
///
/// Every callback is turned into the matching [`FlowEvent`] variant and sent
/// on `tx`; the outcome of the send is ignored.
pub(crate) struct ChannelEmitter {
    /// Sending half of the broadcast channel that receives every event.
    tx: broadcast::Sender<FlowEvent>,
}
70
71impl ChannelEmitter {
72    pub(crate) fn new(tx: broadcast::Sender<FlowEvent>) -> Self {
73        Self { tx }
74    }
75}
76
77#[async_trait]
78impl EventEmitter for ChannelEmitter {
79    async fn on_flow_started(&self, execution_id: Uuid) {
80        let _ = self.tx.send(FlowEvent::FlowStarted { execution_id });
81    }
82
83    async fn on_flow_completed(&self, execution_id: Uuid, result: &FlowResult) {
84        let _ = self.tx.send(FlowEvent::FlowCompleted {
85            execution_id,
86            result: result.clone(),
87        });
88    }
89
90    async fn on_flow_failed(&self, execution_id: Uuid, reason: &str) {
91        let _ = self.tx.send(FlowEvent::FlowFailed {
92            execution_id,
93            reason: reason.to_string(),
94        });
95    }
96
97    async fn on_flow_terminated(&self, execution_id: Uuid) {
98        let _ = self.tx.send(FlowEvent::FlowTerminated { execution_id });
99    }
100
101    async fn on_node_started(&self, execution_id: Uuid, node_id: &str, node_type: &str) {
102        let _ = self.tx.send(FlowEvent::NodeStarted {
103            execution_id,
104            node_id: node_id.to_string(),
105            node_type: node_type.to_string(),
106        });
107    }
108
109    async fn on_node_completed(&self, execution_id: Uuid, node_id: &str, output: &Value) {
110        let _ = self.tx.send(FlowEvent::NodeCompleted {
111            execution_id,
112            node_id: node_id.to_string(),
113            output: output.clone(),
114        });
115    }
116
117    async fn on_node_skipped(&self, execution_id: Uuid, node_id: &str) {
118        let _ = self.tx.send(FlowEvent::NodeSkipped {
119            execution_id,
120            node_id: node_id.to_string(),
121        });
122    }
123
124    async fn on_node_failed(&self, execution_id: Uuid, node_id: &str, reason: &str) {
125        let _ = self.tx.send(FlowEvent::NodeFailed {
126            execution_id,
127            node_id: node_id.to_string(),
128            reason: reason.to_string(),
129        });
130    }
131}
132
/// An [`EventEmitter`] that fans events out to two downstream emitters.
///
/// Used internally by [`FlowEngine::start_streaming`](crate::engine::FlowEngine::start_streaming)
/// to compose a [`ChannelEmitter`] with the engine's existing emitter.
///
/// For every event, `a` is notified and awaited first, then `b`.
pub(crate) struct MulticastEmitter {
    /// First emitter to receive each event.
    pub(crate) a: Arc<dyn EventEmitter>,
    /// Second emitter to receive each event, after `a` has finished.
    pub(crate) b: Arc<dyn EventEmitter>,
}
141
142#[async_trait]
143impl EventEmitter for MulticastEmitter {
144    async fn on_flow_started(&self, execution_id: Uuid) {
145        self.a.on_flow_started(execution_id).await;
146        self.b.on_flow_started(execution_id).await;
147    }
148
149    async fn on_flow_completed(&self, execution_id: Uuid, result: &FlowResult) {
150        self.a.on_flow_completed(execution_id, result).await;
151        self.b.on_flow_completed(execution_id, result).await;
152    }
153
154    async fn on_flow_failed(&self, execution_id: Uuid, reason: &str) {
155        self.a.on_flow_failed(execution_id, reason).await;
156        self.b.on_flow_failed(execution_id, reason).await;
157    }
158
159    async fn on_flow_terminated(&self, execution_id: Uuid) {
160        self.a.on_flow_terminated(execution_id).await;
161        self.b.on_flow_terminated(execution_id).await;
162    }
163
164    async fn on_node_started(&self, execution_id: Uuid, node_id: &str, node_type: &str) {
165        self.a
166            .on_node_started(execution_id, node_id, node_type)
167            .await;
168        self.b
169            .on_node_started(execution_id, node_id, node_type)
170            .await;
171    }
172
173    async fn on_node_completed(&self, execution_id: Uuid, node_id: &str, output: &Value) {
174        self.a
175            .on_node_completed(execution_id, node_id, output)
176            .await;
177        self.b
178            .on_node_completed(execution_id, node_id, output)
179            .await;
180    }
181
182    async fn on_node_skipped(&self, execution_id: Uuid, node_id: &str) {
183        self.a.on_node_skipped(execution_id, node_id).await;
184        self.b.on_node_skipped(execution_id, node_id).await;
185    }
186
187    async fn on_node_failed(&self, execution_id: Uuid, node_id: &str, reason: &str) {
188        self.a.on_node_failed(execution_id, node_id, reason).await;
189        self.b.on_node_failed(execution_id, node_id, reason).await;
190    }
191}
192
/// Listener for flow and node lifecycle events.
///
/// The trait declares no default method bodies, so an implementation must
/// provide all eight callbacks. Give the events you do not care about an
/// empty body, as the example below does. [`NoopEventEmitter`] is the
/// implementation in which every callback is empty.
///
/// Implementors must be `Send + Sync`.
///
/// # Example
///
/// ```rust
/// use a3s_flow::{EventEmitter, FlowResult, NoopEventEmitter};
/// use async_trait::async_trait;
/// use serde_json::Value;
/// use uuid::Uuid;
///
/// struct PrintEmitter;
///
/// #[async_trait]
/// impl EventEmitter for PrintEmitter {
///     async fn on_flow_started(&self, _: Uuid) {}
///     async fn on_flow_completed(&self, _: Uuid, _: &FlowResult) {}
///     async fn on_flow_failed(&self, _: Uuid, _: &str) {}
///     async fn on_flow_terminated(&self, _: Uuid) {}
///     async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
///     async fn on_node_completed(&self, _exec: Uuid, node_id: &str, _out: &Value) {
///         println!("node {node_id} completed");
///     }
///     async fn on_node_skipped(&self, _: Uuid, _: &str) {}
///     async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
/// }
/// ```
#[async_trait]
pub trait EventEmitter: Send + Sync {
    /// A flow execution has started.
    async fn on_flow_started(&self, execution_id: Uuid);

    /// A flow execution completed successfully.
    ///
    /// `result` is borrowed; clone it if it must outlive the call.
    async fn on_flow_completed(&self, execution_id: Uuid, result: &FlowResult);

    /// A flow execution failed (node error or internal error).
    ///
    /// `reason` is a textual description of the failure.
    async fn on_flow_failed(&self, execution_id: Uuid, reason: &str);

    /// A flow execution was terminated via [`FlowEngine::terminate`](crate::engine::FlowEngine::terminate).
    async fn on_flow_terminated(&self, execution_id: Uuid);

    /// A node is about to execute (before the first attempt).
    ///
    /// `node_id` identifies the node; `node_type` is its type name.
    async fn on_node_started(&self, execution_id: Uuid, node_id: &str, node_type: &str);

    /// A node completed successfully.
    ///
    /// `output` is the value the node produced, borrowed for the duration of
    /// the call.
    async fn on_node_completed(&self, execution_id: Uuid, node_id: &str, output: &Value);

    /// A node was skipped because its `run_if` guard evaluated to false.
    async fn on_node_skipped(&self, execution_id: Uuid, node_id: &str);

    /// A node failed (all retry attempts exhausted).
    ///
    /// `reason` is a textual description of the failure.
    async fn on_node_failed(&self, execution_id: Uuid, node_id: &str, reason: &str);
}
248
/// A no-op [`EventEmitter`] — the default when no custom emitter is registered.
///
/// The type is a stateless unit struct. It derives `Debug`, `Clone`, `Copy`
/// and `Default` so it can be embedded in types that derive those traits and
/// created with `NoopEventEmitter::default()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopEventEmitter;
251
252#[async_trait]
253impl EventEmitter for NoopEventEmitter {
254    async fn on_flow_started(&self, _: Uuid) {}
255    async fn on_flow_completed(&self, _: Uuid, _: &FlowResult) {}
256    async fn on_flow_failed(&self, _: Uuid, _: &str) {}
257    async fn on_flow_terminated(&self, _: Uuid) {}
258    async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
259    async fn on_node_completed(&self, _: Uuid, _: &str, _: &Value) {}
260    async fn on_node_skipped(&self, _: Uuid, _: &str) {}
261    async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
262}
263
264#[cfg(test)]
265mod tests {
266    use super::*;
267    use std::sync::{
268        atomic::{AtomicU32, Ordering},
269        Arc,
270    };
271
    /// A test emitter that counts each event type.
    ///
    /// Each field is a shared counter that the matching `EventEmitter`
    /// callback increments by one per invocation.
    struct CountEmitter {
        /// Number of `on_flow_started` calls.
        flow_started: Arc<AtomicU32>,
        /// Number of `on_flow_completed` calls.
        flow_completed: Arc<AtomicU32>,
        /// Number of `on_flow_failed` calls.
        flow_failed: Arc<AtomicU32>,
        /// Number of `on_flow_terminated` calls.
        flow_terminated: Arc<AtomicU32>,
        /// Number of `on_node_started` calls.
        node_started: Arc<AtomicU32>,
        /// Number of `on_node_completed` calls.
        node_completed: Arc<AtomicU32>,
        /// Number of `on_node_skipped` calls.
        node_skipped: Arc<AtomicU32>,
        /// Number of `on_node_failed` calls.
        node_failed: Arc<AtomicU32>,
    }
283
284    impl CountEmitter {
285        fn new() -> (Arc<Self>, Counts) {
286            let s = Arc::new(AtomicU32::new(0));
287            let c = Arc::new(AtomicU32::new(0));
288            let fa = Arc::new(AtomicU32::new(0));
289            let t = Arc::new(AtomicU32::new(0));
290            let ns = Arc::new(AtomicU32::new(0));
291            let nc = Arc::new(AtomicU32::new(0));
292            let nsk = Arc::new(AtomicU32::new(0));
293            let nf = Arc::new(AtomicU32::new(0));
294            let emitter = Arc::new(CountEmitter {
295                flow_started: Arc::clone(&s),
296                flow_completed: Arc::clone(&c),
297                flow_failed: Arc::clone(&fa),
298                flow_terminated: Arc::clone(&t),
299                node_started: Arc::clone(&ns),
300                node_completed: Arc::clone(&nc),
301                node_skipped: Arc::clone(&nsk),
302                node_failed: Arc::clone(&nf),
303            });
304            let counts = Counts {
305                s,
306                c,
307                fa,
308                t,
309                ns,
310                nc,
311                nsk,
312                nf,
313            };
314            (emitter, counts)
315        }
316    }
317
    /// Test-side handles to the counters owned by a `CountEmitter`.
    ///
    /// Each field shares its `Arc` with the corresponding `CountEmitter`
    /// field, so a test can read the totals after the emitter has been moved
    /// into a runner.
    // `dead_code` is allowed because not every counter is read by a test in
    // this module (`t` is never loaded).
    #[allow(dead_code)]
    struct Counts {
        /// Shares `CountEmitter::flow_started`.
        s: Arc<AtomicU32>,
        /// Shares `CountEmitter::flow_completed`.
        c: Arc<AtomicU32>,
        /// Shares `CountEmitter::flow_failed`.
        fa: Arc<AtomicU32>,
        /// Shares `CountEmitter::flow_terminated`.
        t: Arc<AtomicU32>,
        /// Shares `CountEmitter::node_started`.
        ns: Arc<AtomicU32>,
        /// Shares `CountEmitter::node_completed`.
        nc: Arc<AtomicU32>,
        /// Shares `CountEmitter::node_skipped`.
        nsk: Arc<AtomicU32>,
        /// Shares `CountEmitter::node_failed`.
        nf: Arc<AtomicU32>,
    }
329
330    #[async_trait]
331    impl EventEmitter for CountEmitter {
332        async fn on_flow_started(&self, _: Uuid) {
333            self.flow_started.fetch_add(1, Ordering::SeqCst);
334        }
335        async fn on_flow_completed(&self, _: Uuid, _: &FlowResult) {
336            self.flow_completed.fetch_add(1, Ordering::SeqCst);
337        }
338        async fn on_flow_failed(&self, _: Uuid, _: &str) {
339            self.flow_failed.fetch_add(1, Ordering::SeqCst);
340        }
341        async fn on_flow_terminated(&self, _: Uuid) {
342            self.flow_terminated.fetch_add(1, Ordering::SeqCst);
343        }
344        async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
345            self.node_started.fetch_add(1, Ordering::SeqCst);
346        }
347        async fn on_node_completed(&self, _: Uuid, _: &str, _: &Value) {
348            self.node_completed.fetch_add(1, Ordering::SeqCst);
349        }
350        async fn on_node_skipped(&self, _: Uuid, _: &str) {
351            self.node_skipped.fetch_add(1, Ordering::SeqCst);
352        }
353        async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {
354            self.node_failed.fetch_add(1, Ordering::SeqCst);
355        }
356    }
357
358    #[tokio::test]
359    async fn noop_emitter_compiles_and_runs() {
360        // Verify the no-op emitter can be called without panicking.
361        let e = NoopEventEmitter;
362        let id = Uuid::new_v4();
363        let result = FlowResult {
364            execution_id: id,
365            outputs: Default::default(),
366            completed_nodes: Default::default(),
367            skipped_nodes: Default::default(),
368        };
369        e.on_flow_started(id).await;
370        e.on_flow_completed(id, &result).await;
371        e.on_flow_failed(id, "err").await;
372        e.on_flow_terminated(id).await;
373        e.on_node_started(id, "n", "noop").await;
374        e.on_node_completed(id, "n", &serde_json::json!({})).await;
375        e.on_node_skipped(id, "n").await;
376        e.on_node_failed(id, "n", "err").await;
377    }
378
379    #[tokio::test]
380    async fn emitter_receives_flow_and_node_events() {
381        use crate::graph::DagGraph;
382        use crate::registry::NodeRegistry;
383        use crate::runner::FlowRunner;
384        use serde_json::json;
385        use std::collections::HashMap;
386
387        let def = json!({
388            "nodes": [
389                { "id": "a", "type": "noop" },
390                { "id": "b", "type": "noop" }
391            ],
392            "edges": [{ "source": "a", "target": "b" }]
393        });
394        let dag = DagGraph::from_json(&def).unwrap();
395        let (emitter, counts) = CountEmitter::new();
396        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults())
397            .with_event_emitter(emitter as Arc<dyn EventEmitter>);
398
399        runner.run(HashMap::new()).await.unwrap();
400
401        assert_eq!(counts.s.load(Ordering::SeqCst), 1, "flow_started");
402        assert_eq!(counts.c.load(Ordering::SeqCst), 1, "flow_completed");
403        assert_eq!(counts.fa.load(Ordering::SeqCst), 0, "flow_failed");
404        assert_eq!(counts.ns.load(Ordering::SeqCst), 2, "node_started (a + b)");
405        assert_eq!(
406            counts.nc.load(Ordering::SeqCst),
407            2,
408            "node_completed (a + b)"
409        );
410        assert_eq!(counts.nsk.load(Ordering::SeqCst), 0, "no skipped nodes");
411    }
412
413    #[tokio::test]
414    async fn emitter_receives_node_skipped_event() {
415        use crate::graph::DagGraph;
416        use crate::registry::NodeRegistry;
417        use crate::runner::FlowRunner;
418        use serde_json::json;
419        use std::collections::HashMap;
420
421        // "b" is always skipped via a run_if that never matches.
422        let def = json!({
423            "nodes": [
424                { "id": "a", "type": "noop" },
425                {
426                    "id": "b", "type": "noop",
427                    "data": { "run_if": { "from": "a", "path": "nonexistent", "op": "eq", "value": true } }
428                }
429            ],
430            "edges": [{ "source": "a", "target": "b" }]
431        });
432        let dag = DagGraph::from_json(&def).unwrap();
433        let (emitter, counts) = CountEmitter::new();
434        let runner = FlowRunner::new(dag, NodeRegistry::with_defaults())
435            .with_event_emitter(emitter as Arc<dyn EventEmitter>);
436
437        runner.run(HashMap::new()).await.unwrap();
438
439        assert_eq!(counts.nsk.load(Ordering::SeqCst), 1, "one skipped node");
440        assert_eq!(counts.nc.load(Ordering::SeqCst), 1, "only 'a' completed");
441    }
442
443    #[tokio::test]
444    async fn emitter_receives_node_failed_and_flow_failed() {
445        use crate::error::FlowError;
446        use crate::graph::DagGraph;
447        use crate::node::{ExecContext, Node};
448        use crate::registry::NodeRegistry;
449        use crate::runner::FlowRunner;
450        use serde_json::json;
451        use std::collections::HashMap;
452
453        struct FailNode;
454        #[async_trait]
455        impl Node for FailNode {
456            fn node_type(&self) -> &str {
457                "fail-always"
458            }
459            async fn execute(&self, _: ExecContext) -> crate::error::Result<Value> {
460                Err(FlowError::Internal("boom".into()))
461            }
462        }
463
464        let mut registry = NodeRegistry::with_defaults();
465        registry.register(Arc::new(FailNode));
466
467        let def = json!({ "nodes": [{ "id": "x", "type": "fail-always" }], "edges": [] });
468        let dag = DagGraph::from_json(&def).unwrap();
469        let (emitter, counts) = CountEmitter::new();
470        let runner =
471            FlowRunner::new(dag, registry).with_event_emitter(emitter as Arc<dyn EventEmitter>);
472
473        let _ = runner.run(HashMap::new()).await;
474
475        assert_eq!(counts.nf.load(Ordering::SeqCst), 1, "node_failed");
476        assert_eq!(counts.fa.load(Ordering::SeqCst), 1, "flow_failed");
477        assert_eq!(counts.c.load(Ordering::SeqCst), 0, "not completed");
478    }
479}