cuenv_ci/executor/
mod.rs

1//! CI Pipeline Executor
2//!
3//! Executes CI pipelines with proper dependency ordering, caching,
4//! and parallel execution.
5
6// CI executor outputs to stdout/stderr as part of its normal operation
7#![allow(clippy::print_stdout, clippy::print_stderr)]
8
9pub mod backend;
10pub mod cache;
11pub mod config;
12pub mod graph;
13pub mod lock;
14pub mod metrics;
15mod orchestrator;
16pub mod redact;
17pub mod remote;
18pub mod runner;
19pub mod secrets;
20
21pub use backend::{
22    BackendError, BackendResult, CacheBackend, CacheEntry, CacheLookupResult, CacheOutput,
23};
24pub use cache::LocalCacheBackend;
25pub use config::CIExecutorConfig;
26pub use lock::{ConcurrencyLock, LockConfig, LockError, LockGuard};
27pub use metrics::{CacheMetrics, RestoreErrorType, global_metrics};
28pub use orchestrator::run_ci;
29pub use redact::{LogRedactor, ShortSecretWarning, redact_secrets};
30pub use remote::{RemoteCacheBackend, RemoteCacheConfig};
31pub use runner::TaskOutput;
32pub use secrets::{EnvSecretResolver, MockSecretResolver, SaltConfig, SecretResolver};
33
34use crate::compiler::Compiler;
35use crate::ir::IntermediateRepresentation;
36use cache::TaskLogs;
37use cuenv_core::manifest::Project;
38use graph::{CITaskGraph, CITaskNode};
39use runner::IRTaskRunner;
40use secrets::CIResolvedSecrets;
41use std::collections::HashMap;
42use thiserror::Error;
43use tokio::task::JoinSet;
44
/// Error types for CI execution
///
/// Variants marked `#[from]` wrap the error of the corresponding submodule
/// unchanged (`#[error(transparent)]`), so `?` can be used on those results
/// directly inside the executor.
#[derive(Debug, Error)]
pub enum ExecutorError {
    /// Compilation error
    ///
    /// Carries the stringified compiler error rather than the error value
    /// itself.
    #[error("Failed to compile project to IR: {0}")]
    Compilation(String),

