// batuta/playbook/executor.rs

//! Local sequential pipeline executor (PB-005)
//!
//! Orchestrates: parse → build DAG → load lock → for each stage in topo order:
//! resolve template → hash → check cache → execute → hash outputs → update lock → log event.
//!
//! Implements Jidoka: stop on first failure (only policy in Phase 1).
//! Remote targets return error: "Remote execution requires Phase 2 (PB-006)".

9#[path = "executor_stages.rs"]
10mod stages;
11
12#[path = "executor_command.rs"]
13mod command;
14
15use super::cache;
16use super::dag;
17use super::eventlog;
18use super::parser;
19use super::types::*;
20use anyhow::Result;
21use indexmap::IndexMap;
22use std::collections::{HashMap, HashSet};
23use std::path::PathBuf;
24use std::time::Instant;
25
26// Re-export public items
27use command::execute_command;
28use command::CommandError;
29pub use command::{show_status, validate_only};
30use stages::{
31    check_remote_target, compute_stage_hashes, evaluate_cache, finalize_run, handle_frozen,
32    handle_stage_failure, handle_stage_success,
33};
34
/// Configuration for a playbook run.
///
/// Built from CLI arguments and consumed by [`run_playbook`].
#[derive(Debug, Clone)]
pub struct RunConfig {
    /// Path to the playbook YAML file on disk.
    pub playbook_path: PathBuf,

    /// Only run these stages (None = all). Stages are still executed
    /// in DAG topological order, not in the order given here.
    pub stage_filter: Option<Vec<String>>,

    /// Force re-run (ignore cache; also overrides frozen stages).
    pub force: bool,

    /// Dry-run mode (Phase 6, no-op in Phase 1).
    pub dry_run: bool,

