1use std::sync::Arc;
16
17use async_trait::async_trait;
18use serde_json::Value;
19use tokio::sync::broadcast;
20use uuid::Uuid;
21
22use crate::result::FlowResult;
23
24#[derive(Debug, Clone)]
30pub enum FlowEvent {
31 FlowStarted { execution_id: Uuid },
33 FlowCompleted {
35 execution_id: Uuid,
36 result: FlowResult,
37 },
38 FlowFailed { execution_id: Uuid, reason: String },
40 FlowTerminated { execution_id: Uuid },
42 NodeStarted {
44 execution_id: Uuid,
45 node_id: String,
46 node_type: String,
47 },
48 NodeCompleted {
50 execution_id: Uuid,
51 node_id: String,
52 output: Value,
53 },
54 NodeSkipped { execution_id: Uuid, node_id: String },
56 NodeFailed {
58 execution_id: Uuid,
59 node_id: String,
60 reason: String,
61 },
62}
63
64pub(crate) struct ChannelEmitter {
68 tx: broadcast::Sender<FlowEvent>,
69}
70
71impl ChannelEmitter {
72 pub(crate) fn new(tx: broadcast::Sender<FlowEvent>) -> Self {
73 Self { tx }
74 }
75}
76
77#[async_trait]
78impl EventEmitter for ChannelEmitter {
79 async fn on_flow_started(&self, execution_id: Uuid) {
80 let _ = self.tx.send(FlowEvent::FlowStarted { execution_id });
81 }
82
83 async fn on_flow_completed(&self, execution_id: Uuid, result: &FlowResult) {
84 let _ = self.tx.send(FlowEvent::FlowCompleted {
85 execution_id,
86 result: result.clone(),
87 });
88 }
89
90 async fn on_flow_failed(&self, execution_id: Uuid, reason: &str) {
91 let _ = self.tx.send(FlowEvent::FlowFailed {
92 execution_id,
93 reason: reason.to_string(),
94 });
95 }
96
97 async fn on_flow_terminated(&self, execution_id: Uuid) {
98 let _ = self.tx.send(FlowEvent::FlowTerminated { execution_id });
99 }
100
101 async fn on_node_started(&self, execution_id: Uuid, node_id: &str, node_type: &str) {
102 let _ = self.tx.send(FlowEvent::NodeStarted {
103 execution_id,
104 node_id: node_id.to_string(),
105 node_type: node_type.to_string(),
106 });
107 }
108
109 async fn on_node_completed(&self, execution_id: Uuid, node_id: &str, output: &Value) {
110 let _ = self.tx.send(FlowEvent::NodeCompleted {
111 execution_id,
112 node_id: node_id.to_string(),
113 output: output.clone(),
114 });
115 }
116
117 async fn on_node_skipped(&self, execution_id: Uuid, node_id: &str) {
118 let _ = self.tx.send(FlowEvent::NodeSkipped {
119 execution_id,
120 node_id: node_id.to_string(),
121 });
122 }
123
124 async fn on_node_failed(&self, execution_id: Uuid, node_id: &str, reason: &str) {
125 let _ = self.tx.send(FlowEvent::NodeFailed {
126 execution_id,
127 node_id: node_id.to_string(),
128 reason: reason.to_string(),
129 });
130 }
131}
132
133pub(crate) struct MulticastEmitter {
138 pub(crate) a: Arc<dyn EventEmitter>,
139 pub(crate) b: Arc<dyn EventEmitter>,
140}
141
142#[async_trait]
143impl EventEmitter for MulticastEmitter {
144 async fn on_flow_started(&self, execution_id: Uuid) {
145 self.a.on_flow_started(execution_id).await;
146 self.b.on_flow_started(execution_id).await;
147 }
148
149 async fn on_flow_completed(&self, execution_id: Uuid, result: &FlowResult) {
150 self.a.on_flow_completed(execution_id, result).await;
151 self.b.on_flow_completed(execution_id, result).await;
152 }
153
154 async fn on_flow_failed(&self, execution_id: Uuid, reason: &str) {
155 self.a.on_flow_failed(execution_id, reason).await;
156 self.b.on_flow_failed(execution_id, reason).await;
157 }
158
159 async fn on_flow_terminated(&self, execution_id: Uuid) {
160 self.a.on_flow_terminated(execution_id).await;
161 self.b.on_flow_terminated(execution_id).await;
162 }
163
164 async fn on_node_started(&self, execution_id: Uuid, node_id: &str, node_type: &str) {
165 self.a
166 .on_node_started(execution_id, node_id, node_type)
167 .await;
168 self.b
169 .on_node_started(execution_id, node_id, node_type)
170 .await;
171 }
172
173 async fn on_node_completed(&self, execution_id: Uuid, node_id: &str, output: &Value) {
174 self.a
175 .on_node_completed(execution_id, node_id, output)
176 .await;
177 self.b
178 .on_node_completed(execution_id, node_id, output)
179 .await;
180 }
181
182 async fn on_node_skipped(&self, execution_id: Uuid, node_id: &str) {
183 self.a.on_node_skipped(execution_id, node_id).await;
184 self.b.on_node_skipped(execution_id, node_id).await;
185 }
186
187 async fn on_node_failed(&self, execution_id: Uuid, node_id: &str, reason: &str) {
188 self.a.on_node_failed(execution_id, node_id, reason).await;
189 self.b.on_node_failed(execution_id, node_id, reason).await;
190 }
191}
192
193#[async_trait]
223pub trait EventEmitter: Send + Sync {
224 async fn on_flow_started(&self, execution_id: Uuid);
226
227 async fn on_flow_completed(&self, execution_id: Uuid, result: &FlowResult);
229
230 async fn on_flow_failed(&self, execution_id: Uuid, reason: &str);
232
233 async fn on_flow_terminated(&self, execution_id: Uuid);
235
236 async fn on_node_started(&self, execution_id: Uuid, node_id: &str, node_type: &str);
238
239 async fn on_node_completed(&self, execution_id: Uuid, node_id: &str, output: &Value);
241
242 async fn on_node_skipped(&self, execution_id: Uuid, node_id: &str);
244
245 async fn on_node_failed(&self, execution_id: Uuid, node_id: &str, reason: &str);
247}
248
249pub struct NoopEventEmitter;
251
252#[async_trait]
253impl EventEmitter for NoopEventEmitter {
254 async fn on_flow_started(&self, _: Uuid) {}
255 async fn on_flow_completed(&self, _: Uuid, _: &FlowResult) {}
256 async fn on_flow_failed(&self, _: Uuid, _: &str) {}
257 async fn on_flow_terminated(&self, _: Uuid) {}
258 async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {}
259 async fn on_node_completed(&self, _: Uuid, _: &str, _: &Value) {}
260 async fn on_node_skipped(&self, _: Uuid, _: &str) {}
261 async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {}
262}
263
264#[cfg(test)]
265mod tests {
266 use super::*;
267 use std::sync::{
268 atomic::{AtomicU32, Ordering},
269 Arc,
270 };
271
272 struct CountEmitter {
274 flow_started: Arc<AtomicU32>,
275 flow_completed: Arc<AtomicU32>,
276 flow_failed: Arc<AtomicU32>,
277 flow_terminated: Arc<AtomicU32>,
278 node_started: Arc<AtomicU32>,
279 node_completed: Arc<AtomicU32>,
280 node_skipped: Arc<AtomicU32>,
281 node_failed: Arc<AtomicU32>,
282 }
283
284 impl CountEmitter {
285 fn new() -> (Arc<Self>, Counts) {
286 let s = Arc::new(AtomicU32::new(0));
287 let c = Arc::new(AtomicU32::new(0));
288 let fa = Arc::new(AtomicU32::new(0));
289 let t = Arc::new(AtomicU32::new(0));
290 let ns = Arc::new(AtomicU32::new(0));
291 let nc = Arc::new(AtomicU32::new(0));
292 let nsk = Arc::new(AtomicU32::new(0));
293 let nf = Arc::new(AtomicU32::new(0));
294 let emitter = Arc::new(CountEmitter {
295 flow_started: Arc::clone(&s),
296 flow_completed: Arc::clone(&c),
297 flow_failed: Arc::clone(&fa),
298 flow_terminated: Arc::clone(&t),
299 node_started: Arc::clone(&ns),
300 node_completed: Arc::clone(&nc),
301 node_skipped: Arc::clone(&nsk),
302 node_failed: Arc::clone(&nf),
303 });
304 let counts = Counts {
305 s,
306 c,
307 fa,
308 t,
309 ns,
310 nc,
311 nsk,
312 nf,
313 };
314 (emitter, counts)
315 }
316 }
317
318 #[allow(dead_code)]
319 struct Counts {
320 s: Arc<AtomicU32>,
321 c: Arc<AtomicU32>,
322 fa: Arc<AtomicU32>,
323 t: Arc<AtomicU32>,
324 ns: Arc<AtomicU32>,
325 nc: Arc<AtomicU32>,
326 nsk: Arc<AtomicU32>,
327 nf: Arc<AtomicU32>,
328 }
329
330 #[async_trait]
331 impl EventEmitter for CountEmitter {
332 async fn on_flow_started(&self, _: Uuid) {
333 self.flow_started.fetch_add(1, Ordering::SeqCst);
334 }
335 async fn on_flow_completed(&self, _: Uuid, _: &FlowResult) {
336 self.flow_completed.fetch_add(1, Ordering::SeqCst);
337 }
338 async fn on_flow_failed(&self, _: Uuid, _: &str) {
339 self.flow_failed.fetch_add(1, Ordering::SeqCst);
340 }
341 async fn on_flow_terminated(&self, _: Uuid) {
342 self.flow_terminated.fetch_add(1, Ordering::SeqCst);
343 }
344 async fn on_node_started(&self, _: Uuid, _: &str, _: &str) {
345 self.node_started.fetch_add(1, Ordering::SeqCst);
346 }
347 async fn on_node_completed(&self, _: Uuid, _: &str, _: &Value) {
348 self.node_completed.fetch_add(1, Ordering::SeqCst);
349 }
350 async fn on_node_skipped(&self, _: Uuid, _: &str) {
351 self.node_skipped.fetch_add(1, Ordering::SeqCst);
352 }
353 async fn on_node_failed(&self, _: Uuid, _: &str, _: &str) {
354 self.node_failed.fetch_add(1, Ordering::SeqCst);
355 }
356 }
357
358 #[tokio::test]
359 async fn noop_emitter_compiles_and_runs() {
360 let e = NoopEventEmitter;
362 let id = Uuid::new_v4();
363 let result = FlowResult {
364 execution_id: id,
365 outputs: Default::default(),
366 completed_nodes: Default::default(),
367 skipped_nodes: Default::default(),
368 context: Default::default(),
369 };
370 e.on_flow_started(id).await;
371 e.on_flow_completed(id, &result).await;
372 e.on_flow_failed(id, "err").await;
373 e.on_flow_terminated(id).await;
374 e.on_node_started(id, "n", "noop").await;
375 e.on_node_completed(id, "n", &serde_json::json!({})).await;
376 e.on_node_skipped(id, "n").await;
377 e.on_node_failed(id, "n", "err").await;
378 }
379
380 #[tokio::test]
381 async fn emitter_receives_flow_and_node_events() {
382 use crate::graph::DagGraph;
383 use crate::registry::NodeRegistry;
384 use crate::runner::FlowRunner;
385 use serde_json::json;
386 use std::collections::HashMap;
387
388 let def = json!({
389 "nodes": [
390 { "id": "a", "type": "noop" },
391 { "id": "b", "type": "noop" }
392 ],
393 "edges": [{ "source": "a", "target": "b" }]
394 });
395 let dag = DagGraph::from_json(&def).unwrap();
396 let (emitter, counts) = CountEmitter::new();
397 let runner = FlowRunner::new(dag, NodeRegistry::with_defaults())
398 .with_event_emitter(emitter as Arc<dyn EventEmitter>);
399
400 runner.run(HashMap::new()).await.unwrap();
401
402 assert_eq!(counts.s.load(Ordering::SeqCst), 1, "flow_started");
403 assert_eq!(counts.c.load(Ordering::SeqCst), 1, "flow_completed");
404 assert_eq!(counts.fa.load(Ordering::SeqCst), 0, "flow_failed");
405 assert_eq!(counts.ns.load(Ordering::SeqCst), 2, "node_started (a + b)");
406 assert_eq!(
407 counts.nc.load(Ordering::SeqCst),
408 2,
409 "node_completed (a + b)"
410 );
411 assert_eq!(counts.nsk.load(Ordering::SeqCst), 0, "no skipped nodes");
412 }
413
414 #[tokio::test]
415 async fn emitter_receives_node_skipped_event() {
416 use crate::graph::DagGraph;
417 use crate::registry::NodeRegistry;
418 use crate::runner::FlowRunner;
419 use serde_json::json;
420 use std::collections::HashMap;
421
422 let def = json!({
424 "nodes": [
425 { "id": "a", "type": "noop" },
426 {
427 "id": "b", "type": "noop",
428 "data": { "run_if": { "from": "a", "path": "nonexistent", "op": "eq", "value": true } }
429 }
430 ],
431 "edges": [{ "source": "a", "target": "b" }]
432 });
433 let dag = DagGraph::from_json(&def).unwrap();
434 let (emitter, counts) = CountEmitter::new();
435 let runner = FlowRunner::new(dag, NodeRegistry::with_defaults())
436 .with_event_emitter(emitter as Arc<dyn EventEmitter>);
437
438 runner.run(HashMap::new()).await.unwrap();
439
440 assert_eq!(counts.nsk.load(Ordering::SeqCst), 1, "one skipped node");
441 assert_eq!(counts.nc.load(Ordering::SeqCst), 1, "only 'a' completed");
442 }
443
444 #[tokio::test]
445 async fn emitter_receives_node_failed_and_flow_failed() {
446 use crate::error::FlowError;
447 use crate::graph::DagGraph;
448 use crate::node::{ExecContext, Node};
449 use crate::registry::NodeRegistry;
450 use crate::runner::FlowRunner;
451 use serde_json::json;
452 use std::collections::HashMap;
453
454 struct FailNode;
455 #[async_trait]
456 impl Node for FailNode {
457 fn node_type(&self) -> &str {
458 "fail-always"
459 }
460 async fn execute(&self, _: ExecContext) -> crate::error::Result<Value> {
461 Err(FlowError::Internal("boom".into()))
462 }
463 }
464
465 let mut registry = NodeRegistry::with_defaults();
466 registry.register(Arc::new(FailNode));
467
468 let def = json!({ "nodes": [{ "id": "x", "type": "fail-always" }], "edges": [] });
469 let dag = DagGraph::from_json(&def).unwrap();
470 let (emitter, counts) = CountEmitter::new();
471 let runner =
472 FlowRunner::new(dag, registry).with_event_emitter(emitter as Arc<dyn EventEmitter>);
473
474 let _ = runner.run(HashMap::new()).await;
475
476 assert_eq!(counts.nf.load(Ordering::SeqCst), 1, "node_failed");
477 assert_eq!(counts.fa.load(Ordering::SeqCst), 1, "flow_failed");
478 assert_eq!(counts.c.load(Ordering::SeqCst), 0, "not completed");
479 }
480}