noetl_executor/worker/source.rs
1//! `CommandSource` — worker-only abstraction over how the worker
2//! receives the next command to run.
3//!
4//! This module is **worker-only**. The CLI does NOT consume types
5//! from here — its local-mode runner is a tree walker that doesn't
6//! need a pull-model command source. See § H.10 of the global hybrid
7//! cloud blueprint for the architectural rationale.
8//!
9//! The worker (R-1.3) implements a NATS-backed `CommandSource` that
10//! pulls from a durable consumer. Future worker implementations
11//! (e.g. an HTTP-poll source for serverless deployments under § H.2's
12//! Cloud Run compute substrate) implement the same trait.
13
14use anyhow::Result;
15use async_trait::async_trait;
16
17/// One command the worker will dispatch to a tool.
18///
19/// The shape mirrors the Python-side `noetl.command` row + envelope as
20/// of v2.103.x — keep the field names aligned so wire-format
21/// compatibility is automatic. R-1.3 will add the remaining fields
22/// (input, render context, parent execution id, etc.) when the worker
23/// reaches feature parity with the Python worker.
24#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
25pub struct Command {
26 /// Stable identifier for this command. CLI generates a UUID;
27 /// the worker uses the snowflake id from `noetl.command`.
28 pub command_id: String,
29
30 /// Execution this command belongs to.
31 ///
32 /// R-1.2 PR-2a: aligned to `i64` to match the worker's
33 /// `CommandNotification.execution_id` and the Python
34 /// `noetl.command.execution_id` bigint column.
35 pub execution_id: i64,
36
37 /// Step name from the playbook (e.g. `"fetch_calendar"`).
38 pub step: String,
39
40 /// Tool kind that dispatch will route to (e.g. `"http"`, `"postgres"`).
41 pub tool_kind: String,
42
43 /// Tool-specific input payload, already rendered against the merged
44 /// step context.
45 pub input: serde_json::Value,
46}
47
48/// Pull-model command source.
49///
50/// `next()` returns:
51/// - `Ok(Some(cmd))` — one command to dispatch.
52/// - `Ok(None)` — the source is exhausted (local-mode playbook
53/// complete) and the executor should drain its outstanding work and
54/// exit. Long-running sources (worker NATS) never return `None` in
55/// normal operation.
56/// - `Err(e)` — transient or terminal source error; the caller's retry
57/// policy decides whether to call `next()` again.
58#[async_trait]
59pub trait CommandSource: Send + Sync {
60 async fn next(&mut self) -> Result<Option<Command>>;
61}