    /// Parameter overrides from CLI; merged over the playbook's own
    /// `params` before the DAG is built.
    pub param_overrides: HashMap<String, serde_yaml_ng::Value>,
}
53
/// Result of a playbook run.
#[derive(Debug, Clone)]
pub struct RunResult {
    /// Stages whose command was actually executed.
    pub stages_run: u32,
    /// Stages skipped because they were cached (frozen stages count here too).
    pub stages_cached: u32,
    /// Stages whose command failed.
    pub stages_failed: u32,
    /// Wall-clock duration of the whole run.
    pub total_duration: std::time::Duration,
    /// Final lock file, when one was produced.
    pub lock_file: Option<LockFile>,
}
63
/// Internal context shared across the execution pipeline.
///
/// Created once by `prepare_execution` and mutated by each
/// `execute_single_stage` call.
struct ExecutionContext {
    /// Unique id for this run (from `eventlog::generate_run_id`).
    run_id: String,
    /// Parsed playbook with CLI parameter overrides already applied.
    playbook: Playbook,
    /// DAG built from the playbook; supplies the topological stage order.
    dag_result: dag::PlaybookDag,
    /// Lock file loaded from a previous run, if one existed on disk.
    existing_lock: Option<LockFile>,
    /// Lock file being built for this run (seeded from `existing_lock`).
    lock: LockFile,
    /// Count of stages actually executed.
    stages_run: u32,
    /// Count of stages skipped via cache or frozen status.
    stages_cached: u32,
    /// Count of stages whose command failed.
    stages_failed: u32,
    /// Names of stages that executed this run; inserted on success and
    /// passed to `evaluate_cache` so downstream stages can detect that an
    /// upstream stage re-ran.
    rerun_stages: HashSet<String>,
}
76
/// Hashed inputs for a single stage, computed before the cache check.
///
/// NOTE(review): populated by `compute_stage_hashes` in executor_stages.rs
/// (not visible here) — confirm exact hash semantics there.
pub(crate) struct StageHashes {
    /// Command string after template resolution; this is what gets executed.
    pub(crate) resolved_cmd: String,
    /// Hash of the resolved command (presumably blake3, per the lock
    /// file's `blake3_version` header — confirm).
    pub(crate) cmd_hash: String,
    /// (dependency name, hash) pairs for the stage's inputs.
    pub(crate) dep_hashes: Vec<(String, String)>,
    /// Per-dependency lock entries recorded into the lock file.
    pub(crate) dep_locks: Vec<DepLock>,
    /// Hash of the effective parameters.
    pub(crate) params_hash: String,
    /// Combined cache key used to decide cached vs. execute.
    pub(crate) cache_key: String,
}
86
/// Whether to skip (cached) or execute the stage.
pub(crate) enum CacheAction {
    /// Stage is up to date: counted in `stages_cached` and skipped.
    Cached,
    /// Cache miss (or forced): run the stage's command.
    Execute,
}
92
93/// Execute a playbook
94pub async fn run_playbook(config: &RunConfig) -> Result<RunResult> {
95    let total_start = Instant::now();
96
97    let mut ctx = prepare_execution(config)?;
98    let stages_to_run = select_stages(&ctx.dag_result, &config.stage_filter);
99
100    for stage_name in &stages_to_run {
101        execute_single_stage(&mut ctx, stage_name, config).await?;
102    }
103
104    finalize_run(
105        &ctx.playbook,
106        &ctx.run_id,
107        ctx.stages_run,
108        ctx.stages_cached,
109        ctx.stages_failed,
110        total_start.elapsed(),
111        ctx.lock,
112        config,
113    )
114}
115
116/// Execute one stage: check cache, run command, update lock.
117async fn execute_single_stage(
118    ctx: &mut ExecutionContext,
119    stage_name: &str,
120    config: &RunConfig,
121) -> Result<()> {
122    let stage = ctx
123        .playbook
124        .stages
125        .get(stage_name)
126        .ok_or_else(|| anyhow::anyhow!("stage '{}' not found in playbook", stage_name))?;
127
128    check_remote_target(stage_name, stage, &ctx.playbook)?;
129
130    if handle_frozen(stage, config.force, stage_name, &config.playbook_path) {
131        ctx.stages_cached += 1;
132        return Ok(());
133    }
134
135    tracing::debug!(
136        "stage '{}': executing via raw sh -c (bashrs purification deferred to Phase 2)",
137        stage_name
138    );
139
140    let hashes = compute_stage_hashes(stage_name, stage, &ctx.playbook)?;
141    let cache_action = evaluate_cache(
142        stage_name,
143        stage,
144        &hashes,
145        &ctx.existing_lock,
146        &ctx.dag_result,
147        &ctx.rerun_stages,
148        config,
149    );
150
151    if matches!(cache_action, CacheAction::Cached) {
152        ctx.stages_cached += 1;
153        return Ok(());
154    }
155
156    let started_at = eventlog::now_iso8601();
157    let stage_start = Instant::now();
158    let exec_result = execute_command(&hashes.resolved_cmd).await;
159    let duration = stage_start.elapsed();
160    let completed_at = eventlog::now_iso8601();
161
162    match exec_result {
163        Ok(()) => {
164            ctx.stages_run += 1;
165            ctx.rerun_stages.insert(stage_name.to_string());
166            handle_stage_success(
167                stage_name,
168                stage,
169                &hashes,
170                &started_at,
171                &completed_at,
172                duration,
173                &mut ctx.lock,
174                &config.playbook_path,
175            )?;
176        }
177        Err(e) => {
178            ctx.stages_failed += 1;
179            handle_stage_failure(
180                stage_name,
181                stage,
182                &hashes,
183                &started_at,
184                &completed_at,
185                duration,
186                e,
187                &mut ctx.lock,
188                &ctx.playbook,
189                config,
190                &ctx.run_id,
191            )?;
192        }
193    }
194    Ok(())
195}
196
197/// Parse playbook, build DAG, initialize lock file and run context.
198fn prepare_execution(config: &RunConfig) -> Result<ExecutionContext> {
199    let mut playbook = parser::parse_playbook_file(&config.playbook_path)?;
200    let warnings = parser::validate_playbook(&playbook)?;
201    for w in &warnings {
202        tracing::warn!("playbook validation: {}", w);
203    }
204
205    for (k, v) in &config.param_overrides {
206        playbook.params.insert(k.clone(), v.clone());
207    }
208
209    let dag_result = dag::build_dag(&playbook)?;
210    let run_id = eventlog::generate_run_id();
211
212    let _ = eventlog::append_event(
213        &config.playbook_path,
214        PipelineEvent::RunStarted {
215            playbook: playbook.name.clone(),
216            run_id: run_id.clone(),
217            batuta_version: env!("CARGO_PKG_VERSION").to_string(),
218        },
219    );
220
221    let existing_lock = cache::load_lock_file(&config.playbook_path)?;
222
223    let mut lock = LockFile {
224        schema: "1.0".to_string(),
225        playbook: playbook.name.clone(),
226        generated_at: eventlog::now_iso8601(),
227        generator: format!("batuta {}", env!("CARGO_PKG_VERSION")),
228        blake3_version: "1.8".to_string(),
229        params_hash: None,
230        stages: IndexMap::new(),
231    };
232
233    if let Some(ref el) = existing_lock {
234        for (name, stage_lock) in &el.stages {
235            lock.stages.insert(name.clone(), stage_lock.clone());
236        }
237    }
238
239    Ok(ExecutionContext {
240        run_id,
241        playbook,
242        dag_result,
243        existing_lock,
244        lock,
245        stages_run: 0,
246        stages_cached: 0,
247        stages_failed: 0,
248        rerun_stages: HashSet::new(),
249    })
250}
251
252/// Filter and order stages according to DAG topology and optional stage filter.
253fn select_stages(dag_result: &dag::PlaybookDag, stage_filter: &Option<Vec<String>>) -> Vec<String> {
254    if let Some(ref filter) = stage_filter {
255        dag_result.topo_order.iter().filter(|s| filter.contains(s)).cloned().collect()
256    } else {
257        dag_result.topo_order.clone()
258    }
259}
260
261#[cfg(test)]
262#[allow(non_snake_case)]
263#[path = "executor_tests.rs"]
264mod tests;
265
266#[cfg(test)]
267#[allow(non_snake_case)]
268#[path = "executor_tests_integration.rs"]
269mod tests_integration;