Skip to main content

noetl_executor/
events.rs

//! Event emission — pluggable sink that captures the events the
//! executor produces during one execution.
//!
//! Both CLI and worker emit the same shape; only the sink differs:
//!
//! - CLI: `StdoutEventSink` — pretty-prints events to the terminal
//!   (R-1.2 will introduce this).
//! - Worker: `NatsEventSink` — publishes to the configured NATS
//!   subject (R-1.3 will introduce this).
//!
//! The envelope shape mirrors the Python-side
//! `noetl.runtime.events.report_event` payload so wire-format
//! compatibility holds across stacks.  See the noetl/noetl wiki page
//! `handle_event_timing` for the field catalogue.
15
16use anyhow::Result;
17use async_trait::async_trait;
18use chrono::{DateTime, Utc};
19
20/// One event the executor emits.  Keep field naming aligned with the
21/// Python side (`noetl.event` table columns) so envelopes can be
22/// projected by either stack.
23///
24/// R-1.2 PR-2a: `execution_id` is now `i64` (matching the Python
25/// `noetl.event.execution_id` bigint column, the CLI's
26/// `BridgeContext.execution_id`, the worker's
27/// `CommandNotification.execution_id`, and
28/// `noetl_tools::context::ExecutionContext.execution_id`).  In 0.1.x
29/// the field was `String` as a placeholder; cross-binary work in
30/// R-1.2 makes the alignment load-bearing.
31#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
32pub struct ExecutorEvent {
33    pub execution_id: i64,
34    pub event_type: String,
35    /// Step name (`node_id` / `node_name` in the Python projector).
36    pub step: String,
37    pub status: String,
38    /// Wall-clock when the event was produced.
39    pub created_at: DateTime<Utc>,
40    /// Free-form payload; the projector reads typed fields out of this.
41    pub context: serde_json::Value,
42}
43
44#[async_trait]
45pub trait EventSink: Send + Sync {
46    /// Emit `event` to whatever durable surface the sink is bound to.
47    /// Implementations should be idempotent per `event_id` once R-1.2
48    /// adds id assignment.
49    async fn emit(&self, event: ExecutorEvent) -> Result<()>;
50}
51
52/// Trait alias used by the dispatch loop — wraps a sink and a
53/// pre-computed `execution_id` so callers don't have to thread it
54/// through every call site.
55pub struct EventEmitter {
56    pub sink: std::sync::Arc<dyn EventSink>,
57    pub execution_id: i64,
58}
59
60impl EventEmitter {
61    pub fn new(execution_id: i64, sink: std::sync::Arc<dyn EventSink>) -> Self {
62        Self { sink, execution_id }
63    }
64
65    pub async fn emit(
66        &self,
67        event_type: &str,
68        step: &str,
69        status: &str,
70        context: serde_json::Value,
71    ) -> Result<()> {
72        let event = ExecutorEvent {
73            execution_id: self.execution_id,
74            event_type: event_type.to_string(),
75            step: step.to_string(),
76            status: status.to_string(),
77            created_at: Utc::now(),
78            context,
79        };
80        self.sink.emit(event).await
81    }
82}
83
/// Sink that discards every event it is given.
///
/// Handy in tests, and as the default during the R-1.1 skeleton phase
/// before the real CLI/worker sinks exist.
///
/// It is a field-less unit struct, so it derives `Debug`, `Clone` and
/// `Copy` in addition to `Default`: it can be logged, duplicated and
/// passed by value freely.
#[derive(Debug, Default, Clone, Copy)]
pub struct NoopSink;
89
90#[async_trait]
91impl EventSink for NoopSink {
92    async fn emit(&self, _event: ExecutorEvent) -> Result<()> {
93        Ok(())
94    }
95}
96
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::Arc;

    /// Emitting through an `EventEmitter` backed by `NoopSink` must
    /// succeed.
    #[tokio::test]
    async fn noop_sink_accepts_any_event() {
        let emitter = EventEmitter::new(12345, Arc::new(NoopSink) as Arc<dyn EventSink>);
        let payload = serde_json::json!({"processing_ms": 12.3});

        let outcome = emitter
            .emit("batch.completed", "start", "COMPLETED", payload)
            .await;

        outcome.expect("noop emit");
    }
}
116}