1#![allow(clippy::print_stdout, clippy::print_stderr)]
8
9pub mod backend;
10pub mod cache;
11pub mod config;
12pub mod graph;
13pub mod lock;
14pub mod metrics;
15mod orchestrator;
16pub mod redact;
17pub mod remote;
18pub mod runner;
19pub mod secrets;
20
21pub use backend::{
22 BackendError, BackendResult, CacheBackend, CacheEntry, CacheLookupResult, CacheOutput,
23};
24pub use cache::LocalCacheBackend;
25pub use config::CIExecutorConfig;
26pub use lock::{ConcurrencyLock, LockConfig, LockError, LockGuard};
27pub use metrics::{CacheMetrics, RestoreErrorType, global_metrics};
28pub use orchestrator::run_ci;
29pub use redact::{LogRedactor, ShortSecretWarning, redact_secrets};
30pub use remote::{RemoteCacheBackend, RemoteCacheConfig};
31pub use runner::TaskOutput;
32pub use secrets::{EnvSecretResolver, MockSecretResolver, SaltConfig, SecretResolver};
33
34use crate::compiler::Compiler;
35use crate::ir::IntermediateRepresentation;
36use cache::TaskLogs;
37use cuenv_core::manifest::Project;
38use graph::{CITaskGraph, CITaskNode};
39use runner::IRTaskRunner;
40use secrets::CIResolvedSecrets;
41use std::collections::HashMap;
42use thiserror::Error;
43use tokio::task::JoinSet;
44
/// Errors that can occur while compiling and executing a CI pipeline.
///
/// Most variants are transparent wrappers around the error types of the
/// corresponding submodule, so `?` conversion works via `#[from]`.
#[derive(Debug, Error)]
pub enum ExecutorError {
    /// IR compilation failed; carries the compiler's error message as text.
    #[error("Failed to compile project to IR: {0}")]
    Compilation(String),

