noetl_executor/worker/source.rs
1//! `CommandSource` — worker-only abstraction over how the worker
2//! receives the next command to run.
3//!
4//! This module is **worker-only**. The CLI does NOT consume types
5//! from here — its local-mode runner is a tree walker that doesn't
6//! need a pull-model command source. See § H.10 of the global hybrid
7//! cloud blueprint for the architectural rationale.
8//!
9//! The worker (R-1.2 PR-2d-2) implements a NATS-backed
10//! `CommandSource` that pulls from a durable consumer and uses the
11//! control-plane HTTP API to claim individual commands. Future
12//! worker implementations (e.g. an HTTP-poll source for serverless
13//! deployments under § H.2's Cloud Run compute substrate) implement
14//! the same trait.
15//!
16//! ## R-1.2 PR-2d-1 — trait redesign
17//!
18//! The 0.2.x version of this module had a thin
19//! `next() -> Result<Option<Command>>` trait. The worker's real
20//! NATS pull loop turned out to need:
21//!
22//! 1. **Per-pull ack/nack lifecycle** distinct from `next()` so the
23//! caller can ack BEFORE executing the command (NATS redelivery
24//! semantics) or nack on transient claim failures.
25//! 2. **A 4-state outcome** because claiming a command can succeed
26//! (`Claimed`), be raced by another worker (`AlreadyClaimed`),
27//! fail transiently and warrant redelivery (`RetryLater`), or
28//! fail terminally (`Failed`) — each maps to a different
29//! follow-up.
30//! 3. **A richer `Command` shape** with `render_context` (variables
31//! rendered against the merged step context) and `attempts` (for
32//! backoff decisions in the dispatcher).
33//!
34//! The 0.3.0 redesign captures all three. The breaking change is
35//! safe because no production consumer exists yet — noetl-worker
36//! 1.1.2 doesn't import this module; PR-2d-2 will be its first
37//! adoption.
38
39use std::collections::HashMap;
40
41use anyhow::Result;
42use async_trait::async_trait;
43
/// One command the worker will dispatch to a tool.
///
/// The shape mirrors the Python-side `noetl.command` row, its
/// envelope, and the render-context map. Keep field naming aligned
/// with the wire format so JSON serde round-trips both directions.
///
/// Only `render_context` and `attempts` carry `#[serde(default)]`;
/// every other field must be present when deserializing.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
pub struct Command {
    /// Stable identifier for this command. Worker uses the
    /// snowflake id from `noetl.command`; future sources (HTTP poll,
    /// local-mode mock) may use UUIDs.
    pub command_id: String,

    /// Execution this command belongs to. `i64` mirrors the
    /// Python `noetl.command.execution_id` bigint column and the
    /// `BridgeContext.execution_id` field in the cli's tools_bridge.
    pub execution_id: i64,

    /// Step name from the playbook (e.g. `"fetch_calendar"`).
    pub step: String,

    /// Tool kind that dispatch will route to (e.g. `"http"`,
    /// `"postgres"`, `"rhai"`). Matches noetl-tools' registry kind.
    pub tool_kind: String,

    /// Tool-specific input payload (the `ToolConfig.config` JSON
    /// passed to `noetl_tools::registry::Tool::execute`).
    pub input: serde_json::Value,

    /// Variables already rendered against the merged step context
    /// (workload + vars + step results). Tools read this through
    /// `noetl_tools::context::ExecutionContext.variables`.
    ///
    /// Deserializes to an empty map when the key is absent from the
    /// payload.
    ///
    /// R-1.2 PR-2d-1: added so the worker's
    /// `Command.render_context()` method maps cleanly onto this
    /// field.
    #[serde(default)]
    pub render_context: HashMap<String, serde_json::Value>,

