noetl_executor/events.rs
1//! Event emission — pluggable sink that captures the events the
2//! executor produces during one execution.
3//!
4//! Both CLI and worker emit the same shape; only the sink differs:
5//!
6//! - CLI: `StdoutEventSink` — pretty-prints events to the terminal
7//! (R-1.2 will introduce this).
8//! - Worker: `NatsEventSink` — publishes to the configured NATS
9//! subject (R-1.3 will introduce this).
10//!
11//! The envelope shape mirrors the Python-side
12//! `noetl.runtime.events.report_event` payload so wire-format
13//! compatibility holds across stacks. See the noetl/noetl wiki page
14//! `handle_event_timing` for the field catalogue.
15
16use anyhow::Result;
17use async_trait::async_trait;
18use chrono::{DateTime, Utc};
19
20/// One event the executor emits. Keep field naming aligned with the
21/// Python side (`noetl.event` table columns) so envelopes can be
22/// projected by either stack.
23#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
24pub struct ExecutorEvent {
25 pub execution_id: String,
26 pub event_type: String,
27 /// Step name (`node_id` / `node_name` in the Python projector).
28 pub step: String,
29 pub status: String,
30 /// Wall-clock when the event was produced.
31 pub created_at: DateTime<Utc>,
32 /// Free-form payload; the projector reads typed fields out of this.
33 pub context: serde_json::Value,
34}
35
36#[async_trait]
37pub trait EventSink: Send + Sync {
38 /// Emit `event` to whatever durable surface the sink is bound to.
39 /// Implementations should be idempotent per `event_id` once R-1.2
40 /// adds id assignment.
41 async fn emit(&self, event: ExecutorEvent) -> Result<()>;
42}
43
44/// Trait alias used by the dispatch loop — wraps a sink and a
45/// pre-computed `execution_id` so callers don't have to thread it
46/// through every call site.
47pub struct EventEmitter {
48 pub sink: std::sync::Arc<dyn EventSink>,
49 pub execution_id: String,
50}
51
52impl EventEmitter {
53 pub fn new(execution_id: String, sink: std::sync::Arc<dyn EventSink>) -> Self {
54 Self { sink, execution_id }
55 }
56
57 pub async fn emit(
58 &self,
59 event_type: &str,
60 step: &str,
61 status: &str,
62 context: serde_json::Value,
63 ) -> Result<()> {
64 let event = ExecutorEvent {
65 execution_id: self.execution_id.clone(),
66 event_type: event_type.to_string(),
67 step: step.to_string(),
68 status: status.to_string(),
69 created_at: Utc::now(),
70 context,
71 };
72 self.sink.emit(event).await
73 }
74}
75
76/// Drops every event on the floor. Useful in tests and as the default
77/// during the R-1.1 skeleton phase before the real CLI/worker sinks
78/// exist.
79#[derive(Default)]
80pub struct NoopSink;
81
82#[async_trait]
83impl EventSink for NoopSink {
84 async fn emit(&self, _event: ExecutorEvent) -> Result<()> {
85 Ok(())
86 }
87}
88
89#[cfg(test)]
90mod tests {
91 use super::*;
92 use std::sync::Arc;
93
94 #[tokio::test]
95 async fn noop_sink_accepts_any_event() {
96 let sink: Arc<dyn EventSink> = Arc::new(NoopSink);
97 let emitter = EventEmitter::new("exec_test".into(), sink);
98 emitter
99 .emit(
100 "batch.completed",
101 "start",
102 "COMPLETED",
103 serde_json::json!({"processing_ms": 12.3}),
104 )
105 .await
106 .expect("noop emit");
107 }
108}