noetl_executor/worker/source.rs
1//! `CommandSource` — worker-only abstraction over how the worker
2//! receives the next command to run.
3//!
4//! This module is **worker-only**. The CLI does NOT consume types
5//! from here — its local-mode runner is a tree walker that doesn't
6//! need a pull-model command source. See § H.10 of the global hybrid
7//! cloud blueprint for the architectural rationale.
8//!
9//! The worker (R-1.3) implements a NATS-backed `CommandSource` that
10//! pulls from a durable consumer. Future worker implementations
11//! (e.g. an HTTP-poll source for serverless deployments under § H.2's
12//! Cloud Run compute substrate) implement the same trait.
13
14use anyhow::Result;
15use async_trait::async_trait;
16
17/// One command the worker will dispatch to a tool.
18///
19/// The shape mirrors the Python-side `noetl.command` row + envelope as
20/// of v2.103.x — keep the field names aligned so wire-format
21/// compatibility is automatic. R-1.3 will add the remaining fields
22/// (input, render context, parent execution id, etc.) when the worker
23/// reaches feature parity with the Python worker.
24#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
25pub struct Command {
26 /// Stable identifier for this command. CLI generates a UUID;
27 /// the worker uses the snowflake id from `noetl.command`.
28 pub command_id: String,
29
30 /// Execution this command belongs to.
31 pub execution_id: String,
32
33 /// Step name from the playbook (e.g. `"fetch_calendar"`).
34 pub step: String,
35
36 /// Tool kind that dispatch will route to (e.g. `"http"`, `"postgres"`).
37 pub tool_kind: String,
38
39 /// Tool-specific input payload, already rendered against the merged
40 /// step context.
41 pub input: serde_json::Value,
42}
43
44/// Pull-model command source.
45///
46/// `next()` returns:
47/// - `Ok(Some(cmd))` — one command to dispatch.
48/// - `Ok(None)` — the source is exhausted (local-mode playbook
49/// complete) and the executor should drain its outstanding work and
50/// exit. Long-running sources (worker NATS) never return `None` in
51/// normal operation.
52/// - `Err(e)` — transient or terminal source error; the caller's retry
53/// policy decides whether to call `next()` again.
54#[async_trait]
55pub trait CommandSource: Send + Sync {
56 async fn next(&mut self) -> Result<Option<Command>>;
57}