1#[path = "executor_stages.rs"]
10mod stages;
11
12#[path = "executor_command.rs"]
13mod command;
14
15use super::cache;
16use super::dag;
17use super::eventlog;
18use super::parser;
19use super::types::*;
20use anyhow::Result;
21use indexmap::IndexMap;
22use std::collections::{HashMap, HashSet};
23use std::path::PathBuf;
24use std::time::Instant;
25
26use command::execute_command;
28use command::CommandError;
29pub use command::{show_status, validate_only};
30use stages::{
31 check_remote_target, compute_stage_hashes, evaluate_cache, finalize_run, handle_frozen,
32 handle_stage_failure, handle_stage_success,
33};
34
35#[derive(Debug, Clone)]
37pub struct RunConfig {
38 pub playbook_path: PathBuf,
40
41 pub stage_filter: Option<Vec<String>>,
43
44 pub force: bool,
46
47 pub dry_run: bool,
49
50 pub param_overrides: HashMap<String, serde_yaml_ng::Value>,
52}
53
54#[derive(Debug, Clone)]
56pub struct RunResult {
57 pub stages_run: u32,
58 pub stages_cached: u32,
59 pub stages_failed: u32,
60 pub total_duration: std::time::Duration,
61 pub lock_file: Option<LockFile>,
62}
63
64struct ExecutionContext {
66 run_id: String,
67 playbook: Playbook,
68 dag_result: dag::PlaybookDag,
69 existing_lock: Option<LockFile>,
70 lock: LockFile,
71 stages_run: u32,
72 stages_cached: u32,
73 stages_failed: u32,
74 rerun_stages: HashSet<String>,
75}
76
77pub(crate) struct StageHashes {
79 pub(crate) resolved_cmd: String,
80 pub(crate) cmd_hash: String,
81 pub(crate) dep_hashes: Vec<(String, String)>,
82 pub(crate) dep_locks: Vec<DepLock>,
83 pub(crate) params_hash: String,
84 pub(crate) cache_key: String,
85}
86
87pub(crate) enum CacheAction {
89 Cached,
90 Execute,
91}
92
93pub async fn run_playbook(config: &RunConfig) -> Result<RunResult> {
95 let total_start = Instant::now();
96
97 let mut ctx = prepare_execution(config)?;
98 let stages_to_run = select_stages(&ctx.dag_result, &config.stage_filter);
99
100 for stage_name in &stages_to_run {
101 execute_single_stage(&mut ctx, stage_name, config).await?;
102 }
103
104 finalize_run(
105 &ctx.playbook,
106 &ctx.run_id,
107 ctx.stages_run,
108 ctx.stages_cached,
109 ctx.stages_failed,
110 total_start.elapsed(),
111 ctx.lock,
112 config,
113 )
114}
115
116async fn execute_single_stage(
118 ctx: &mut ExecutionContext,
119 stage_name: &str,
120 config: &RunConfig,
121) -> Result<()> {
122 let stage = ctx
123 .playbook
124 .stages
125 .get(stage_name)
126 .ok_or_else(|| anyhow::anyhow!("stage '{}' not found in playbook", stage_name))?;
127
128 check_remote_target(stage_name, stage, &ctx.playbook)?;
129
130 if handle_frozen(stage, config.force, stage_name, &config.playbook_path) {
131 ctx.stages_cached += 1;
132 return Ok(());
133 }
134
135 tracing::debug!(
136 "stage '{}': executing via raw sh -c (bashrs purification deferred to Phase 2)",
137 stage_name
138 );
139
140 let hashes = compute_stage_hashes(stage_name, stage, &ctx.playbook)?;
141 let cache_action = evaluate_cache(
142 stage_name,
143 stage,
144 &hashes,
145 &ctx.existing_lock,
146 &ctx.dag_result,
147 &ctx.rerun_stages,
148 config,
149 );
150
151 if matches!(cache_action, CacheAction::Cached) {
152 ctx.stages_cached += 1;
153 return Ok(());
154 }
155
156 let started_at = eventlog::now_iso8601();
157 let stage_start = Instant::now();
158 let exec_result = execute_command(&hashes.resolved_cmd).await;
159 let duration = stage_start.elapsed();
160 let completed_at = eventlog::now_iso8601();
161
162 match exec_result {
163 Ok(()) => {
164 ctx.stages_run += 1;
165 ctx.rerun_stages.insert(stage_name.to_string());
166 handle_stage_success(
167 stage_name,
168 stage,
169 &hashes,
170 &started_at,
171 &completed_at,
172 duration,
173 &mut ctx.lock,
174 &config.playbook_path,
175 )?;
176 }
177 Err(e) => {
178 ctx.stages_failed += 1;
179 handle_stage_failure(
180 stage_name,
181 stage,
182 &hashes,
183 &started_at,
184 &completed_at,
185 duration,
186 e,
187 &mut ctx.lock,
188 &ctx.playbook,
189 config,
190 &ctx.run_id,
191 )?;
192 }
193 }
194 Ok(())
195}
196
197fn prepare_execution(config: &RunConfig) -> Result<ExecutionContext> {
199 let mut playbook = parser::parse_playbook_file(&config.playbook_path)?;
200 let warnings = parser::validate_playbook(&playbook)?;
201 for w in &warnings {
202 tracing::warn!("playbook validation: {}", w);
203 }
204
205 for (k, v) in &config.param_overrides {
206 playbook.params.insert(k.clone(), v.clone());
207 }
208
209 let dag_result = dag::build_dag(&playbook)?;
210 let run_id = eventlog::generate_run_id();
211
212 let _ = eventlog::append_event(
213 &config.playbook_path,
214 PipelineEvent::RunStarted {
215 playbook: playbook.name.clone(),
216 run_id: run_id.clone(),
217 batuta_version: env!("CARGO_PKG_VERSION").to_string(),
218 },
219 );
220
221 let existing_lock = cache::load_lock_file(&config.playbook_path)?;
222
223 let mut lock = LockFile {
224 schema: "1.0".to_string(),
225 playbook: playbook.name.clone(),
226 generated_at: eventlog::now_iso8601(),
227 generator: format!("batuta {}", env!("CARGO_PKG_VERSION")),
228 blake3_version: "1.8".to_string(),
229 params_hash: None,
230 stages: IndexMap::new(),
231 };
232
233 if let Some(ref el) = existing_lock {
234 for (name, stage_lock) in &el.stages {
235 lock.stages.insert(name.clone(), stage_lock.clone());
236 }
237 }
238
239 Ok(ExecutionContext {
240 run_id,
241 playbook,
242 dag_result,
243 existing_lock,
244 lock,
245 stages_run: 0,
246 stages_cached: 0,
247 stages_failed: 0,
248 rerun_stages: HashSet::new(),
249 })
250}
251
252fn select_stages(dag_result: &dag::PlaybookDag, stage_filter: &Option<Vec<String>>) -> Vec<String> {
254 if let Some(ref filter) = stage_filter {
255 dag_result.topo_order.iter().filter(|s| filter.contains(s)).cloned().collect()
256 } else {
257 dag_result.topo_order.clone()
258 }
259}
260
261#[cfg(test)]
262#[allow(non_snake_case)]
263#[path = "executor_tests.rs"]
264mod tests;
265
266#[cfg(test)]
267#[allow(non_snake_case)]
268#[path = "executor_tests_integration.rs"]
269mod tests_integration;