noetl_events/lib.rs
1//! NoETL shared event envelope.
2//!
3//! This crate is the single canonical home for the wire-format
4//! `ExecutorEvent` envelope and the `EventSink` trait every NoETL
5//! Rust binary emits events through. Producers (CLI's local-mode
6//! runner, noetl-worker's NATS pull consumer) and consumers
7//! (noetl-server's `POST /api/events` endpoint, the projector that
8//! writes the `noetl.event` row, the gateway's SSE stream) all
9//! agree on the shape because they all link this one type.
10//!
11//! Until EE-4 the envelope lived in `noetl-executor::events`.
12//! That worked while only producers needed it — the noetl-server
13//! kept its own hand-aligned `EventRequest` and the alignment was
14//! enforced by code review. When the Rust server started
15//! producing events itself (Phase D R2 orchestrator wiring, see
16//! noetl/server#31) the duplicated type became actively
//! maintenance-heavy, so EE-4 (noetl/ai-meta#49 follow-up) carves
//! it out into this small crate, whose dependencies are limited to
//! lightweight crates such as serde, chrono, anyhow, and async-trait,
//! so every component can link it without dragging in the rest of
//! the executor's surface (playbook parser, runtime trait, tools
//! bridge, …).
21//!
22//! Re-export contract: `noetl-executor::events` re-exports every
23//! public symbol from this crate verbatim so the existing call
24//! sites keep working unchanged. EE-4 PR 2 publishes the crate
25//! to crates.io and EE-4 PR 3 adds it as a direct dependency to
26//! `noetl-server`.
27//!
28//! # Wire-format alignment
29//!
30//! `ExecutorEvent` is intentionally close to the Python
31//! `EventEmitRequest` shape so events emitted by either stack
32//! project against the same `noetl.event` columns. See the
33//! noetl/noetl wiki page `handle_event_timing` for the field
34//! catalogue and the per-field semantics.
35
36use anyhow::Result;
37use async_trait::async_trait;
38use chrono::{DateTime, Utc};
39
40/// One event the executor emits. Keep field naming aligned with the
41/// Python side (`noetl.event` table columns) so envelopes can be
42/// projected by either stack.
43///
44/// R-1.2 PR-2a: `execution_id` is now `i64` (matching the Python
45/// `noetl.event.execution_id` bigint column, the CLI's
46/// `BridgeContext.execution_id`, the worker's
47/// `CommandNotification.execution_id`, and
48/// `noetl_tools::context::ExecutionContext.execution_id`).
49///
50/// R-1.2 PR-EE-1 (0.3.1): adds the optional fields the Python
51/// `EventEmitRequest` (and the Rust `noetl-server` `EventRequest`)
52/// already expect — `event_id`, `worker_id`, `meta`. All are
53/// `Option` + `#[serde(default, skip_serializing_if =
54/// "Option::is_none")]` so older wire-format consumers deserialize
55/// cleanly and producers that don't populate them omit them from
56/// the JSON entirely. This step is the additive prerequisite for
57/// the cross-repo event envelope reconciliation tracked on
58/// [noetl/ai-meta#30](https://github.com/noetl/ai-meta/issues/30).
59///
60/// EE-4 (noetl/ai-meta#49): the struct moved out of
61/// `noetl-executor::events` into this dedicated `noetl-events`
62/// crate so noetl-server can link it directly.
63#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
64pub struct ExecutorEvent {
65 /// Execution this event belongs to. Matches the
66 /// `noetl.event.execution_id` bigint column.
67 pub execution_id: i64,
68
69 /// Event type (e.g. `"step.enter"`, `"call.done"`,
70 /// `"step.exit"`, `"command.completed"`, `"command.failed"`).
71 /// Python side uses an `EventType` enum but Rust accepts the
72 /// raw string; the server validates against the enum when
73 /// projecting.
74 pub event_type: String,
75
76 /// Step name (`node_id` / `node_name` in the Python projector).
77 pub step: String,
78
79 /// Lifecycle status (e.g. `"STARTED"`, `"COMPLETED"`,
80 /// `"FAILED"`).
81 pub status: String,
82
83 /// Wall-clock when the event was produced. Stamped at emit
84 /// time so the event log preserves per-component ordering
85 /// even across server-clock skew.
86 pub created_at: DateTime<Utc>,
87
88 /// Free-form payload; the projector reads typed fields out
89 /// of this. Renamed from the worker's `payload` field to
90 /// match the Python `EventEmitRequest.context` field; serde
91 /// alias accepts either name on the wire.
92 #[serde(alias = "payload")]
93 pub context: serde_json::Value,
94
95 /// Application-side snowflake id for this event. Per
96 /// [`agents/rules/observability.md`][rule] Principle 3,
97 /// the emitting process generates this BEFORE the row hits
98 /// the database so spans / metrics / cross-component
99 /// correlation can use it immediately. `None` falls back to
100 /// the DB-side default (existing `gen_snowflake()` function).
101 ///
102 /// [rule]: https://github.com/noetl/ai-meta/blob/main/agents/rules/observability.md
103 #[serde(default, skip_serializing_if = "Option::is_none")]
104 pub event_id: Option<i64>,
105
106 /// Worker that emitted the event (worker pod's id, or
107 /// `"cli-local"` for CLI mode). Used for shard-aware queries
108 /// + diagnostic correlation.
109 #[serde(default, skip_serializing_if = "Option::is_none")]
110 pub worker_id: Option<String>,
111
112 /// Free-form metadata that doesn't belong in `context` —
113 /// retry counts, parent-event refs, catalog ids. Matches the
114 /// Python `EventEmitRequest.meta` field 1:1.
115 #[serde(default, skip_serializing_if = "Option::is_none")]
116 pub meta: Option<serde_json::Value>,
117}
118
119/// Pluggable sink the executor emits `ExecutorEvent`s through.
120///
121/// Producers swap implementations without touching dispatch code:
122///
123/// - CLI's local-mode runner: a stdout / record-replay sink.
124/// - noetl-worker: an HTTP sink that POSTs to `/api/events`.
125/// - tests: `NoopSink` below.
126///
127/// Implementations should be idempotent per `event_id` once R-1.2
128/// adds id assignment.
129#[async_trait]
130pub trait EventSink: Send + Sync {
131 /// Emit `event` to whatever durable surface the sink is bound to.
132 async fn emit(&self, event: ExecutorEvent) -> Result<()>;
133}
134
135/// Trait alias used by the dispatch loop — wraps a sink and a
136/// pre-computed `execution_id` so callers don't have to thread it
137/// through every call site.
138pub struct EventEmitter {
139 pub sink: std::sync::Arc<dyn EventSink>,
140 pub execution_id: i64,
141}
142
143impl EventEmitter {
144 pub fn new(execution_id: i64, sink: std::sync::Arc<dyn EventSink>) -> Self {
145 Self { sink, execution_id }
146 }
147
148 pub async fn emit(
149 &self,
150 event_type: &str,
151 step: &str,
152 status: &str,
153 context: serde_json::Value,
154 ) -> Result<()> {
155 let event = ExecutorEvent {
156 execution_id: self.execution_id,
157 event_type: event_type.to_string(),
158 step: step.to_string(),
159 status: status.to_string(),
160 created_at: Utc::now(),
161 context,
162 event_id: None,
163 worker_id: None,
164 meta: None,
165 };
166 self.sink.emit(event).await
167 }
168}
169
170/// Drops every event on the floor. Useful in tests and as the default
171/// during the R-1.1 skeleton phase before the real CLI/worker sinks
172/// exist.
173#[derive(Default)]
174pub struct NoopSink;
175
176#[async_trait]
177impl EventSink for NoopSink {
178 async fn emit(&self, _event: ExecutorEvent) -> Result<()> {
179 Ok(())
180 }
181}
182
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    #[tokio::test]
    async fn noop_sink_accepts_any_event() {
        let sink: Arc<dyn EventSink> = Arc::new(NoopSink);
        let emitter = EventEmitter::new(12345, sink);
        emitter
            .emit(
                "batch.completed",
                "start",
                "COMPLETED",
                serde_json::json!({"processing_ms": 12.3}),
            )
            .await
            .expect("noop emit");
    }

    // ---- R-1.2 PR-EE-1 — envelope enrichment ------------------------

    /// Minimal envelope with every optional field unset.
    fn dummy_event() -> ExecutorEvent {
        ExecutorEvent {
            execution_id: 1,
            event_type: "step.enter".to_string(),
            step: "fetch".to_string(),
            status: "STARTED".to_string(),
            created_at: Utc::now(),
            context: serde_json::json!({}),
            event_id: None,
            worker_id: None,
            meta: None,
        }
    }

    #[test]
    fn new_optional_fields_omit_from_serialized_json_when_none() {
        let event = dummy_event();
        let json = serde_json::to_value(&event).unwrap();
        // The new optional fields should not appear in the wire
        // format when None — older consumers shouldn't see
        // unfamiliar keys.
        assert!(json.get("event_id").is_none(), "event_id omitted");
        assert!(json.get("worker_id").is_none(), "worker_id omitted");
        assert!(json.get("meta").is_none(), "meta omitted");
    }

    #[test]
    fn new_optional_fields_serialize_when_present() {
        let event = ExecutorEvent {
            event_id: Some(478775660589088777),
            worker_id: Some("worker-1".to_string()),
            meta: Some(serde_json::json!({"attempts": 2})),
            ..dummy_event()
        };
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["event_id"], serde_json::json!(478775660589088777_i64));
        assert_eq!(json["worker_id"], "worker-1");
        assert_eq!(json["meta"]["attempts"], 2);
    }

    #[test]
    fn serializes_context_under_canonical_key() {
        // `payload` is a deserialize-only alias; producers must
        // always emit the canonical `context` key.
        let event = ExecutorEvent {
            context: serde_json::json!({"foo": "bar"}),
            ..dummy_event()
        };
        let json = serde_json::to_value(&event).unwrap();
        assert_eq!(json["context"], serde_json::json!({"foo": "bar"}));
        assert!(json.get("payload").is_none(), "payload alias not emitted");
    }

    #[test]
    fn deserializes_payload_alias_into_context() {
        // Older wire format uses `payload`; new envelope uses
        // `context`. Serde alias lets both deserialize.
        let json = serde_json::json!({
            "execution_id": 5,
            "event_type": "step.enter",
            "step": "s",
            "status": "STARTED",
            "created_at": "2026-05-31T00:00:00Z",
            "payload": {"foo": "bar"},
        });
        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
        assert_eq!(event.context, serde_json::json!({"foo": "bar"}));
    }

    #[test]
    fn deserializes_missing_optional_fields_with_none() {
        // Wire format without event_id / worker_id / meta should
        // deserialize cleanly with the new fields set to None.
        let json = serde_json::json!({
            "execution_id": 5,
            "event_type": "step.enter",
            "step": "s",
            "status": "STARTED",
            "created_at": "2026-05-31T00:00:00Z",
            "context": {},
        });
        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
        assert!(event.event_id.is_none());
        assert!(event.worker_id.is_none());
        assert!(event.meta.is_none());
    }

    #[test]
    fn round_trips_with_all_optional_fields_set() {
        let original = ExecutorEvent {
            execution_id: 478775660589088776,
            event_type: "command.completed".to_string(),
            step: "fetch_calendar".to_string(),
            status: "COMPLETED".to_string(),
            created_at: chrono::DateTime::parse_from_rfc3339("2026-05-31T03:14:15Z")
                .unwrap()
                .with_timezone(&Utc),
            context: serde_json::json!({"result": {"items": 42}}),
            event_id: Some(478775660589088777),
            worker_id: Some("worker-prod-7".to_string()),
            meta: Some(serde_json::json!({"attempts": 3, "parent_event_id": "478775660589088770"})),
        };
        let json = serde_json::to_value(&original).unwrap();
        let parsed: ExecutorEvent = serde_json::from_value(json).unwrap();
        // Compare every field, not only the optional ones, so a
        // regression in timestamp or context (de)serialization is
        // caught too.
        assert_eq!(parsed.execution_id, original.execution_id);
        assert_eq!(parsed.event_type, original.event_type);
        assert_eq!(parsed.step, original.step);
        assert_eq!(parsed.status, original.status);
        assert_eq!(parsed.created_at, original.created_at);
        assert_eq!(parsed.context, original.context);
        assert_eq!(parsed.event_id, original.event_id);
        assert_eq!(parsed.worker_id, original.worker_id);
        assert_eq!(parsed.meta, original.meta);
    }
}
301}