Skip to main content

noetl_executor/
events.rs

1//! Event emission — pluggable sink that captures the events the
2//! executor produces during one execution.
3//!
4//! Both CLI and worker emit the same shape; only the sink differs:
5//!
6//! - CLI: `StdoutEventSink` — pretty-prints events to the terminal
7//!   (R-1.2 will introduce this).
8//! - Worker: `NatsEventSink` — publishes to the configured NATS
9//!   subject (R-1.3 will introduce this).
10//!
11//! The envelope shape mirrors the Python-side
12//! `noetl.runtime.events.report_event` payload so wire-format
13//! compatibility holds across stacks.  See the noetl/noetl wiki page
14//! `handle_event_timing` for the field catalogue.
15
16use anyhow::Result;
17use async_trait::async_trait;
18use chrono::{DateTime, Utc};
19
20/// One event the executor emits.  Keep field naming aligned with the
21/// Python side (`noetl.event` table columns) so envelopes can be
22/// projected by either stack.
23///
24/// R-1.2 PR-2a: `execution_id` is now `i64` (matching the Python
25/// `noetl.event.execution_id` bigint column, the CLI's
26/// `BridgeContext.execution_id`, the worker's
27/// `CommandNotification.execution_id`, and
28/// `noetl_tools::context::ExecutionContext.execution_id`).
29///
30/// R-1.2 PR-EE-1 (0.3.1): adds the optional fields the Python
31/// `EventEmitRequest` (and the Rust `noetl-server` `EventRequest`)
32/// already expect — `event_id`, `worker_id`, `meta`.  All are
33/// `Option` + `#[serde(default, skip_serializing_if =
34/// "Option::is_none")]` so older wire-format consumers deserialize
35/// cleanly and producers that don't populate them omit them from
36/// the JSON entirely.  This step is the additive prerequisite for
37/// the cross-repo event envelope reconciliation tracked on
38/// [noetl/ai-meta#30](https://github.com/noetl/ai-meta/issues/30).
39#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
40pub struct ExecutorEvent {
41    /// Execution this event belongs to.  Matches the
42    /// `noetl.event.execution_id` bigint column.
43    pub execution_id: i64,
44
45    /// Event type (e.g. `"step.enter"`, `"call.done"`,
46    /// `"step.exit"`, `"command.completed"`, `"command.failed"`).
47    /// Python side uses an `EventType` enum but Rust accepts the
48    /// raw string; the server validates against the enum when
49    /// projecting.
50    pub event_type: String,
51
52    /// Step name (`node_id` / `node_name` in the Python projector).
53    pub step: String,
54
55    /// Lifecycle status (e.g. `"STARTED"`, `"COMPLETED"`,
56    /// `"FAILED"`).
57    pub status: String,
58
59    /// Wall-clock when the event was produced.  Stamped at emit
60    /// time so the event log preserves per-component ordering
61    /// even across server-clock skew.
62    pub created_at: DateTime<Utc>,
63
64    /// Free-form payload; the projector reads typed fields out
65    /// of this.  Renamed from the worker's `payload` field to
66    /// match the Python `EventEmitRequest.context` field; serde
67    /// alias accepts either name on the wire.
68    #[serde(alias = "payload")]
69    pub context: serde_json::Value,
70
71    /// Application-side snowflake id for this event.  Per
72    /// [`agents/rules/observability.md`][rule] Principle 3,
73    /// the emitting process generates this BEFORE the row hits
74    /// the database so spans / metrics / cross-component
75    /// correlation can use it immediately.  `None` falls back to
76    /// the DB-side default (existing `gen_snowflake()` function).
77    ///
78    /// [rule]: https://github.com/noetl/ai-meta/blob/main/agents/rules/observability.md
79    #[serde(default, skip_serializing_if = "Option::is_none")]
80    pub event_id: Option<i64>,
81
82    /// Worker that emitted the event (worker pod's id, or
83    /// `"cli-local"` for CLI mode).  Used for shard-aware queries
84    /// + diagnostic correlation.
85    #[serde(default, skip_serializing_if = "Option::is_none")]
86    pub worker_id: Option<String>,
87
88    /// Free-form metadata that doesn't belong in `context` —
89    /// retry counts, parent-event refs, catalog ids.  Matches the
90    /// Python `EventEmitRequest.meta` field 1:1.
91    #[serde(default, skip_serializing_if = "Option::is_none")]
92    pub meta: Option<serde_json::Value>,
93}
94
95#[async_trait]
96pub trait EventSink: Send + Sync {
97    /// Emit `event` to whatever durable surface the sink is bound to.
98    /// Implementations should be idempotent per `event_id` once R-1.2
99    /// adds id assignment.
100    async fn emit(&self, event: ExecutorEvent) -> Result<()>;
101}
102
103/// Trait alias used by the dispatch loop — wraps a sink and a
104/// pre-computed `execution_id` so callers don't have to thread it
105/// through every call site.
106pub struct EventEmitter {
107    pub sink: std::sync::Arc<dyn EventSink>,
108    pub execution_id: i64,
109}
110
111impl EventEmitter {
112    pub fn new(execution_id: i64, sink: std::sync::Arc<dyn EventSink>) -> Self {
113        Self { sink, execution_id }
114    }
115
116    pub async fn emit(
117        &self,
118        event_type: &str,
119        step: &str,
120        status: &str,
121        context: serde_json::Value,
122    ) -> Result<()> {
123        let event = ExecutorEvent {
124            execution_id: self.execution_id,
125            event_type: event_type.to_string(),
126            step: step.to_string(),
127            status: status.to_string(),
128            created_at: Utc::now(),
129            context,
130            event_id: None,
131            worker_id: None,
132            meta: None,
133        };
134        self.sink.emit(event).await
135    }
136}
137
/// Drops every event on the floor.  Useful in tests and as the default
/// during the R-1.1 skeleton phase before the real CLI/worker sinks
/// exist.
///
/// The type is a stateless unit struct, so it also derives `Debug`,
/// `Clone` and `Copy`: it can be logged and passed around by value
/// without ceremony.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopSink;
143
144#[async_trait]
145impl EventSink for NoopSink {
146    async fn emit(&self, _event: ExecutorEvent) -> Result<()> {
147        Ok(())
148    }
149}
150
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// The no-op sink must accept an event routed through the emitter
    /// and report success.
    #[tokio::test]
    async fn noop_sink_accepts_any_event() {
        let sink: Arc<dyn EventSink> = Arc::new(NoopSink);
        let emitter = EventEmitter::new(12345, sink);
        emitter
            .emit(
                "batch.completed",
                "start",
                "COMPLETED",
                serde_json::json!({"processing_ms": 12.3}),
            )
            .await
            .expect("noop emit");
    }

    // ---- R-1.2 PR-EE-1 — envelope enrichment ------------------------

    /// Minimal envelope with every optional field left as `None`.
    fn dummy_event() -> ExecutorEvent {
        ExecutorEvent {
            execution_id: 1,
            event_type: "step.enter".to_string(),
            step: "fetch".to_string(),
            status: "STARTED".to_string(),
            created_at: Utc::now(),
            context: serde_json::json!({}),
            event_id: None,
            worker_id: None,
            meta: None,
        }
    }

    #[test]
    fn new_optional_fields_omit_from_serialized_json_when_none() {
        let event = dummy_event();
        let json = serde_json::to_value(&event).unwrap();
        // The new optional fields should not appear in the wire
        // format when None — older consumers shouldn't see
        // unfamiliar keys.
        assert!(json.get("event_id").is_none(), "event_id omitted");
        assert!(json.get("worker_id").is_none(), "worker_id omitted");
        assert!(json.get("meta").is_none(), "meta omitted");
    }

    #[test]
    fn new_optional_fields_serialize_when_present() {
        let event = ExecutorEvent {
            event_id: Some(478775660589088777),
            worker_id: Some("worker-1".to_string()),
            meta: Some(serde_json::json!({"attempts": 2})),
            ..dummy_event()
        };
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["event_id"], serde_json::json!(478775660589088777_i64));
        assert_eq!(json["worker_id"], "worker-1");
        assert_eq!(json["meta"]["attempts"], 2);
    }

    #[test]
    fn serializes_context_under_canonical_name() {
        // The `payload` alias is deserialize-only; producers must
        // always write the field as `context`.
        let json = serde_json::to_value(&dummy_event()).unwrap();
        assert!(json.get("context").is_some(), "context key present");
        assert!(json.get("payload").is_none(), "payload key never emitted");
    }

    #[test]
    fn deserializes_payload_alias_into_context() {
        // Older wire format uses `payload`; new envelope uses
        // `context`.  Serde alias lets both deserialize.
        let json = serde_json::json!({
            "execution_id": 5,
            "event_type": "step.enter",
            "step": "s",
            "status": "STARTED",
            "created_at": "2026-05-31T00:00:00Z",
            "payload": {"foo": "bar"},
        });
        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
        assert_eq!(event.context, serde_json::json!({"foo": "bar"}));
    }

    #[test]
    fn deserializes_missing_optional_fields_with_none() {
        // Wire format without event_id / worker_id / meta should
        // deserialize cleanly with the new fields set to None.
        let json = serde_json::json!({
            "execution_id": 5,
            "event_type": "step.enter",
            "step": "s",
            "status": "STARTED",
            "created_at": "2026-05-31T00:00:00Z",
            "context": {},
        });
        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
        assert!(event.event_id.is_none());
        assert!(event.worker_id.is_none());
        assert!(event.meta.is_none());
    }

    #[test]
    fn round_trips_with_all_optional_fields_set() {
        let original = ExecutorEvent {
            execution_id: 478775660589088776,
            event_type: "command.completed".to_string(),
            step: "fetch_calendar".to_string(),
            status: "COMPLETED".to_string(),
            created_at: chrono::DateTime::parse_from_rfc3339("2026-05-31T03:14:15Z")
                .unwrap()
                .with_timezone(&Utc),
            context: serde_json::json!({"result": {"items": 42}}),
            event_id: Some(478775660589088777),
            worker_id: Some("worker-prod-7".to_string()),
            meta: Some(serde_json::json!({"attempts": 3, "parent_event_id": "478775660589088770"})),
        };
        let json = serde_json::to_value(&original).unwrap();
        let parsed: ExecutorEvent = serde_json::from_value(json).unwrap();
        // Every field must survive the round trip, not just the
        // newly added optional ones.
        assert_eq!(parsed.execution_id, original.execution_id);
        assert_eq!(parsed.event_type, original.event_type);
        assert_eq!(parsed.step, original.step);
        assert_eq!(parsed.status, original.status);
        assert_eq!(parsed.created_at, original.created_at);
        assert_eq!(parsed.context, original.context);
        assert_eq!(parsed.event_id, original.event_id);
        assert_eq!(parsed.worker_id, original.worker_id);
        assert_eq!(parsed.meta, original.meta);
    }
}