Skip to main content

noetl_executor/worker/
source.rs

//! `CommandSource` — worker-only abstraction over how the worker
//! receives the next command to run.
//!
//! This module is **worker-only**.  The CLI does NOT consume types
//! from here — its local-mode runner is a tree walker that doesn't
//! need a pull-model command source.  See § H.10 of the global hybrid
//! cloud blueprint for the architectural rationale.
//!
//! The worker (R-1.3) implements a NATS-backed `CommandSource` that
//! pulls from a durable consumer.  Future worker implementations
//! (e.g. an HTTP-poll source for serverless deployments under § H.2's
//! Cloud Run compute substrate) implement the same trait.
13
14use anyhow::Result;
15use async_trait::async_trait;
16
17/// One command the worker will dispatch to a tool.
18///
19/// The shape mirrors the Python-side `noetl.command` row + envelope as
20/// of v2.103.x — keep the field names aligned so wire-format
21/// compatibility is automatic.  R-1.3 will add the remaining fields
22/// (input, render context, parent execution id, etc.) when the worker
23/// reaches feature parity with the Python worker.
24#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
25pub struct Command {
26    /// Stable identifier for this command.  CLI generates a UUID;
27    /// the worker uses the snowflake id from `noetl.command`.
28    pub command_id: String,
29
30    /// Execution this command belongs to.
31    pub execution_id: String,
32
33    /// Step name from the playbook (e.g. `"fetch_calendar"`).
34    pub step: String,
35
36    /// Tool kind that dispatch will route to (e.g. `"http"`, `"postgres"`).
37    pub tool_kind: String,
38
39    /// Tool-specific input payload, already rendered against the merged
40    /// step context.
41    pub input: serde_json::Value,
42}
43
44/// Pull-model command source.
45///
46/// `next()` returns:
47/// - `Ok(Some(cmd))` — one command to dispatch.
48/// - `Ok(None)` — the source is exhausted (local-mode playbook
49///   complete) and the executor should drain its outstanding work and
50///   exit.  Long-running sources (worker NATS) never return `None` in
51///   normal operation.
52/// - `Err(e)` — transient or terminal source error; the caller's retry
53///   policy decides whether to call `next()` again.
54#[async_trait]
55pub trait CommandSource: Send + Sync {
56    async fn next(&mut self) -> Result<Option<Command>>;
57}