Skip to main content

noetl_executor/
playbook.rs

1//! Pydantic-like YAML playbook types.
2//!
3//! Extracted from `repos/cli/src/playbook_runner.rs` lines 15-446 in
4//! R-1.1 PR-2a per Appendix H of the global hybrid cloud blueprint.
5//! These are the shape of a parsed `Playbook` YAML — the data model
6//! that the executor (both CLI local-mode and worker NATS-mode)
7//! operates on.
8//!
9//! ## What lives here
10//!
11//! - [`RuntimeCapabilities`] — feature/tool advertisement for the
12//!   local vs distributed profiles.
13//! - [`Playbook`] — top-level parsed playbook envelope.
14//! - [`Step`], [`StepSpec`], [`Tool`], [`Loop config`] — workflow
15//!   step shape.
16//! - [`NextFormat`] + impl — v10 router AND legacy array routing
17//!   normalisation.
18//! - [`CaseCondition`], [`ThenBlock`], [`WhenCondition`] —
19//!   conditional routing.
20//! - [`SinkTarget`], [`SinkFormat`], [`AuthConfig`] — tool support
21//!   types.
22//!
23//! ## What does NOT live here
24//!
25//! - `RunOutcome` — the CLI's JSON output envelope.  Stays in
26//!   `playbook_runner.rs` because it's not a YAML input shape and
27//!   the worker has its own output schema.
28//! - `PlaybookRunner` — the orchestration shim.  R-1.1 PR-2d
29//!   replaces its body with a call into the executor core; the
30//!   public struct stays on the CLI side so `main.rs` continues to
31//!   use it unchanged.
32//! - `ExecutionContext` (lines 2420+) — a CLI-side per-step
33//!   render context that R-1.1 PR-2b folds into
34//!   [`crate::runtime::ExecutionContext`].
35
36use serde::Deserialize;
37use std::collections::HashMap;
38
39/// Runtime capability set — defines what features a runtime supports.
40#[derive(Debug, Clone, serde::Serialize, Deserialize)]
41pub struct RuntimeCapabilities {
42    pub runtime: String, // "local" or "distributed"
43    pub version: String, // "noetl-runtime/1"
44    pub tools: Vec<String>,
45    pub features: Vec<String>,
46}
47
48impl RuntimeCapabilities {
49    /// Local runtime capabilities.
50    pub fn local() -> Self {
51        Self {
52            runtime: "local".to_string(),
53            version: "noetl-runtime/1".to_string(),
54            tools: vec![
55                "shell".to_string(),
56                "http".to_string(),
57                "duckdb".to_string(),
58                "rhai".to_string(),
59                "playbook".to_string(),
60                "auth".to_string(),
61                "sink".to_string(),
62            ],
63            features: vec![
64                "case_v1".to_string(),
65                "case_v2".to_string(), // Rhai conditions
66                "loop_v1".to_string(),
67                "vars_v1".to_string(),
68                "jinja2".to_string(),
69            ],
70        }
71    }
72
73    /// Distributed runtime capabilities.
74    #[allow(dead_code)]
75    pub fn distributed() -> Self {
76        Self {
77            runtime: "distributed".to_string(),
78            version: "noetl-runtime/1".to_string(),
79            tools: vec![
80                "shell".to_string(),
81                "http".to_string(),
82                "postgres".to_string(),
83                "duckdb".to_string(),
84                "python".to_string(),
85                "playbook".to_string(),
86                "iterator".to_string(),
87            ],
88            features: vec![
89                "case_v1".to_string(),
90                "case_v2".to_string(),
91                "loop_v1".to_string(),
92                "loop_v2".to_string(), // Pagination
93                "vars_v1".to_string(),
94                "vars_v2".to_string(), // Cross-step results
95                "sink_v1".to_string(),
96                "jinja2".to_string(),
97                "event_sourcing".to_string(),
98            ],
99        }
100    }
101}
102
103#[derive(Debug, Deserialize)]
104pub struct Playbook {
105    #[serde(rename = "apiVersion")]
106    pub api_version: String,
107    #[allow(dead_code)]
108    pub kind: String,
109    pub metadata: Metadata,
110    /// Runtime requirements and capabilities (8-char root key).
111    #[serde(default)]
112    pub executor: Option<Executor>,
113    pub workload: Option<HashMap<String, serde_yaml::Value>>,
114    pub workflow: Vec<Step>,
115}
116
117/// Executor specification — runtime requirements and capabilities.
118#[derive(Debug, Deserialize, Default)]
119pub struct Executor {
120    /// Runtime profile: local, distributed, or auto.
121    #[serde(default = "default_profile")]
122    pub profile: String,
123    /// Semantic contract version: noetl-runtime/1.
124    #[serde(default = "default_version")]
125    pub version: String,
126    /// Required capabilities.
127    #[serde(default)]
128    pub requires: Option<ExecutorRequires>,
129    /// Executor spec for entry/final step configuration.
130    #[serde(default)]
131    pub spec: Option<ExecutorSpec>,
132}
133
134/// Executor spec for workflow entry and termination control.
135#[derive(Debug, Deserialize, Default, Clone)]
136pub struct ExecutorSpec {
137    /// Override entry step (default: workflow[0]).
138    #[serde(default)]
139    pub entry_step: Option<String>,
140    /// Optional finalization step run after quiescence.
141    #[serde(default)]
142    pub final_step: Option<String>,
143    /// Treat "no next match" as error (default: false = branch terminates).
144    #[serde(default)]
145    pub no_next_is_error: Option<bool>,
146}
147
148pub fn default_profile() -> String {
149    "auto".to_string()
150}
151
152pub fn default_version() -> String {
153    "noetl-runtime/1".to_string()
154}
155
156/// Executor requirements.
157#[derive(Debug, Deserialize, Default)]
158pub struct ExecutorRequires {
159    /// Required tool kinds.
160    #[serde(default)]
161    pub tools: Vec<String>,
162    /// Required features.
163    #[serde(default)]
164    pub features: Vec<String>,
165}
166
167#[derive(Debug, Deserialize)]
168pub struct Metadata {
169    pub name: String,
170    #[allow(dead_code)]
171    pub path: Option<String>,
172}
173
174#[derive(Debug, Deserialize)]
175pub struct Step {
176    pub step: String,
177    pub desc: Option<String>,
178    /// Step enablement guard — evaluated before step runs (canonical v2).
179    #[serde(rename = "when")]
180    pub when_guard: Option<String>,
181    /// Step-level input data for cross-boundary propagation (DSL v2).
182    #[serde(default)]
183    pub input: Option<HashMap<String, serde_yaml::Value>>,
184    pub tool: Option<Tool>,
185    /// Next transitions — raw YAML, parsed manually to support both v10
186    /// router and legacy formats.
187    #[serde(default)]
188    pub next: Option<serde_yaml::Value>,
189    #[serde(rename = "case")]
190    pub case: Option<Vec<CaseCondition>>,
191    #[serde(rename = "loop")]
192    #[allow(dead_code)]
193    pub loop_config: Option<LoopConfig>,
194    pub vars: Option<HashMap<String, String>>,
195    /// Step spec for routing mode.
196    #[serde(default)]
197    pub spec: Option<StepSpec>,
198}
199
200/// Step specification for routing control.
201#[derive(Debug, Deserialize, Default, Clone)]
202pub struct StepSpec {
203    /// Routing mode: exclusive (default, first match) or inclusive (all matches).
204    #[serde(default)]
205    pub next_mode: Option<NextMode>,
206}
207
208/// Next routing mode.
209#[derive(Debug, Deserialize, Clone, Default)]
210#[serde(rename_all = "lowercase")]
211pub enum NextMode {
212    #[default]
213    Exclusive,
214    Inclusive,
215}
216
217/// V10 router spec for next transitions.
218#[derive(Debug, Clone)]
219pub struct NextRouterSpec {
220    pub mode: Option<String>,
221}
222
223/// V10 arc for router format.
224#[derive(Debug, Clone)]
225pub struct NextArc {
226    pub step: String,
227    pub when_condition: Option<String>,
228    pub args: Option<HashMap<String, serde_yaml::Value>>,
229}
230
231/// Next format — supports both v10 router and legacy array formats.
232#[derive(Debug)]
233pub enum NextFormat {
234    /// V10 router format: `{ spec: { mode: ... }, arcs: [...] }`.
235    Router {
236        spec: Option<NextRouterSpec>,
237        arcs: Vec<NextArc>,
238    },
239    /// Legacy array format: `[{ step: ... }, ...]`.
240    Array(Vec<NextStep>),
241}
242
243impl NextFormat {
244    /// Parse next field from `serde_yaml::Value`.
245    pub fn from_yaml_value(value: &serde_yaml::Value) -> Option<NextFormat> {
246        match value {
247            serde_yaml::Value::Sequence(_arr) => {
248                // Legacy array format.
249                let steps: Vec<NextStep> = serde_yaml::from_value(value.clone()).ok()?;
250                Some(NextFormat::Array(steps))
251            }
252            serde_yaml::Value::Mapping(map) => {
253                // V10 router format: { spec: { mode: ... }, arcs: [...] }
254                let spec = map.get(&serde_yaml::Value::String("spec".to_string())).and_then(|v| {
255                    if let serde_yaml::Value::Mapping(spec_map) = v {
256                        let mode = spec_map
257                            .get(&serde_yaml::Value::String("mode".to_string()))
258                            .and_then(|m| m.as_str().map(|s| s.to_string()));
259                        Some(NextRouterSpec { mode })
260                    } else {
261                        None
262                    }
263                });
264
265                let arcs = map.get(&serde_yaml::Value::String("arcs".to_string())).and_then(|v| {
266                    if let serde_yaml::Value::Sequence(arcs_arr) = v {
267                        let arcs: Vec<NextArc> = arcs_arr
268                            .iter()
269                            .filter_map(|arc_val| {
270                                if let serde_yaml::Value::Mapping(arc_map) = arc_val {
271                                    let step = arc_map
272                                        .get(&serde_yaml::Value::String("step".to_string()))
273                                        .and_then(|s| s.as_str().map(|s| s.to_string()))?;
274                                    let when_condition = arc_map
275                                        .get(&serde_yaml::Value::String("when".to_string()))
276                                        .and_then(|w| w.as_str().map(|s| s.to_string()));
277                                    let args = arc_map
278                                        .get(&serde_yaml::Value::String("args".to_string()))
279                                        .and_then(|a| serde_yaml::from_value(a.clone()).ok());
280                                    Some(NextArc {
281                                        step,
282                                        when_condition,
283                                        args,
284                                    })
285                                } else {
286                                    None
287                                }
288                            })
289                            .collect();
290                        Some(arcs)
291                    } else {
292                        None
293                    }
294                })?;
295
296                Some(NextFormat::Router { spec, arcs })
297            }
298            _ => None,
299        }
300    }
301}
302
303/// Then block can be either a list of actions or a single action dict.
304#[derive(Debug, Deserialize)]
305#[serde(untagged)]
306pub enum ThenBlock {
307    /// Single action object (backwards compatible).
308    Single(serde_yaml::Value),
309    /// List of action objects.
310    List(Vec<NextStep>),
311}
312
313#[derive(Debug, Deserialize)]
314pub struct CaseCondition {
315    #[serde(flatten)]
316    pub when: WhenCondition,
317    pub then: ThenBlock,
318    #[serde(rename = "else")]
319    pub else_steps: Option<Vec<NextStep>>,
320}
321
322/// Condition that can be either a simple template string or Rhai code.
323#[derive(Debug, Deserialize)]
324#[serde(untagged)]
325pub enum WhenCondition {
326    /// Rhai expression for complex conditions.
327    Rhai {
328        #[serde(alias = "when_rhai")]
329        rhai: String,
330    },
331    /// Simple template string condition (Jinja2-style).
332    Simple { when: String },
333}
334
335#[derive(Debug, Deserialize)]
336#[serde(tag = "kind", rename_all = "lowercase")]
337pub enum Tool {
338    Shell {
339        #[serde(default)]
340        cmds: CmdsList,
341    },
342    Http {
343        #[serde(default = "default_method")]
344        method: String,
345        url: String,
346        #[serde(default)]
347        headers: HashMap<String, String>,
348        #[serde(default)]
349        params: HashMap<String, String>,
350        body: Option<String>,
351        #[serde(default)]
352        auth: Option<AuthConfig>,
353    },
354    Playbook {
355        path: String,
356        /// Legacy args field (DSL v1) — deprecated in favor of input.
357        #[serde(default)]
358        args: HashMap<String, String>,
359        /// Canonical input field (DSL v2) — takes precedence over args.
360        #[serde(default)]
361        input: HashMap<String, serde_yaml::Value>,
362    },
363    #[serde(rename = "duckdb")]
364    DuckDb {
365        #[serde(default = "default_duckdb_path")]
366        db: String,
367        query: Option<String>,
368        #[serde(default)]
369        params: Vec<String>,
370    },
371    Auth {
372        provider: String,
373        #[serde(default)]
374        scopes: Vec<String>,
375        #[serde(default)]
376        project: Option<String>,
377    },
378    Sink {
379        target: SinkTarget,
380        #[serde(default)]
381        format: SinkFormat,
382    },
383    Rhai {
384        code: String,
385        #[serde(default)]
386        args: HashMap<String, String>,
387    },
388    #[serde(other)]
389    Unsupported,
390}
391
392pub fn default_method() -> String {
393    "GET".to_string()
394}
395
396pub fn default_duckdb_path() -> String {
397    ".noetl/state.duckdb".to_string()
398}
399
400#[derive(Debug, Deserialize, Clone)]
401pub struct AuthConfig {
402    /// Auth provider type: adc (Application Default Credentials), token, basic.
403    #[serde(alias = "source")]
404    pub provider: String,
405    #[serde(default)]
406    pub scopes: Vec<String>,
407}
408
409#[derive(Debug, Deserialize)]
410#[serde(tag = "type", rename_all = "lowercase")]
411pub enum SinkTarget {
412    File {
413        path: String,
414    },
415    #[serde(rename = "duckdb")]
416    DuckDb {
417        db: String,
418        table: String,
419    },
420    Gcs {
421        bucket: String,
422        path: String,
423    },
424}
425
426#[derive(Debug, Deserialize, Default)]
427#[serde(rename_all = "lowercase")]
428pub enum SinkFormat {
429    #[default]
430    Json,
431    Yaml,
432    Csv,
433}
434
435#[derive(Debug, Deserialize)]
436#[serde(untagged)]
437pub enum CmdsList {
438    Single(String),
439    Multiple(Vec<String>),
440}
441
442impl Default for CmdsList {
443    fn default() -> Self {
444        CmdsList::Multiple(vec![])
445    }
446}
447
448/// Next step definition — supports canonical v2 format.
449#[derive(Debug, Deserialize)]
450#[serde(untagged)]
451pub enum NextStep {
452    /// Canonical v2 format: `{ step: "name", when: "condition", args: {...} }`.
453    Canonical {
454        step: String,
455        #[serde(rename = "when")]
456        when_condition: Option<String>,
457        #[serde(default)]
458        args: Option<HashMap<String, serde_yaml::Value>>,
459    },
460    /// Legacy conditional: `{ when: "condition", then: [...] }`.
461    Conditional { when: Option<String>, then: Vec<NextStep> },
462    /// Legacy next action: `{ next: [...] }`.
463    NextAction { next: Vec<NextStep> },
464}
465
466#[derive(Debug, Deserialize)]
467#[allow(dead_code)]
468pub struct LoopConfig {
469    #[serde(rename = "in")]
470    pub in_collection: String,
471    pub iterator: String,
472    pub mode: Option<String>,
473}