Skip to main content

noetl_events/
lib.rs

1//! NoETL shared event envelope.
2//!
3//! This crate is the single canonical home for the wire-format
4//! `ExecutorEvent` envelope and the `EventSink` trait every NoETL
5//! Rust binary emits events through.  Producers (CLI's local-mode
6//! runner, noetl-worker's NATS pull consumer) and consumers
7//! (noetl-server's `POST /api/events` endpoint, the projector that
8//! writes the `noetl.event` row, the gateway's SSE stream) all
9//! agree on the shape because they all link this one type.
10//!
11//! Until EE-4 the envelope lived in `noetl-executor::events`.
12//! That worked while only producers needed it — the noetl-server
13//! kept its own hand-aligned `EventRequest` and the alignment was
14//! enforced by code review.  When the Rust server started
15//! producing events itself (Phase D R2 orchestrator wiring, see
16//! noetl/server#31) the duplicated type became actively
//! maintenance-heavy, so EE-4 (noetl/ai-meta#49 follow-up) carves
//! it out into this lightweight crate that every component can
//! link without dragging in the rest of the executor's surface
//! (playbook parser, runtime trait, tools bridge, …).
21//!
22//! Re-export contract: `noetl-executor::events` re-exports every
23//! public symbol from this crate verbatim so the existing call
24//! sites keep working unchanged.  EE-4 PR 2 publishes the crate
25//! to crates.io and EE-4 PR 3 adds it as a direct dependency to
26//! `noetl-server`.
27//!
28//! # Wire-format alignment
29//!
30//! `ExecutorEvent` is intentionally close to the Python
31//! `EventEmitRequest` shape so events emitted by either stack
32//! project against the same `noetl.event` columns.  See the
33//! noetl/noetl wiki page `handle_event_timing` for the field
34//! catalogue and the per-field semantics.
35
36use anyhow::Result;
37use async_trait::async_trait;
38use chrono::{DateTime, Utc};
39
40/// One event the executor emits.  Keep field naming aligned with the
41/// Python side (`noetl.event` table columns) so envelopes can be
42/// projected by either stack.
43///
44/// R-1.2 PR-2a: `execution_id` is now `i64` (matching the Python
45/// `noetl.event.execution_id` bigint column, the CLI's
46/// `BridgeContext.execution_id`, the worker's
47/// `CommandNotification.execution_id`, and
48/// `noetl_tools::context::ExecutionContext.execution_id`).
49///
50/// R-1.2 PR-EE-1 (0.3.1): adds the optional fields the Python
51/// `EventEmitRequest` (and the Rust `noetl-server` `EventRequest`)
52/// already expect — `event_id`, `worker_id`, `meta`.  All are
53/// `Option` + `#[serde(default, skip_serializing_if =
54/// "Option::is_none")]` so older wire-format consumers deserialize
55/// cleanly and producers that don't populate them omit them from
56/// the JSON entirely.  This step is the additive prerequisite for
57/// the cross-repo event envelope reconciliation tracked on
58/// [noetl/ai-meta#30](https://github.com/noetl/ai-meta/issues/30).
59///
60/// EE-4 (noetl/ai-meta#49): the struct moved out of
61/// `noetl-executor::events` into this dedicated `noetl-events`
62/// crate so noetl-server can link it directly.
63#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
64pub struct ExecutorEvent {
65    /// Execution this event belongs to.  Matches the
66    /// `noetl.event.execution_id` bigint column.
67    pub execution_id: i64,
68
69    /// Event type (e.g. `"step.enter"`, `"call.done"`,
70    /// `"step.exit"`, `"command.completed"`, `"command.failed"`).
71    /// Python side uses an `EventType` enum but Rust accepts the
72    /// raw string; the server validates against the enum when
73    /// projecting.
74    pub event_type: String,
75
76    /// Step name (`node_id` / `node_name` in the Python projector).
77    pub step: String,
78
79    /// Lifecycle status (e.g. `"STARTED"`, `"COMPLETED"`,
80    /// `"FAILED"`).
81    pub status: String,
82
83    /// Wall-clock when the event was produced.  Stamped at emit
84    /// time so the event log preserves per-component ordering
85    /// even across server-clock skew.
86    pub created_at: DateTime<Utc>,
87
88    /// Free-form payload; the projector reads typed fields out
89    /// of this.  Renamed from the worker's `payload` field to
90    /// match the Python `EventEmitRequest.context` field; serde
91    /// alias accepts either name on the wire.
92    #[serde(alias = "payload")]
93    pub context: serde_json::Value,
94
95    /// Application-side snowflake id for this event.  Per
96    /// [`agents/rules/observability.md`][rule] Principle 3,
97    /// the emitting process generates this BEFORE the row hits
98    /// the database so spans / metrics / cross-component
99    /// correlation can use it immediately.  `None` falls back to
100    /// the DB-side default (existing `gen_snowflake()` function).
101    ///
102    /// [rule]: https://github.com/noetl/ai-meta/blob/main/agents/rules/observability.md
103    #[serde(default, skip_serializing_if = "Option::is_none")]
104    pub event_id: Option<i64>,
105
106    /// Worker that emitted the event (worker pod's id, or
107    /// `"cli-local"` for CLI mode).  Used for shard-aware queries
108    /// + diagnostic correlation.
109    #[serde(default, skip_serializing_if = "Option::is_none")]
110    pub worker_id: Option<String>,
111
112    /// Free-form metadata that doesn't belong in `context` —
113    /// retry counts, parent-event refs, catalog ids.  Matches the
114    /// Python `EventEmitRequest.meta` field 1:1.
115    #[serde(default, skip_serializing_if = "Option::is_none")]
116    pub meta: Option<serde_json::Value>,
117}
118
119/// Pluggable sink the executor emits `ExecutorEvent`s through.
120///
121/// Producers swap implementations without touching dispatch code:
122///
123/// - CLI's local-mode runner: a stdout / record-replay sink.
124/// - noetl-worker: an HTTP sink that POSTs to `/api/events`.
125/// - tests: `NoopSink` below.
126///
127/// Implementations should be idempotent per `event_id` once R-1.2
128/// adds id assignment.
129#[async_trait]
130pub trait EventSink: Send + Sync {
131    /// Emit `event` to whatever durable surface the sink is bound to.
132    async fn emit(&self, event: ExecutorEvent) -> Result<()>;
133}
134
135/// Trait alias used by the dispatch loop — wraps a sink and a
136/// pre-computed `execution_id` so callers don't have to thread it
137/// through every call site.
138pub struct EventEmitter {
139    pub sink: std::sync::Arc<dyn EventSink>,
140    pub execution_id: i64,
141}
142
143impl EventEmitter {
144    pub fn new(execution_id: i64, sink: std::sync::Arc<dyn EventSink>) -> Self {
145        Self { sink, execution_id }
146    }
147
148    pub async fn emit(
149        &self,
150        event_type: &str,
151        step: &str,
152        status: &str,
153        context: serde_json::Value,
154    ) -> Result<()> {
155        let event = ExecutorEvent {
156            execution_id: self.execution_id,
157            event_type: event_type.to_string(),
158            step: step.to_string(),
159            status: status.to_string(),
160            created_at: Utc::now(),
161            context,
162            event_id: None,
163            worker_id: None,
164            meta: None,
165        };
166        self.sink.emit(event).await
167    }
168}
169
170/// Drops every event on the floor.  Useful in tests and as the default
171/// during the R-1.1 skeleton phase before the real CLI/worker sinks
172/// exist.
173#[derive(Default)]
174pub struct NoopSink;
175
176#[async_trait]
177impl EventSink for NoopSink {
178    async fn emit(&self, _event: ExecutorEvent) -> Result<()> {
179        Ok(())
180    }
181}
182
183#[cfg(test)]
184mod tests {
185    use super::*;
186    use std::sync::Arc;
187
188    #[tokio::test]
189    async fn noop_sink_accepts_any_event() {
190        let sink: Arc<dyn EventSink> = Arc::new(NoopSink);
191        let emitter = EventEmitter::new(12345, sink);
192        emitter
193            .emit(
194                "batch.completed",
195                "start",
196                "COMPLETED",
197                serde_json::json!({"processing_ms": 12.3}),
198            )
199            .await
200            .expect("noop emit");
201    }
202
203    // ---- R-1.2 PR-EE-1 — envelope enrichment ------------------------
204
205    fn dummy_event() -> ExecutorEvent {
206        ExecutorEvent {
207            execution_id: 1,
208            event_type: "step.enter".to_string(),
209            step: "fetch".to_string(),
210            status: "STARTED".to_string(),
211            created_at: Utc::now(),
212            context: serde_json::json!({}),
213            event_id: None,
214            worker_id: None,
215            meta: None,
216        }
217    }
218
219    #[test]
220    fn new_optional_fields_omit_from_serialized_json_when_none() {
221        let event = dummy_event();
222        let json = serde_json::to_value(&event).unwrap();
223        // The new optional fields should not appear in the wire
224        // format when None — older consumers shouldn't see
225        // unfamiliar keys.
226        assert!(json.get("event_id").is_none(), "event_id omitted");
227        assert!(json.get("worker_id").is_none(), "worker_id omitted");
228        assert!(json.get("meta").is_none(), "meta omitted");
229    }
230
231    #[test]
232    fn new_optional_fields_serialize_when_present() {
233        let event = ExecutorEvent {
234            event_id: Some(478775660589088777),
235            worker_id: Some("worker-1".to_string()),
236            meta: Some(serde_json::json!({"attempts": 2})),
237            ..dummy_event()
238        };
239        let json = serde_json::to_value(&event).unwrap();
240        assert_eq!(json["event_id"], serde_json::json!(478775660589088777_i64));
241        assert_eq!(json["worker_id"], "worker-1");
242        assert_eq!(json["meta"]["attempts"], 2);
243    }
244
245    #[test]
246    fn deserializes_payload_alias_into_context() {
247        // Older wire format uses `payload`; new envelope uses
248        // `context`.  Serde alias lets both deserialize.
249        let json = serde_json::json!({
250            "execution_id": 5,
251            "event_type": "step.enter",
252            "step": "s",
253            "status": "STARTED",
254            "created_at": "2026-05-31T00:00:00Z",
255            "payload": {"foo": "bar"},
256        });
257        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
258        assert_eq!(event.context, serde_json::json!({"foo": "bar"}));
259    }
260
261    #[test]
262    fn deserializes_missing_optional_fields_with_none() {
263        // Wire format without event_id / worker_id / meta should
264        // deserialize cleanly with the new fields set to None.
265        let json = serde_json::json!({
266            "execution_id": 5,
267            "event_type": "step.enter",
268            "step": "s",
269            "status": "STARTED",
270            "created_at": "2026-05-31T00:00:00Z",
271            "context": {},
272        });
273        let event: ExecutorEvent = serde_json::from_value(json).unwrap();
274        assert!(event.event_id.is_none());
275        assert!(event.worker_id.is_none());
276        assert!(event.meta.is_none());
277    }
278
279    #[test]
280    fn round_trips_with_all_optional_fields_set() {
281        let original = ExecutorEvent {
282            execution_id: 478775660589088776,
283            event_type: "command.completed".to_string(),
284            step: "fetch_calendar".to_string(),
285            status: "COMPLETED".to_string(),
286            created_at: chrono::DateTime::parse_from_rfc3339("2026-05-31T03:14:15Z")
287                .unwrap()
288                .with_timezone(&Utc),
289            context: serde_json::json!({"result": {"items": 42}}),
290            event_id: Some(478775660589088777),
291            worker_id: Some("worker-prod-7".to_string()),
292            meta: Some(serde_json::json!({"attempts": 3, "parent_event_id": "478775660589088770"})),
293        };
294        let json = serde_json::to_value(&original).unwrap();
295        let parsed: ExecutorEvent = serde_json::from_value(json).unwrap();
296        assert_eq!(parsed.execution_id, original.execution_id);
297        assert_eq!(parsed.event_id, original.event_id);
298        assert_eq!(parsed.worker_id, original.worker_id);
299        assert_eq!(parsed.meta, original.meta);
300    }
301}