Skip to main content

noetl_executor/worker/
source.rs

//! `CommandSource` — worker-only abstraction over how the worker
//! receives the next command to run.
//!
//! This module is **worker-only**.  The CLI does NOT consume types
//! from here — its local-mode runner is a tree walker that doesn't
//! need a pull-model command source.  See § H.10 of the global hybrid
//! cloud blueprint for the architectural rationale.
//!
//! The worker (R-1.2 PR-2d-2) implements a NATS-backed
//! `CommandSource` that pulls from a durable consumer and uses the
//! control-plane HTTP API to claim individual commands.  Future
//! worker implementations (e.g. an HTTP-poll source for serverless
//! deployments under § H.2's Cloud Run compute substrate) implement
//! the same trait.
//!
//! ## R-1.2 PR-2d-1 — trait redesign
//!
//! The 0.2.x version of this module had a thin
//! `next() -> Result<Option<Command>>` trait.  The worker's real
//! NATS pull loop turned out to need:
//!
//! 1. **Per-pull ack/nack lifecycle** distinct from `next()` so the
//!    caller can ack BEFORE executing the command (NATS redelivery
//!    semantics) or nack on transient claim failures.
//! 2. **A 4-state outcome** because claiming a command can succeed
//!    (`Claimed`), be raced by another worker (`AlreadyClaimed`),
//!    fail transiently and warrant redelivery (`RetryLater`), or
//!    fail terminally (`Failed`) — each maps to a different
//!    follow-up.
//! 3. **A richer `Command` shape** with `render_context` (variables
//!    rendered against the merged step context) and `attempts` (for
//!    backoff decisions in the dispatcher).
//!
//! The 0.3.0 redesign captures all three.  The breaking change is
//! safe because no production consumer exists yet — noetl-worker
//! 1.1.2 doesn't import this module; PR-2d-2 will be its first
//! adoption.
38
39use std::collections::HashMap;
40
41use anyhow::Result;
42use async_trait::async_trait;
43
44/// One command the worker will dispatch to a tool.
45///
46/// The shape mirrors the Python-side `noetl.command` row + envelope
47/// + render-context map.  Keep field naming aligned with the wire
48/// format so JSON serde round-trips both directions.
49#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
50pub struct Command {
51    /// Stable identifier for this command.  Worker uses the
52    /// snowflake id from `noetl.command`; future sources (HTTP poll,
53    /// local-mode mock) may use UUIDs.
54    pub command_id: String,
55
56    /// Execution this command belongs to.  `i64` mirrors the
57    /// Python `noetl.command.execution_id` bigint column and the
58    /// `BridgeContext.execution_id` field in the cli's tools_bridge.
59    pub execution_id: i64,
60
61    /// Step name from the playbook (e.g. `"fetch_calendar"`).
62    pub step: String,
63
64    /// Tool kind that dispatch will route to (e.g. `"http"`,
65    /// `"postgres"`, `"rhai"`).  Matches noetl-tools' registry kind.
66    pub tool_kind: String,
67
68    /// Tool-specific input payload (the `ToolConfig.config` JSON
69    /// passed to `noetl_tools::registry::Tool::execute`).
70    pub input: serde_json::Value,
71
72    /// Variables already rendered against the merged step context
73    /// (workload + vars + step results).  Tools read this through
74    /// `noetl_tools::context::ExecutionContext.variables`.
75    ///
76    /// R-1.2 PR-2d-1: added so the worker's
77    /// `Command.render_context()` method maps cleanly onto this
78    /// field.
79    #[serde(default)]
80    pub render_context: HashMap<String, serde_json::Value>,
81
82    /// Number of times this command has been attempted (incremented
83    /// by the control plane on each redelivery).  Used by the
84    /// dispatcher for backoff and giving-up decisions.
85    ///
86    /// R-1.2 PR-2d-1: added so retry policy decisions can flow
87    /// through the trait.
88    #[serde(default)]
89    pub attempts: u32,
90}
91
92/// Outcome of an attempt to claim a command from the source.
93///
94/// The four variants mirror the worker's pre-PR-2d-1 inline
95/// `ClaimResult` enum (in `crate::client::ControlPlaneClient`):
96///
97/// - **Claimed**: this worker successfully owns the command.  Caller
98///   should `ack` the source handle and dispatch the command.
99/// - **AlreadyClaimed**: another worker raced us and already owns
100///   the command.  Caller should `ack` the source handle (no
101///   redelivery needed) and skip dispatch.
102/// - **RetryLater**: claim failed transiently (overload, contention,
103///   network blip).  Caller should `nack` the source handle so the
104///   source's redelivery policy gives another worker a shot.
105/// - **Failed**: claim failed terminally (catalog mismatch, malformed
106///   command, permission denied).  Caller should `nack` and emit a
107///   diagnostic event so the failure is visible.
108#[derive(Debug, Clone)]
109pub enum ClaimOutcome {
110    Claimed(Command),
111    AlreadyClaimed,
112    RetryLater(String),
113    Failed(String),
114}
115
116/// One pulled item from the source — a claim outcome plus the
117/// opaque handle the caller passes back to `ack` or `nack`.
118///
119/// Generic over the source's `AckHandle` associated type so each
120/// `CommandSource` impl can choose its own underlying handle shape
121/// (e.g. `async_nats::jetstream::Message` for the NATS source,
122/// `()` for an in-memory mock).
123#[derive(Debug, Clone)]
124pub struct Pulled<H> {
125    pub outcome: ClaimOutcome,
126    pub ack: H,
127}
128
129/// Pull-model command source.
130///
131/// `next()` returns:
132/// - `Ok(Some(Pulled { outcome, ack }))` — one pulled item.  The
133///   caller inspects `outcome` (Claimed / AlreadyClaimed /
134///   RetryLater / Failed) and decides whether to call `ack` (commit
135///   the pull) or `nack` (redeliver).  Both ack and nack are
136///   required exactly once per pulled item.
137/// - `Ok(None)` — the source is exhausted (local-mode playbook
138///   complete, mock source drained).  Long-running sources (worker
139///   NATS) never return `None` in normal operation.
140/// - `Err(e)` — transient or terminal source error before any pull
141///   happened; the caller's retry policy decides whether to call
142///   `next()` again.
143///
144/// ## Lifecycle invariants
145///
146/// 1. Each `Pulled.ack` handle must be consumed exactly once via
147///    `ack(handle)` or `nack(handle)` before the next `next()` call
148///    in the same task.  (Multiple concurrent next() calls are
149///    safe; each gets its own handle.)
150/// 2. `ack` and `nack` are idempotent at the trait level — calling
151///    them multiple times on the same handle is undefined; the
152///    source impl may panic or treat the duplicate as a no-op.
153#[async_trait]
154pub trait CommandSource: Send + Sync {
155    /// Opaque ack handle returned alongside each pulled item.
156    /// Source impls choose their own type:
157    /// - NATS source uses `async_nats::jetstream::Message`.
158    /// - Mock sources can use `()` or a counter.
159    type AckHandle: Send + Sync;
160
161    /// Pull one command from the source.  See trait docs for return
162    /// shape and lifecycle.
163    async fn next(&mut self) -> Result<Option<Pulled<Self::AckHandle>>>;
164
165    /// Acknowledge a pulled item (commit the pull; do not redeliver).
166    async fn ack(&self, handle: Self::AckHandle) -> Result<()>;
167
168    /// Negative-acknowledge a pulled item (redeliver per the source's
169    /// own policy).
170    async fn nack(&self, handle: Self::AckHandle) -> Result<()>;
171}
172
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::VecDeque;
    use std::sync::Arc;
    use tokio::sync::Mutex;

    /// In-memory mock source for testability.  Yields a fixed
    /// sequence of `ClaimOutcome` values and records every ack /
    /// nack call so tests can assert on the lifecycle.
    pub struct MockSource {
        /// Outcomes still to be handed out, front first.
        queue: VecDeque<ClaimOutcome>,
        /// Shared record of every `ack` / `nack` call, in call order.
        ack_log: Arc<Mutex<Vec<MockAck>>>,
        /// Handle value the next successful `next()` will return.
        next_ack_id: u64,
    }

    /// One recorded settle call on a [`MockSource`], carrying the
    /// handle it was called with.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub enum MockAck {
        Acked(u64),
        Nacked(u64),
    }

    impl MockSource {
        /// Build a source that yields `outcomes` in order and then
        /// reports exhaustion.
        pub fn new(outcomes: Vec<ClaimOutcome>) -> Self {
            Self {
                queue: outcomes.into(),
                ack_log: Arc::new(Mutex::new(Vec::new())),
                next_ack_id: 0,
            }
        }

        /// Shared handle to the ack / nack log.
        pub fn ack_log(&self) -> Arc<Mutex<Vec<MockAck>>> {
            Arc::clone(&self.ack_log)
        }
    }

    #[async_trait]
    impl CommandSource for MockSource {
        type AckHandle = u64;

        async fn next(&mut self) -> Result<Option<Pulled<u64>>> {
            // An empty queue means the mock is drained.
            let Some(outcome) = self.queue.pop_front() else {
                return Ok(None);
            };
            // Handles are a simple monotonically increasing counter.
            let id = self.next_ack_id;
            self.next_ack_id += 1;
            Ok(Some(Pulled { outcome, ack: id }))
        }

        async fn ack(&self, handle: u64) -> Result<()> {
            self.ack_log.lock().await.push(MockAck::Acked(handle));
            Ok(())
        }

        async fn nack(&self, handle: u64) -> Result<()> {
            self.ack_log.lock().await.push(MockAck::Nacked(handle));
            Ok(())
        }
    }

    /// Minimal valid command whose only varying part is its id.
    fn dummy_command(id: &str) -> Command {
        Command {
            command_id: id.to_string(),
            execution_id: 12345,
            step: "fetch".to_string(),
            tool_kind: "http".to_string(),
            input: serde_json::json!({"url": "https://example.com"}),
            render_context: HashMap::new(),
            attempts: 0,
        }
    }

    /// Command id carried by a `Claimed` outcome; panics on any
    /// other variant so the failing test names what it got.
    fn claimed_id(outcome: &ClaimOutcome) -> &str {
        match outcome {
            ClaimOutcome::Claimed(cmd) => &cmd.command_id,
            other => panic!("expected Claimed, got {:?}", other),
        }
    }

    #[tokio::test]
    async fn empty_source_returns_none() {
        let mut source = MockSource::new(vec![]);
        assert!(source.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn next_yields_in_order_and_increments_handles() {
        let mut source = MockSource::new(vec![
            ClaimOutcome::Claimed(dummy_command("a")),
            ClaimOutcome::Claimed(dummy_command("b")),
        ]);

        let first = source.next().await.unwrap().unwrap();
        let second = source.next().await.unwrap().unwrap();

        assert_eq!(first.ack, 0);
        assert_eq!(second.ack, 1);
        assert_eq!(claimed_id(&first.outcome), "a");
        assert_eq!(claimed_id(&second.outcome), "b");

        // Both items consumed: the source must now report exhaustion.
        assert!(source.next().await.unwrap().is_none());
    }

    #[tokio::test]
    async fn ack_and_nack_recorded_in_order() {
        let source = MockSource::new(vec![]);
        let log = source.ack_log();
        source.ack(7).await.unwrap();
        source.nack(9).await.unwrap();
        source.ack(11).await.unwrap();

        let log = log.lock().await;
        assert_eq!(
            *log,
            vec![MockAck::Acked(7), MockAck::Nacked(9), MockAck::Acked(11)]
        );
    }

    #[tokio::test]
    async fn already_claimed_outcome_carries_handle() {
        let mut source = MockSource::new(vec![ClaimOutcome::AlreadyClaimed]);
        let pulled = source.next().await.unwrap().unwrap();
        assert!(matches!(pulled.outcome, ClaimOutcome::AlreadyClaimed));
        // Caller should ack even when AlreadyClaimed — the message
        // shouldn't redeliver because another worker has the command.
        source.ack(pulled.ack).await.unwrap();
        let log = source.ack_log();
        let log = log.lock().await;
        assert_eq!(*log, vec![MockAck::Acked(0)]);
    }

    #[tokio::test]
    async fn retry_later_outcome_carries_error_message() {
        let mut source = MockSource::new(vec![
            ClaimOutcome::RetryLater("overload".to_string()),
        ]);
        let Pulled { outcome, ack } = source.next().await.unwrap().unwrap();
        match outcome {
            ClaimOutcome::RetryLater(msg) => assert_eq!(msg, "overload"),
            other => panic!("expected RetryLater, got {:?}", other),
        }
        // Documented follow-up for RetryLater is a nack.
        source.nack(ack).await.unwrap();
        let log = source.ack_log();
        let log = log.lock().await;
        assert_eq!(*log, vec![MockAck::Nacked(0)]);
    }

    #[tokio::test]
    async fn failed_outcome_carries_error_message() {
        let mut source = MockSource::new(vec![
            ClaimOutcome::Failed("malformed payload".to_string()),
        ]);
        let Pulled { outcome, ack } = source.next().await.unwrap().unwrap();
        match outcome {
            ClaimOutcome::Failed(msg) => assert_eq!(msg, "malformed payload"),
            other => panic!("expected Failed, got {:?}", other),
        }
        // Documented follow-up for Failed is a nack.
        source.nack(ack).await.unwrap();
        let log = source.ack_log();
        let log = log.lock().await;
        assert_eq!(*log, vec![MockAck::Nacked(0)]);
    }

    #[test]
    fn command_round_trips_through_serde_with_defaults() {
        // No render_context or attempts in JSON → defaults applied.
        let json = serde_json::json!({
            "command_id": "cmd-1",
            "execution_id": 7,
            "step": "s",
            "tool_kind": "http",
            "input": {"url": "https://example.com"},
        });
        let cmd: Command = serde_json::from_value(json).unwrap();
        assert!(cmd.render_context.is_empty());
        assert_eq!(cmd.attempts, 0);

        // Serializing writes the defaulted fields out explicitly, and
        // decoding that output again yields the same encoding.
        let encoded = serde_json::to_value(&cmd).unwrap();
        assert_eq!(encoded["render_context"], serde_json::json!({}));
        assert_eq!(encoded["attempts"], serde_json::json!(0));
        let decoded: Command = serde_json::from_value(encoded.clone()).unwrap();
        assert_eq!(serde_json::to_value(&decoded).unwrap(), encoded);
    }

    #[test]
    fn command_round_trips_through_serde_with_full_fields() {
        let json = serde_json::json!({
            "command_id": "cmd-2",
            "execution_id": 12345,
            "step": "process",
            "tool_kind": "rhai",
            "input": {"code": "1 + 1"},
            "render_context": {"workload.region": "us-east-1"},
            "attempts": 3,
        });
        let cmd: Command = serde_json::from_value(json.clone()).unwrap();
        assert_eq!(cmd.attempts, 3);
        assert_eq!(
            cmd.render_context.get("workload.region"),
            Some(&serde_json::json!("us-east-1"))
        );

        // Every field was supplied, so re-encoding must reproduce the
        // input document exactly.
        assert_eq!(serde_json::to_value(&cmd).unwrap(), json);
    }

    #[test]
    fn command_missing_required_field_is_rejected() {
        // `command_id` has no serde default, so omitting it must fail.
        let json = serde_json::json!({
            "execution_id": 7,
            "step": "s",
            "tool_kind": "http",
            "input": {},
        });
        assert!(serde_json::from_value::<Command>(json).is_err());
    }
}