    /// Number of times this command has been attempted (incremented
    /// by the control plane on each redelivery). Used by the
    /// dispatcher for backoff and giving-up decisions.
    ///
    /// Deserializes to `0` when the key is absent from the payload.
    ///
    /// R-1.2 PR-2d-1: added so retry policy decisions can flow
    /// through the trait.
    #[serde(default)]
    pub attempts: u32,
}
91
/// Outcome of an attempt to claim a command from the source.
///
/// The four variants mirror the worker's pre-PR-2d-1 inline
/// `ClaimResult` enum (in `crate::client::ControlPlaneClient`):
///
/// - **Claimed**: this worker successfully owns the command. Caller
///   should `ack` the source handle and dispatch the command.
/// - **AlreadyClaimed**: another worker raced us and already owns
///   the command. Caller should `ack` the source handle (no
///   redelivery needed) and skip dispatch.
/// - **RetryLater**: claim failed transiently (overload, contention,
///   network blip). Caller should `nack` the source handle so the
///   source's redelivery policy gives another worker a shot.
/// - **Failed**: claim failed terminally (catalog mismatch, malformed
///   command, permission denied). Caller should `nack` and emit a
///   diagnostic event so the failure is visible.
///
/// NOTE(review): `nack` means "redeliver per the source's own
/// policy", so a terminally `Failed` command will come back unless
/// the source bounds redelivery — confirm that is the intent.
#[derive(Debug, Clone)]
pub enum ClaimOutcome {
    /// The claim succeeded; carries the command to dispatch.
    Claimed(Command),
    /// Another worker already owns the command; nothing to dispatch.
    AlreadyClaimed,
    /// Transient claim failure; the payload is a message describing
    /// the reason (e.g. `"overload"`).
    RetryLater(String),
    /// Terminal claim failure; the payload is a message describing
    /// the reason (e.g. `"malformed payload"`).
    Failed(String),
}
115
/// One pulled item from the source — a claim outcome plus the
/// opaque handle the caller passes back to `ack` or `nack`.
///
/// Generic over the source's `AckHandle` associated type so each
/// `CommandSource` impl can choose its own underlying handle shape
/// (e.g. `async_nats::jetstream::Message` for the NATS source,
/// `()` for an in-memory mock).
///
/// The derived `Debug` / `Clone` impls are only available when `H`
/// itself implements the corresponding trait.
#[derive(Debug, Clone)]
pub struct Pulled<H> {
    /// Result of the claim attempt for this pulled item.
    pub outcome: ClaimOutcome,
    /// Handle to hand back to `CommandSource::ack` or
    /// `CommandSource::nack` for this item.
    pub ack: H,
}
128
/// Pull-model command source.
///
/// `next()` returns:
/// - `Ok(Some(Pulled { outcome, ack }))` — one pulled item. The
///   caller inspects `outcome` (Claimed / AlreadyClaimed /
///   RetryLater / Failed) and decides whether to call `ack` (commit
///   the pull) or `nack` (redeliver). Exactly one of `ack` or
///   `nack` must be called, exactly once, per pulled item.
/// - `Ok(None)` — the source is exhausted (local-mode playbook
///   complete, mock source drained). Long-running sources (worker
///   NATS) never return `None` in normal operation.
/// - `Err(e)` — transient or terminal source error before any pull
///   happened; the caller's retry policy decides whether to call
///   `next()` again.
///
/// ## Lifecycle invariants
///
/// 1. Each `Pulled.ack` handle must be consumed exactly once via
///    `ack(handle)` or `nack(handle)` before the next `next()` call
///    in the same task. `next()` takes `&mut self`, so pulls from a
///    single source value cannot overlap unless the caller adds its
///    own synchronization around the source.
///    NOTE(review): earlier wording stated that concurrent `next()`
///    calls are safe — confirm the intended sharing model.
/// 2. `ack` and `nack` are NOT guaranteed to be idempotent at the
///    trait level — calling them multiple times on the same handle
///    is undefined; the source impl may panic or treat the
///    duplicate as a no-op. Both methods take the handle by value,
///    so a duplicate call is only expressible when the handle type
///    can be copied or cloned.
#[async_trait]
pub trait CommandSource: Send + Sync {
    /// Opaque ack handle returned alongside each pulled item.
    /// Source impls choose their own type:
    /// - NATS source uses `async_nats::jetstream::Message`.
    /// - Mock sources can use `()` or a counter.
    type AckHandle: Send + Sync;

