noetl_executor/events.rs
//! Event emission — pluggable sink that captures the events the
//! executor produces during one execution.
//!
//! Both CLI and worker emit the same shape; only the sink differs:
//!
//! - CLI: `StdoutEventSink` — pretty-prints events to the terminal
//!   (R-1.2 will introduce this).
//! - Worker: `NatsEventSink` — publishes to the configured NATS
//!   subject (R-1.3 will introduce this).
//!
//! The envelope shape mirrors the Python-side
//! `noetl.runtime.events.report_event` payload so wire-format
//! compatibility holds across stacks. See the noetl/noetl wiki page
//! `handle_event_timing` for the field catalogue.
15
16use anyhow::Result;
17use async_trait::async_trait;
18use chrono::{DateTime, Utc};
19
20/// One event the executor emits. Keep field naming aligned with the
21/// Python side (`noetl.event` table columns) so envelopes can be
22/// projected by either stack.
23///
24/// R-1.2 PR-2a: `execution_id` is now `i64` (matching the Python
25/// `noetl.event.execution_id` bigint column, the CLI's
26/// `BridgeContext.execution_id`, the worker's
27/// `CommandNotification.execution_id`, and
28/// `noetl_tools::context::ExecutionContext.execution_id`).
29///
30/// R-1.2 PR-EE-1 (0.3.1): adds the optional fields the Python
31/// `EventEmitRequest` (and the Rust `noetl-server` `EventRequest`)
32/// already expect — `event_id`, `worker_id`, `meta`. All are
33/// `Option` + `#[serde(default, skip_serializing_if =
34/// "Option::is_none")]` so older wire-format consumers deserialize
35/// cleanly and producers that don't populate them omit them from
36/// the JSON entirely. This step is the additive prerequisite for
37/// the cross-repo event envelope reconciliation tracked on
38/// [noetl/ai-meta#30](https://github.com/noetl/ai-meta/issues/30).
39#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
40pub struct ExecutorEvent {
41 /// Execution this event belongs to. Matches the
42 /// `noetl.event.execution_id` bigint column.
43 pub execution_id: i64,
44
45 /// Event type (e.g. `"step.enter"`, `"call.done"`,
46 /// `"step.exit"`, `"command.completed"`, `"command.failed"`).
47 /// Python side uses an `EventType` enum but Rust accepts the
48 /// raw string; the server validates against the enum when
49 /// projecting.
50 pub event_type: String,
51
52 /// Step name (`node_id` / `node_name` in the Python projector).
53 pub step: String,
54
55 /// Lifecycle status (e.g. `"STARTED"`, `"COMPLETED"`,
56 /// `"FAILED"`).
57 pub status: String,
58
59 /// Wall-clock when the event was produced. Stamped at emit
60 /// time so the event log preserves per-component ordering
61 /// even across server-clock skew.
62 pub created_at: DateTime<Utc>,
63
64 /// Free-form payload; the projector reads typed fields out
65 /// of this. Renamed from the worker's `payload` field to
66 /// match the Python `EventEmitRequest.context` field; serde
67 /// alias accepts either name on the wire.
68 #[serde(alias = "payload")]
69 pub context: serde_json::Value,
70
71 /// Application-side snowflake id for this event. Per
72 /// [`agents/rules/observability.md`][rule] Principle 3,
73 /// the emitting process generates this BEFORE the row hits
74 /// the database so spans / metrics / cross-component
75 /// correlation can use it immediately. `None` falls back to
76 /// the DB-side default (existing `gen_snowflake()` function).
77 ///
78 /// [rule]: https://github.com/noetl/ai-meta/blob/main/agents/rules/observability.md
79 #[serde(default, skip_serializing_if = "Option::is_none")]
80 pub event_id: Option<i64>,
81
82 /// Worker that emitted the event (worker pod's id, or
83 /// `"cli-local"` for CLI mode). Used for shard-aware queries
84 /// + diagnostic correlation.
85 #[serde(default, skip_serializing_if = "Option::is_none")]
86 pub worker_id: Option<String>,
87
88 /// Free-form metadata that doesn't belong in `context` —
89 /// retry counts, parent-event refs, catalog ids. Matches the
90 /// Python `EventEmitRequest.meta` field 1:1.
91 #[serde(default, skip_serializing_if = "Option::is_none")]
92 pub meta: Option<serde_json::Value>,
93}
94
95#[async_trait]
96pub trait EventSink: Send + Sync {
97 /// Emit `event` to whatever durable surface the sink is bound to.
98 /// Implementations should be idempotent per `event_id` once R-1.2
99 /// adds id assignment.
100 async fn emit(&self, event: ExecutorEvent) -> Result<()>;
101}
102
103/// Trait alias used by the dispatch loop — wraps a sink and a
104/// pre-computed `execution_id` so callers don't have to thread it
105/// through every call site.
106pub struct EventEmitter {
107 pub sink: std::sync::Arc<dyn EventSink>,
108 pub execution_id: i64,
109}
110
111impl EventEmitter {
112 pub fn new(execution_id: i64, sink: std::sync::Arc<dyn EventSink>) -> Self {
113 Self { sink, execution_id }
114 }
115
116 pub async fn emit(
117 &self,
118 event_type: &str,
119 step: &str,
120 status: &str,
121 context: serde_json::Value,
122 ) -> Result<()> {
123 let event = ExecutorEvent {
124 execution_id: self.execution_id,
125 event_type: event_type.to_string(),
126 step: step.to_string(),
127 status: status.to_string(),
128 created_at: Utc::now(),
129 context,
130 event_id: None,
131 worker_id: None,
132 meta: None,
133 };
134 self.sink.emit(event).await
135 }
136}
137
138/// Drops every event on the floor. Useful in tests and as the default
139/// during the R-1.1 skeleton phase before the real CLI/worker sinks
140/// exist.
141#[derive(Default)]
142pub struct NoopSink;
143
144#[async_trait]
145impl EventSink for NoopSink {
146 async fn emit(&self, _event: ExecutorEvent) -> Result<()> {
147 Ok(())
148 }
149}
150
151#[cfg(test)]
152mod tests {
153 use super::*;
154 use std::sync::Arc;
155
156 #[tokio::test]
157 async fn noop_sink_accepts_any_event() {
158 let sink: Arc<dyn EventSink> = Arc::new(NoopSink);
159 let emitter = EventEmitter::new(12345, sink);
160 emitter
161 .emit(
162 "batch.completed",
163 "start",
164 "COMPLETED",
165 serde_json::json!({"processing_ms": 12.3}),
166 )
167 .await
168 .expect("noop emit");
169 }
170
171 // ---- R-1.2 PR-EE-1 — envelope enrichment ------------------------
172
173 fn dummy_event() -> ExecutorEvent {
174 ExecutorEvent {
175 execution_id: 1,
176 event_type: "step.enter".to_string(),
177 step: "fetch".to_string(),
178 status: "STARTED".to_string(),
179 created_at: Utc::now(),
180 context: serde_json::json!({}),
181 event_id: None,
182 worker_id: None,
183 meta: None,
184 }
185 }
186
187 #[test]
188 fn new_optional_fields_omit_from_serialized_json_when_none() {
189 let event = dummy_event();
190 let json = serde_json::to_value(&event).unwrap();
191 // The new optional fields should not appear in the wire
192 // format when None — older consumers shouldn't see
193 // unfamiliar keys.
194 assert!(json.get("event_id").is_none(), "event_id omitted");
195 assert!(json.get("worker_id").is_none(), "worker_id omitted");
196 assert!(json.get("meta").is_none(), "meta omitted");
197 }
198
199 #[test]
200 fn new_optional_fields_serialize_when_present() {
201 let event = ExecutorEvent {
202 event_id: Some(478775660589088777),
203 worker_id: Some("worker-1".to_string()),
204 meta: Some(serde_json::json!({"attempts": 2})),
205 ..dummy_event()
206 };
207 let json = serde_json::to_value(&event).unwrap();
208 assert_eq!(json["event_id"], serde_json::json!(478775660589088777_i64));
209 assert_eq!(json["worker_id"], "worker-1");
210 assert_eq!(json["meta"]["attempts"], 2);
211 }
212
213 #[test]
214 fn deserializes_payload_alias_into_context() {
215 // Older wire format uses `payload`; new envelope uses
216 // `context`. Serde alias lets both deserialize.
217 let json = serde_json::json!({
218 "execution_id": 5,
219 "event_type": "step.enter",
220 "step": "s",
221 "status": "STARTED",
222 "created_at": "2026-05-31T00:00:00Z",
223 "payload": {"foo": "bar"},
224 });
225 let event: ExecutorEvent = serde_json::from_value(json).unwrap();
226 assert_eq!(event.context, serde_json::json!({"foo": "bar"}));
227 }
228
229 #[test]
230 fn deserializes_missing_optional_fields_with_none() {
231 // Wire format without event_id / worker_id / meta should
232 // deserialize cleanly with the new fields set to None.
233 let json = serde_json::json!({
234 "execution_id": 5,
235 "event_type": "step.enter",
236 "step": "s",
237 "status": "STARTED",
238 "created_at": "2026-05-31T00:00:00Z",
239 "context": {},
240 });
241 let event: ExecutorEvent = serde_json::from_value(json).unwrap();
242 assert!(event.event_id.is_none());
243 assert!(event.worker_id.is_none());
244 assert!(event.meta.is_none());
245 }
246
247 #[test]
248 fn round_trips_with_all_optional_fields_set() {
249 let original = ExecutorEvent {
250 execution_id: 478775660589088776,
251 event_type: "command.completed".to_string(),
252 step: "fetch_calendar".to_string(),
253 status: "COMPLETED".to_string(),
254 created_at: chrono::DateTime::parse_from_rfc3339("2026-05-31T03:14:15Z")
255 .unwrap()
256 .with_timezone(&Utc),
257 context: serde_json::json!({"result": {"items": 42}}),
258 event_id: Some(478775660589088777),
259 worker_id: Some("worker-prod-7".to_string()),
260 meta: Some(serde_json::json!({"attempts": 3, "parent_event_id": "478775660589088770"})),
261 };
262 let json = serde_json::to_value(&original).unwrap();
263 let parsed: ExecutorEvent = serde_json::from_value(json).unwrap();
264 assert_eq!(parsed.execution_id, original.execution_id);
265 assert_eq!(parsed.event_id, original.event_id);
266 assert_eq!(parsed.worker_id, original.worker_id);
267 assert_eq!(parsed.meta, original.meta);
268 }
269}