    /// Task-graph construction or ordering failed (e.g. cycles, unknown deps).
    #[error(transparent)]
    Graph(#[from] graph::GraphError),

    /// Secret resolution failed.
    #[error(transparent)]
    Secret(#[from] secrets::SecretError),

    /// Reading from or writing to the task cache failed.
    #[error(transparent)]
    Cache(#[from] cache::CacheError),

    /// Running a task's process failed.
    #[error(transparent)]
    Runner(#[from] runner::RunnerError),

    /// A spawned task panicked; carries the join error's message.
    #[error("Task panicked: {0}")]
    TaskPanic(String),

    /// The requested pipeline name does not exist in the project.
    #[error("Pipeline '{name}' not found. Available: {available}")]
    PipelineNotFound { name: String, available: String },

    /// The project declares no CI configuration at all.
    #[error("Project has no CI configuration")]
    NoCIConfig,
}
80
/// Outcome of a full pipeline run.
#[derive(Debug)]
pub struct PipelineResult {
    /// `true` when every executed task succeeded.
    pub success: bool,
    /// Per-task outputs (including cache hits and dry-run placeholders).
    pub tasks: Vec<TaskOutput>,
    /// Wall-clock duration of the whole run, in milliseconds.
    pub duration_ms: u64,
}
91
92use std::sync::Arc;
93
/// Executes CI pipelines compiled from a project manifest.
pub struct CIExecutor {
    // Execution settings: project root, parallelism, dry-run, cache policy, salt, …
    config: CIExecutorConfig,
    // Optional custom cache backend; `None` means the local default.
    // NOTE(review): within this file the backend is only exposed via the
    // accessor methods — presumably the orchestrator consumes it; confirm.
    cache_backend: Option<Arc<dyn CacheBackend>>,
}
106
107impl CIExecutor {
108 #[must_use]
110 pub fn new(config: CIExecutorConfig) -> Self {
111 Self {
112 config,
113 cache_backend: None,
114 }
115 }
116
117 #[must_use]
122 pub fn with_cache_backend(config: CIExecutorConfig, backend: Arc<dyn CacheBackend>) -> Self {
123 Self {
124 config,
125 cache_backend: Some(backend),
126 }
127 }
128
129 #[must_use]
131 pub fn has_custom_cache_backend(&self) -> bool {
132 self.cache_backend.is_some()
133 }
134
135 #[must_use]
137 pub fn cache_backend_name(&self) -> &'static str {
138 self.cache_backend.as_ref().map_or("local", |b| b.name())
139 }
140
141 #[tracing::instrument(
150 name = "ci_execute_pipeline",
151 fields(project_root = %self.config.project_root.display()),
152 skip(self, project)
153 )]
154 pub async fn execute_pipeline(
155 &self,
156 project: &Project,
157 pipeline_name: Option<&str>,
158 ) -> std::result::Result<PipelineResult, ExecutorError> {
159 let start = std::time::Instant::now();
160
161 tracing::info!("Compiling project to IR");
163 let compiler = Compiler::new(project.clone());
164 let ir = compiler
165 .compile()
166 .map_err(|e| ExecutorError::Compilation(e.to_string()))?;
167
168 tracing::info!(task_count = ir.tasks.len(), "IR compilation complete");
169
170 tracing::info!("Building task graph");
172 let mut task_graph = CITaskGraph::from_ir(&ir)?;
173
174 tracing::info!("Resolving secrets");
176 let all_secrets = self.resolve_all_secrets(&ir)?;
177 let fingerprints = Self::extract_fingerprints(&all_secrets);
178
179 tracing::info!("Computing task digests");
181 task_graph.compute_digests(&ir, &fingerprints, self.config.secret_salt.as_deref());
182
183 let parallel_groups = task_graph.get_parallel_groups()?;
185 tracing::info!(groups = parallel_groups.len(), "Execution groups computed");
186
187 let cache_root = self.config.effective_cache_root();
189 let mut all_results = Vec::new();
190 let mut pipeline_success = true;
191
192 for (group_idx, group) in parallel_groups.iter().enumerate() {
193 tracing::info!(
194 group = group_idx,
195 tasks = group.len(),
196 "Executing task group"
197 );
198
199 let group_results = self
200 .execute_group(group, &ir, &cache_root, &all_secrets)
201 .await?;
202
203 for result in &group_results {
205 if !result.success {
206 tracing::warn!(task = %result.task_id, "Task failed");
207 pipeline_success = false;
208 }
209 }
210
211 all_results.extend(group_results);
212
213 if !pipeline_success {
215 tracing::warn!("Pipeline failed, aborting remaining groups");
216 break;
217 }
218 }
219
220 let duration = start.elapsed();
221
222 Ok(PipelineResult {
223 success: pipeline_success,
224 tasks: all_results,
225 duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
226 })
227 }
228
229 async fn execute_group(
231 &self,
232 group: &[&CITaskNode],
233 ir: &IntermediateRepresentation,
234 cache_root: &std::path::Path,
235 all_secrets: &HashMap<String, CIResolvedSecrets>,
236 ) -> std::result::Result<Vec<TaskOutput>, ExecutorError> {
237 let mut results = Vec::new();
238
239 if self.config.max_parallel <= 1 || group.len() == 1 {
240 for node in group {
242 let result = self
243 .execute_single_task(node, ir, cache_root, all_secrets)
244 .await?;
245 results.push(result);
246 }
247 } else {
248 let mut join_set = JoinSet::new();
250
251 for node in group {
252 let cache_result = cache::check_cache(
254 &node.task,
255 &node.digest,
256 cache_root,
257 self.config.cache_policy_override,
258 );
259
260 if cache_result.hit {
261 tracing::info!(task = %node.id, "Cache hit, skipping execution");
262 results.push(TaskOutput::from_cache(node.id.clone(), 0));
263 continue;
264 }
265
266 if self.config.dry_run {
267 tracing::info!(task = %node.id, "Would execute (dry-run)");
268 results.push(TaskOutput::dry_run(node.id.clone()));
269 continue;
270 }
271
272 let task = node.task.clone();
274 let digest = node.digest.clone();
275 let project_root = self.config.project_root.clone();
276 let capture = self.config.capture_output;
277 let cache_root_owned = cache_root.to_path_buf();
278 let policy_override = self.config.cache_policy_override;
279
280 let mut env = task.env.clone();
282 if let Some(resolved) = all_secrets.get(&task.id) {
283 for (name, value) in &resolved.values {
284 env.insert(name.clone(), value.clone());
285 }
286 }
287
288 join_set.spawn(async move {
290 let runner = IRTaskRunner::new(project_root, capture);
291 let result = runner.execute(&task, env).await;
292 (task, digest, cache_root_owned, policy_override, result)
293 });
294 }
295
296 while let Some(join_result) = join_set.join_next().await {
298 let (task, digest, cache_root_owned, policy_override, exec_result) =
299 join_result.map_err(|e| ExecutorError::TaskPanic(e.to_string()))?;
300
301 let output = exec_result?;
302
303 if output.success {
305 cache::store_result(
306 &task,
307 &digest,
308 &cache_root_owned,
309 &TaskLogs {
310 stdout: Some(output.stdout.clone()),
311 stderr: Some(output.stderr.clone()),
312 },
313 output.duration_ms,
314 output.exit_code,
315 policy_override,
316 )?;
317 }
318
319 results.push(output);
320 }
321 }
322
323 Ok(results)
324 }
325
326 async fn execute_single_task(
328 &self,
329 node: &CITaskNode,
330 _ir: &IntermediateRepresentation,
331 cache_root: &std::path::Path,
332 all_secrets: &HashMap<String, CIResolvedSecrets>,
333 ) -> std::result::Result<TaskOutput, ExecutorError> {
334 let cache_result = cache::check_cache(
336 &node.task,
337 &node.digest,
338 cache_root,
339 self.config.cache_policy_override,
340 );
341
342 if cache_result.hit {
343 tracing::info!(task = %node.id, "Cache hit, skipping execution");
344 return Ok(TaskOutput::from_cache(node.id.clone(), 0));
345 }
346
347 if self.config.dry_run {
348 tracing::info!(task = %node.id, "Would execute (dry-run)");
349 return Ok(TaskOutput::dry_run(node.id.clone()));
350 }
351
352 let mut env = node.task.env.clone();
354 if let Some(resolved) = all_secrets.get(&node.id) {
355 for (name, value) in &resolved.values {
356 env.insert(name.clone(), value.clone());
357 }
358 }
359
360 let runner =
362 IRTaskRunner::new(self.config.project_root.clone(), self.config.capture_output);
363 let output = runner.execute(&node.task, env).await?;
364
365 if output.success {
367 cache::store_result(
368 &node.task,
369 &node.digest,
370 cache_root,
371 &TaskLogs {
372 stdout: Some(output.stdout.clone()),
373 stderr: Some(output.stderr.clone()),
374 },
375 output.duration_ms,
376 output.exit_code,
377 self.config.cache_policy_override,
378 )?;
379 }
380
381 Ok(output)
382 }
383
384 fn resolve_all_secrets(
386 &self,
387 ir: &IntermediateRepresentation,
388 ) -> std::result::Result<HashMap<String, CIResolvedSecrets>, secrets::SecretError> {
389 secrets::resolve_all_task_secrets(&ir.tasks, self.config.secret_salt.as_deref())
390 }
391
392 fn extract_fingerprints(
394 all_secrets: &HashMap<String, CIResolvedSecrets>,
395 ) -> HashMap<String, HashMap<String, String>> {
396 all_secrets
397 .iter()
398 .map(|(task_id, resolved)| (task_id.clone(), resolved.fingerprints().clone()))
399 .collect()
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ir::{CachePolicy, PipelineMetadata, StageConfiguration, Task as IRTask};

    /// Builds a minimal IR wrapping `tasks`, for graph/digest tests.
    #[allow(dead_code)]
    fn make_simple_ir(tasks: Vec<IRTask>) -> IntermediateRepresentation {
        IntermediateRepresentation {
            version: "1.4".to_string(),
            pipeline: PipelineMetadata {
                name: "test".to_string(),
                environment: None,
                requires_onepassword: false,
                project_name: None,
                trigger: None,
            },
            runtimes: vec![],
            stages: StageConfiguration::default(),
            tasks,
        }
    }

    /// Builds a trivial `echo` task with the given id and dependency ids.
    #[allow(dead_code)]
    fn make_task(id: &str, deps: &[&str]) -> IRTask {
        IRTask {
            id: id.to_string(),
            runtime: None,
            command: vec!["echo".to_string(), id.to_string()],
            shell: false,
            env: HashMap::new(),
            secrets: HashMap::new(),
            resources: None,
            concurrency_group: None,
            inputs: vec![],
            outputs: vec![],
            depends_on: deps.iter().map(|s| (*s).to_string()).collect(),
            cache_policy: CachePolicy::Normal,
            deployment: false,
            manual_approval: false,
        }
    }

    #[test]
    fn test_executor_config_builder() {
        let config = CIExecutorConfig::new(std::path::PathBuf::from("/project"))
            .with_max_parallel(8)
            .with_dry_run(true);

        assert_eq!(config.max_parallel, 8);
        assert!(config.dry_run);
    }

    #[test]
    fn test_extract_fingerprints() {
        temp_env::with_var("TEST_EXTRACT_FP_SECRET", Some("test_value"), || {
            let mut secrets = HashMap::new();

            let secret_configs = HashMap::from([(
                "api_key".to_string(),
                crate::ir::SecretConfig {
                    source: "TEST_EXTRACT_FP_SECRET".to_string(),
                    cache_key: true,
                },
            )]);

            let resolved = CIResolvedSecrets::from_env(&secret_configs, Some("test-salt")).unwrap();
            secrets.insert("task1".to_string(), resolved);

            // `extract_fingerprints` is an associated function, so no
            // executor instance is needed (a dead one was removed here).
            let fingerprints = CIExecutor::extract_fingerprints(&secrets);

            assert!(fingerprints.contains_key("task1"));
            assert!(fingerprints["task1"].contains_key("api_key"));
        });
    }
}
480}