    /// Task graph error
    #[error(transparent)]
    Graph(#[from] graph::GraphError),

    /// Secret resolution error
    #[error(transparent)]
    Secret(#[from] secrets::SecretError),

    /// Cache error
    #[error(transparent)]
    Cache(#[from] cache::CacheError),

    /// Task execution error
    #[error(transparent)]
    Runner(#[from] runner::RunnerError),

    /// Task panicked during execution
    ///
    /// Built from the stringified join error of a spawned task.
    /// NOTE(review): a join error can presumably also mean the task was
    /// cancelled, not only that it panicked — confirm the wording is intended.
    #[error("Task panicked: {0}")]
    TaskPanic(String),

    /// Pipeline not found
    ///
    /// `name` is the requested pipeline, `available` a pre-formatted list of
    /// known pipeline names.
    /// NOTE(review): not constructed anywhere in this file — presumably used
    /// by the orchestrator; verify.
    #[error("Pipeline '{name}' not found. Available: {available}")]
    PipelineNotFound { name: String, available: String },

    /// No CI configuration
    ///
    /// NOTE(review): not constructed anywhere in this file — presumably used
    /// by the orchestrator; verify.
    #[error("Project has no CI configuration")]
    NoCIConfig,
}
80
/// Result of pipeline execution
///
/// As populated by `CIExecutor::execute_pipeline`: a pipeline whose tasks ran
/// but failed is still reported through this struct (with `success == false`)
/// rather than through an `ExecutorError`.
#[derive(Debug)]
pub struct PipelineResult {
    /// Whether all tasks succeeded
    ///
    /// `false` as soon as any collected task output reports failure.
    pub success: bool,
    /// Results for each task
    ///
    /// Only contains tasks from groups that were actually attempted; groups
    /// after the first failing one are skipped and have no entry here.
    pub tasks: Vec<TaskOutput>,
    /// Total execution time in milliseconds
    ///
    /// Wall-clock time of the whole call, saturating at `u64::MAX`.
    pub duration_ms: u64,
}
91
92use std::sync::Arc;
93
/// CI Pipeline Executor
///
/// Executes CI pipelines with:
/// - IR compilation and validation
/// - Dependency-ordered parallel execution
/// - Content-addressable caching (pluggable backends)
/// - Secret resolution and injection
///
/// NOTE(review): within this file the injected `cache_backend` is only
/// inspected by `has_custom_cache_backend` and `cache_backend_name`; task
/// execution calls `cache::check_cache` / `cache::store_result` with the
/// configured cache root directly. Confirm whether the injected backend is
/// meant to be consulted on the execution path.
pub struct CIExecutor {
    /// Executor settings (project root, parallelism, dry-run, cache policy
    /// override, secret salt, output capture) read throughout execution.
    config: CIExecutorConfig,
    /// Optional injected cache backend (uses local cache if None)
    cache_backend: Option<Arc<dyn CacheBackend>>,
}
106
107impl CIExecutor {
108    /// Create a new executor with the given configuration
109    #[must_use]
110    pub fn new(config: CIExecutorConfig) -> Self {
111        Self {
112            config,
113            cache_backend: None,
114        }
115    }
116
117    /// Create an executor with an injected cache backend
118    ///
119    /// This enables using custom cache backends (e.g., remote cache) or
120    /// mock backends for testing.
121    #[must_use]
122    pub fn with_cache_backend(config: CIExecutorConfig, backend: Arc<dyn CacheBackend>) -> Self {
123        Self {
124            config,
125            cache_backend: Some(backend),
126        }
127    }
128
129    /// Check if a custom cache backend is configured
130    #[must_use]
131    pub fn has_custom_cache_backend(&self) -> bool {
132        self.cache_backend.is_some()
133    }
134
135    /// Get the cache backend name (for logging/metrics)
136    #[must_use]
137    pub fn cache_backend_name(&self) -> &'static str {
138        self.cache_backend.as_ref().map_or("local", |b| b.name())
139    }
140
141    /// Execute a pipeline from a project configuration
142    ///
143    /// # Arguments
144    /// * `project` - The project configuration
145    /// * `pipeline_name` - Optional specific pipeline (defaults to "default")
146    ///
147    /// # Errors
148    /// Returns error if compilation fails, tasks fail, or secrets can't be resolved
149    #[tracing::instrument(
150        name = "ci_execute_pipeline",
151        fields(project_root = %self.config.project_root.display()),
152        skip(self, project)
153    )]
154    pub async fn execute_pipeline(
155        &self,
156        project: &Project,
157        pipeline_name: Option<&str>,
158    ) -> std::result::Result<PipelineResult, ExecutorError> {
159        let start = std::time::Instant::now();
160
161        // Step 1: Compile to IR
162        tracing::info!("Compiling project to IR");
163        let compiler = Compiler::new(project.clone());
164        let ir = compiler
165            .compile()
166            .map_err(|e| ExecutorError::Compilation(e.to_string()))?;
167
168        tracing::info!(task_count = ir.tasks.len(), "IR compilation complete");
169
170        // Step 2: Build task graph
171        tracing::info!("Building task graph");
172        let mut task_graph = CITaskGraph::from_ir(&ir)?;
173
174        // Step 3: Resolve secrets for all tasks
175        tracing::info!("Resolving secrets");
176        let all_secrets = self.resolve_all_secrets(&ir)?;
177        let fingerprints = Self::extract_fingerprints(&all_secrets);
178
179        // Step 4: Compute digests with secret fingerprints
180        tracing::info!("Computing task digests");
181        task_graph.compute_digests(&ir, &fingerprints, self.config.secret_salt.as_deref());
182
183        // Step 5: Get parallel groups for execution
184        let parallel_groups = task_graph.get_parallel_groups()?;
185        tracing::info!(groups = parallel_groups.len(), "Execution groups computed");
186
187        // Step 6: Execute groups
188        let cache_root = self.config.effective_cache_root();
189        let mut all_results = Vec::new();
190        let mut pipeline_success = true;
191
192        for (group_idx, group) in parallel_groups.iter().enumerate() {
193            tracing::info!(
194                group = group_idx,
195                tasks = group.len(),
196                "Executing task group"
197            );
198
199            let group_results = self
200                .execute_group(group, &ir, &cache_root, &all_secrets)
201                .await?;
202
203            // Check for failures
204            for result in &group_results {
205                if !result.success {
206                    tracing::warn!(task = %result.task_id, "Task failed");
207                    pipeline_success = false;
208                }
209            }
210
211            all_results.extend(group_results);
212
213            // Fail fast: stop if any task in the group failed
214            if !pipeline_success {
215                tracing::warn!("Pipeline failed, aborting remaining groups");
216                break;
217            }
218        }
219
220        let duration = start.elapsed();
221
222        Ok(PipelineResult {
223            success: pipeline_success,
224            tasks: all_results,
225            duration_ms: u64::try_from(duration.as_millis()).unwrap_or(u64::MAX),
226        })
227    }
228
229    /// Execute a single group of tasks (can run in parallel)
230    async fn execute_group(
231        &self,
232        group: &[&CITaskNode],
233        ir: &IntermediateRepresentation,
234        cache_root: &std::path::Path,
235        all_secrets: &HashMap<String, CIResolvedSecrets>,
236    ) -> std::result::Result<Vec<TaskOutput>, ExecutorError> {
237        let mut results = Vec::new();
238
239        if self.config.max_parallel <= 1 || group.len() == 1 {
240            // Sequential execution
241            for node in group {
242                let result = self
243                    .execute_single_task(node, ir, cache_root, all_secrets)
244                    .await?;
245                results.push(result);
246            }
247        } else {
248            // Parallel execution with JoinSet
249            let mut join_set = JoinSet::new();
250
251            for node in group {
252                // Check cache first
253                let cache_result = cache::check_cache(
254                    &node.task,
255                    &node.digest,
256                    cache_root,
257                    self.config.cache_policy_override,
258                );
259
260                if cache_result.hit {
261                    tracing::info!(task = %node.id, "Cache hit, skipping execution");
262                    results.push(TaskOutput::from_cache(node.id.clone(), 0));
263                    continue;
264                }
265
266                if self.config.dry_run {
267                    tracing::info!(task = %node.id, "Would execute (dry-run)");
268                    results.push(TaskOutput::dry_run(node.id.clone()));
269                    continue;
270                }
271
272                // Prepare execution context
273                let task = node.task.clone();
274                let digest = node.digest.clone();
275                let project_root = self.config.project_root.clone();
276                let capture = self.config.capture_output;
277                let cache_root_owned = cache_root.to_path_buf();
278                let policy_override = self.config.cache_policy_override;
279
280                // Build environment with secrets
281                let mut env = task.env.clone();
282                if let Some(resolved) = all_secrets.get(&task.id) {
283                    for (name, value) in &resolved.values {
284                        env.insert(name.clone(), value.clone());
285                    }
286                }
287
288                // Spawn task execution
289                join_set.spawn(async move {
290                    let runner = IRTaskRunner::new(project_root, capture);
291                    let result = runner.execute(&task, env).await;
292                    (task, digest, cache_root_owned, policy_override, result)
293                });
294            }
295
296            // Collect results
297            while let Some(join_result) = join_set.join_next().await {
298                let (task, digest, cache_root_owned, policy_override, exec_result) =
299                    join_result.map_err(|e| ExecutorError::TaskPanic(e.to_string()))?;
300
301                let output = exec_result?;
302
303                // Store in cache if successful
304                if output.success {
305                    cache::store_result(
306                        &task,
307                        &digest,
308                        &cache_root_owned,
309                        &TaskLogs {
310                            stdout: Some(output.stdout.clone()),
311                            stderr: Some(output.stderr.clone()),
312                        },
313                        output.duration_ms,
314                        output.exit_code,
315                        policy_override,
316                    )?;
317                }
318
319                results.push(output);
320            }
321        }
322
323        Ok(results)
324    }
325
326    /// Execute a single task with cache checking
327    async fn execute_single_task(
328        &self,
329        node: &CITaskNode,
330        _ir: &IntermediateRepresentation,
331        cache_root: &std::path::Path,
332        all_secrets: &HashMap<String, CIResolvedSecrets>,
333    ) -> std::result::Result<TaskOutput, ExecutorError> {
334        // Check cache
335        let cache_result = cache::check_cache(
336            &node.task,
337            &node.digest,
338            cache_root,
339            self.config.cache_policy_override,
340        );
341
342        if cache_result.hit {
343            tracing::info!(task = %node.id, "Cache hit, skipping execution");
344            return Ok(TaskOutput::from_cache(node.id.clone(), 0));
345        }
346
347        if self.config.dry_run {
348            tracing::info!(task = %node.id, "Would execute (dry-run)");
349            return Ok(TaskOutput::dry_run(node.id.clone()));
350        }
351
352        // Build environment with secrets
353        let mut env = node.task.env.clone();
354        if let Some(resolved) = all_secrets.get(&node.id) {
355            for (name, value) in &resolved.values {
356                env.insert(name.clone(), value.clone());
357            }
358        }
359
360        // Execute
361        let runner =
362            IRTaskRunner::new(self.config.project_root.clone(), self.config.capture_output);
363        let output = runner.execute(&node.task, env).await?;
364
365        // Store in cache if successful
366        if output.success {
367            cache::store_result(
368                &node.task,
369                &node.digest,
370                cache_root,
371                &TaskLogs {
372                    stdout: Some(output.stdout.clone()),
373                    stderr: Some(output.stderr.clone()),
374                },
375                output.duration_ms,
376                output.exit_code,
377                self.config.cache_policy_override,
378            )?;
379        }
380
381        Ok(output)
382    }
383
384    /// Resolve secrets for all tasks
385    fn resolve_all_secrets(
386        &self,
387        ir: &IntermediateRepresentation,
388    ) -> std::result::Result<HashMap<String, CIResolvedSecrets>, secrets::SecretError> {
389        secrets::resolve_all_task_secrets(&ir.tasks, self.config.secret_salt.as_deref())
390    }
391
392    /// Extract fingerprints from resolved secrets
393    fn extract_fingerprints(
394        all_secrets: &HashMap<String, CIResolvedSecrets>,
395    ) -> HashMap<String, HashMap<String, String>> {
396        all_secrets
397            .iter()
398            .map(|(task_id, resolved)| (task_id.clone(), resolved.fingerprints().clone()))
399            .collect()
400    }
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406    use crate::ir::{CachePolicy, PipelineMetadata, StageConfiguration, Task as IRTask};
407
408    #[allow(dead_code)]
409    fn make_simple_ir(tasks: Vec<IRTask>) -> IntermediateRepresentation {
410        IntermediateRepresentation {
411            version: "1.4".to_string(),
412            pipeline: PipelineMetadata {
413                name: "test".to_string(),
414                environment: None,
415                requires_onepassword: false,
416                project_name: None,
417                trigger: None,
418            },
419            runtimes: vec![],
420            stages: StageConfiguration::default(),
421            tasks,
422        }
423    }
424
425    #[allow(dead_code)]
426    fn make_task(id: &str, deps: &[&str]) -> IRTask {
427        IRTask {
428            id: id.to_string(),
429            runtime: None,
430            command: vec!["echo".to_string(), id.to_string()],
431            shell: false,
432            env: HashMap::new(),
433            secrets: HashMap::new(),
434            resources: None,
435            concurrency_group: None,
436            inputs: vec![],
437            outputs: vec![],
438            depends_on: deps.iter().map(|s| (*s).to_string()).collect(),
439            cache_policy: CachePolicy::Normal,
440            deployment: false,
441            manual_approval: false,
442        }
443    }
444
445    #[test]
446    fn test_executor_config_builder() {
447        let config = CIExecutorConfig::new(std::path::PathBuf::from("/project"))
448            .with_max_parallel(8)
449            .with_dry_run(true);
450
451        assert_eq!(config.max_parallel, 8);
452        assert!(config.dry_run);
453    }
454
455    #[test]
456    fn test_extract_fingerprints() {
457        temp_env::with_var("TEST_EXTRACT_FP_SECRET", Some("test_value"), || {
458            let executor = CIExecutor::new(CIExecutorConfig::default());
459
460            let mut secrets = HashMap::new();
461
462            let secret_configs = HashMap::from([(
463                "api_key".to_string(),
464                crate::ir::SecretConfig {
465                    source: "TEST_EXTRACT_FP_SECRET".to_string(),
466                    cache_key: true,
467                },
468            )]);
469
470            let resolved = CIResolvedSecrets::from_env(&secret_configs, Some("test-salt")).unwrap();
471            secrets.insert("task1".to_string(), resolved);
472
473            let _ = executor; // silence unused warning
474            let fingerprints = CIExecutor::extract_fingerprints(&secrets);
475
476            assert!(fingerprints.contains_key("task1"));
477            assert!(fingerprints["task1"].contains_key("api_key"));
478        });
479    }
480}