    /// Pull one command from the source. See trait docs for return
    /// shape and lifecycle.
    async fn next(&mut self) -> Result<Option<Pulled<Self::AckHandle>>>;

    /// Acknowledge a pulled item (commit the pull; do not redeliver).
    ///
    /// Consumes `handle`; it must be the handle from the `Pulled`
    /// item being acknowledged.
    async fn ack(&self, handle: Self::AckHandle) -> Result<()>;

    /// Negative-acknowledge a pulled item (redeliver per the source's
    /// own policy).
    ///
    /// Consumes `handle`; it must be the handle from the `Pulled`
    /// item being rejected.
    async fn nack(&self, handle: Self::AckHandle) -> Result<()>;
}
172
173#[cfg(test)]
174mod tests {
175 use super::*;
176 use std::sync::Arc;
177 use tokio::sync::Mutex;
178
    /// In-memory mock source for testability. Yields a fixed
    /// sequence of `ClaimOutcome` values and records every ack /
    /// nack call so tests can assert on the lifecycle.
    pub struct MockSource {
        /// Outcomes still to be yielded; `next` pops from the front.
        queue: std::collections::VecDeque<ClaimOutcome>,
        /// Shared record of every `ack` / `nack` call, in call order.
        ack_log: Arc<Mutex<Vec<MockAck>>>,
        /// Handle value the next pulled item will carry; starts at 0
        /// and is incremented on every successful pull.
        next_ack_id: u64,
    }
187
188 #[derive(Debug, Clone, PartialEq, Eq)]
189 pub enum MockAck {
190 Acked(u64),
191 Nacked(u64),
192 }
193
194 impl MockSource {
195 pub fn new(outcomes: Vec<ClaimOutcome>) -> Self {
196 Self {
197 queue: outcomes.into(),
198 ack_log: Arc::new(Mutex::new(Vec::new())),
199 next_ack_id: 0,
200 }
201 }
202
203 pub fn ack_log(&self) -> Arc<Mutex<Vec<MockAck>>> {
204 Arc::clone(&self.ack_log)
205 }
206 }
207
208 #[async_trait]
209 impl CommandSource for MockSource {
210 type AckHandle = u64;
211
212 async fn next(&mut self) -> Result<Option<Pulled<u64>>> {
213 match self.queue.pop_front() {
214 None => Ok(None),
215 Some(outcome) => {
216 let id = self.next_ack_id;
217 self.next_ack_id += 1;
218 Ok(Some(Pulled { outcome, ack: id }))
219 }
220 }
221 }
222
223 async fn ack(&self, handle: u64) -> Result<()> {
224 self.ack_log.lock().await.push(MockAck::Acked(handle));
225 Ok(())
226 }
227
228 async fn nack(&self, handle: u64) -> Result<()> {
229 self.ack_log.lock().await.push(MockAck::Nacked(handle));
230 Ok(())
231 }
232 }
233
234 fn dummy_command(id: &str) -> Command {
235 Command {
236 command_id: id.to_string(),
237 execution_id: 12345,
238 step: "fetch".to_string(),
239 tool_kind: "http".to_string(),
240 input: serde_json::json!({"url": "https://example.com"}),
241 render_context: HashMap::new(),
242 attempts: 0,
243 }
244 }
245
246 #[tokio::test]
247 async fn empty_source_returns_none() {
248 let mut source = MockSource::new(vec![]);
249 assert!(source.next().await.unwrap().is_none());
250 }
251
252 #[tokio::test]
253 async fn next_yields_in_order_and_increments_handles() {
254 let mut source = MockSource::new(vec![
255 ClaimOutcome::Claimed(dummy_command("a")),
256 ClaimOutcome::Claimed(dummy_command("b")),
257 ]);
258
259 let first = source.next().await.unwrap().unwrap();
260 let second = source.next().await.unwrap().unwrap();
261
262 assert_eq!(first.ack, 0);
263 assert_eq!(second.ack, 1);
264 if let ClaimOutcome::Claimed(c) = first.outcome {
265 assert_eq!(c.command_id, "a");
266 } else {
267 panic!("expected Claimed");
268 }
269 }
270
271 #[tokio::test]
272 async fn ack_and_nack_recorded_in_order() {
273 let source = MockSource::new(vec![]);
274 let log = source.ack_log();
275 source.ack(7).await.unwrap();
276 source.nack(9).await.unwrap();
277 source.ack(11).await.unwrap();
278
279 let log = log.lock().await;
280 assert_eq!(
281 *log,
282 vec![MockAck::Acked(7), MockAck::Nacked(9), MockAck::Acked(11)]
283 );
284 }
285
286 #[tokio::test]
287 async fn already_claimed_outcome_carries_handle() {
288 let mut source = MockSource::new(vec![ClaimOutcome::AlreadyClaimed]);
289 let pulled = source.next().await.unwrap().unwrap();
290 assert!(matches!(pulled.outcome, ClaimOutcome::AlreadyClaimed));
291 // Caller should ack even when AlreadyClaimed — the message
292 // shouldn't redeliver because another worker has the command.
293 source.ack(pulled.ack).await.unwrap();
294 let log = source.ack_log.lock().await;
295 assert_eq!(*log, vec![MockAck::Acked(0)]);
296 }
297
298 #[tokio::test]
299 async fn retry_later_outcome_carries_error_message() {
300 let mut source = MockSource::new(vec![
301 ClaimOutcome::RetryLater("overload".to_string()),
302 ]);
303 let pulled = source.next().await.unwrap().unwrap();
304 match pulled.outcome {
305 ClaimOutcome::RetryLater(msg) => assert_eq!(msg, "overload"),
306 _ => panic!("expected RetryLater"),
307 }
308 }
309
310 #[tokio::test]
311 async fn failed_outcome_carries_error_message() {
312 let mut source = MockSource::new(vec![
313 ClaimOutcome::Failed("malformed payload".to_string()),
314 ]);
315 let pulled = source.next().await.unwrap().unwrap();
316 match pulled.outcome {
317 ClaimOutcome::Failed(msg) => assert_eq!(msg, "malformed payload"),
318 _ => panic!("expected Failed"),
319 }
320 }
321
322 #[test]
323 fn command_round_trips_through_serde_with_defaults() {
324 // No render_context or attempts in JSON → defaults applied.
325 let json = serde_json::json!({
326 "command_id": "cmd-1",
327 "execution_id": 7,
328 "step": "s",
329 "tool_kind": "http",
330 "input": {"url": "https://example.com"},
331 });
332 let cmd: Command = serde_json::from_value(json).unwrap();
333 assert!(cmd.render_context.is_empty());
334 assert_eq!(cmd.attempts, 0);
335 }
336
337 #[test]
338 fn command_round_trips_through_serde_with_full_fields() {
339 let json = serde_json::json!({
340 "command_id": "cmd-2",
341 "execution_id": 12345,
342 "step": "process",
343 "tool_kind": "rhai",
344 "input": {"code": "1 + 1"},
345 "render_context": {"workload.region": "us-east-1"},
346 "attempts": 3,
347 });
348 let cmd: Command = serde_json::from_value(json).unwrap();
349 assert_eq!(cmd.attempts, 3);
350 assert_eq!(
351 cmd.render_context.get("workload.region"),
352 Some(&serde_json::json!("us-east-1"))
353 );
354 }
355}