Skip to main content

cuenv_ci/executor/
orchestrator.rs

1//! CI Pipeline Orchestrator
2//!
3//! Main entry point for CI pipeline execution, integrating with the provider
4//! system for context detection, file change tracking, and reporting.
5//!
6//! This module orchestrates complex async workflows with caching, concurrency control,
7//! and multi-project coordination. The complexity is inherent to the domain.
8
9// CI orchestration has inherent complexity - coordinates async tasks, caching, reporting
10#![allow(clippy::cognitive_complexity, clippy::too_many_lines)]
11
12use crate::affected::{compute_affected_tasks, matched_inputs_for_task};
13use crate::compiler::Compiler;
14use crate::discovery::evaluate_module_from_cwd;
15use crate::ir::CachePolicy;
16use crate::provider::CIProvider;
17use crate::report::json::write_report;
18use crate::report::{ContextReport, PipelineReport, PipelineStatus, TaskReport, TaskStatus};
19use chrono::Utc;
20use cuenv_core::cue::discovery::find_ancestor_env_files;
21use cuenv_core::lockfile::{LOCKFILE_NAME, LockedToolPlatform, Lockfile};
22use cuenv_core::manifest::Project;
23use cuenv_core::tasks::{TaskGraph, TaskIndex};
24use cuenv_core::tools::{
25    Platform, ResolvedTool, ResolvedToolActivationStep, ToolActivationResolveOptions, ToolExtract,
26    ToolOptions, ToolRegistry, ToolSource, apply_resolved_tool_activation, resolve_tool_activation,
27    validate_tool_activation,
28};
29use cuenv_core::{DryRun, Result};
30use cuenv_hooks::{
31    ExecutionStatus, HookExecutionConfig, HookExecutionState, StateManager, compute_instance_hash,
32    execute_hooks,
33};
34use std::collections::{BTreeMap, HashMap, HashSet};
35use std::path::{Path, PathBuf};
36use std::sync::Arc;
37
38use super::ExecutorError;
39use super::config::CIExecutorConfig;
40use super::runner::{IRTaskRunner, TaskOutput};
41
42/// Run the CI pipeline logic
43///
44/// This is the main entry point for CI execution, integrating with the provider
45/// system for context detection, file change tracking, and reporting.
46///
47/// # Arguments
48///
49/// * `provider` - The CI provider to use for changed files detection and reporting
50/// * `dry_run` - Whether to skip actual task execution
51/// * `specific_pipeline` - If set, only run tasks from this pipeline
52/// * `environment` - Optional environment override for secrets resolution
53/// * `path_filter` - If set, only process projects under this path (relative to module root)
54///
55/// # Errors
56/// Returns error if IO errors occur or tasks fail
57#[allow(clippy::too_many_lines)]
58pub async fn run_ci(
59    provider: Arc<dyn CIProvider>,
60    dry_run: DryRun,
61    specific_pipeline: Option<String>,
62    environment: Option<String>,
63    path_filter: Option<&str>,
64) -> Result<()> {
65    let context = provider.context();
66    cuenv_events::emit_ci_context!(&context.provider, &context.event, &context.ref_name);
67
68    // Get changed files
69    let changed_files = provider.changed_files().await?;
70    cuenv_events::emit_ci_changed_files!(changed_files.len());
71
72    // Evaluate module and discover projects
73    let module = evaluate_module_from_cwd()?;
74    let project_count = module.project_count();
75    if project_count == 0 {
76        return Err(cuenv_core::Error::configuration(
77            "No cuenv projects found. Ensure env.cue files declare 'package cuenv'",
78        ));
79    }
80    cuenv_events::emit_ci_projects_discovered!(project_count);
81
82    // Collect projects with their configs
83    let mut projects: Vec<(PathBuf, Project)> = Vec::new();
84    for instance in module.projects() {
85        let config = Project::try_from(instance)?;
86        let project_path = module.root.join(&instance.path);
87        projects.push((project_path, config));
88    }
89
90    // Filter projects by path if specified (and not the default ".")
91    let projects: Vec<(PathBuf, Project)> = match path_filter {
92        Some(filter) if filter != "." => {
93            let filter_path = module.root.join(filter);
94            projects
95                .into_iter()
96                .filter(|(path, _)| path.starts_with(&filter_path))
97                .collect()
98        }
99        _ => projects,
100    };
101
102    if projects.is_empty() {
103        return Err(cuenv_core::Error::configuration(format!(
104            "No cuenv projects found under path '{}'",
105            path_filter.unwrap_or(".")
106        )));
107    }
108
109    // Build project maps for cross-project dependency resolution and hook lookup.
110    let mut project_map = HashMap::new();
111    let mut project_configs = HashMap::new();
112    for (path, config) in &projects {
113        let name = config.name.trim();
114        if !name.is_empty() {
115            project_map.insert(name.to_string(), (path.clone(), config.clone()));
116        }
117
118        project_configs.insert(path.clone(), config.clone());
119        if let Ok(canonical) = path.canonicalize() {
120            project_configs.insert(canonical, config.clone());
121        }
122    }
123
124    // Track failures with structured errors
125    let mut failures: Vec<(String, cuenv_core::Error)> = Vec::new();
126
127    // Process each project
128    for (project_path, config) in &projects {
129        // Determine pipeline to run
130        let pipeline_name = specific_pipeline
131            .clone()
132            .unwrap_or_else(|| "default".to_string());
133
134        // Find pipeline in config
135        let Some(ci) = &config.ci else {
136            return Err(cuenv_core::Error::configuration(format!(
137                "Project {} has no CI configuration",
138                project_path.display()
139            )));
140        };
141
142        let available_pipelines: Vec<&str> = ci.pipelines.keys().map(String::as_str).collect();
143        let Some(pipeline) = ci.pipelines.get(&pipeline_name) else {
144            return Err(cuenv_core::Error::configuration(format!(
145                "Pipeline '{}' not found in project {}. Available pipelines: {}",
146                pipeline_name,
147                project_path.display(),
148                available_pipelines.join(", ")
149            )));
150        };
151
152        let resolved_environment =
153            resolve_environment(environment.as_deref(), pipeline.environment.as_deref());
154
155        // Extract task names from pipeline tasks (which can be simple strings or matrix tasks)
156        let pipeline_task_names: Vec<String> = pipeline
157            .tasks
158            .iter()
159            .map(|t| t.task_name().to_string())
160            .collect();
161
162        // For release events, run all tasks unconditionally (no affected-file filtering)
163        let tasks_to_run = if context.event == "release" {
164            pipeline_task_names
165        } else {
166            compute_affected_tasks(
167                &changed_files,
168                &pipeline_task_names,
169                project_path,
170                config,
171                &project_map,
172            )
173        };
174
175        if tasks_to_run.is_empty() {
176            cuenv_events::emit_ci_project_skipped!(project_path.display(), "No affected tasks");
177            continue;
178        }
179
180        tracing::info!(
181            project = %project_path.display(),
182            tasks = ?tasks_to_run,
183            "Running tasks for project"
184        );
185
186        if !dry_run.is_dry_run() {
187            let result = execute_project_pipeline(&PipelineExecutionRequest {
188                project_path,
189                config,
190                pipeline_name: &pipeline_name,
191                tasks_to_run: &tasks_to_run,
192                environment: resolved_environment.as_deref(),
193                context,
194                changed_files: &changed_files,
195                provider: provider.as_ref(),
196                project_configs: &project_configs,
197            })
198            .await;
199
200            match result {
201                Err(e) => {
202                    tracing::error!(error = %e, "Pipeline execution error");
203                    let project_name = project_path.display().to_string();
204                    failures.push((project_name, e));
205                }
206                Ok((status, task_errors)) => {
207                    if status == PipelineStatus::Failed {
208                        failures.extend(task_errors);
209                    }
210                }
211            }
212        }
213    }
214
215    if !failures.is_empty() {
216        let details = failures
217            .iter()
218            .map(|(project, err)| format!("  [{project}]\n    {err}"))
219            .collect::<Vec<_>>()
220            .join("\n\n");
221        return Err(cuenv_core::Error::execution(format!(
222            "CI pipeline failed:\n\n{details}"
223        )));
224    }
225
226    Ok(())
227}
228
229/// All parameters needed to execute a project pipeline.
230pub struct PipelineExecutionRequest<'a> {
231    pub project_path: &'a Path,
232    pub config: &'a Project,
233    pub pipeline_name: &'a str,
234    pub tasks_to_run: &'a [String],
235    pub environment: Option<&'a str>,
236    pub context: &'a crate::context::CIContext,
237    pub changed_files: &'a [PathBuf],
238    pub provider: &'a dyn CIProvider,
239    pub project_configs: &'a HashMap<PathBuf, Project>,
240}
241
242/// Execute a project's pipeline and handle reporting
243///
244/// Returns the pipeline status and a list of task failures (project path, error).
245#[allow(clippy::too_many_lines)] // Complex orchestration logic
246async fn execute_project_pipeline(
247    request: &PipelineExecutionRequest<'_>,
248) -> Result<(PipelineStatus, Vec<(String, cuenv_core::Error)>)> {
249    let project_path = request.project_path;
250    let config = request.config;
251    let pipeline_name = request.pipeline_name;
252    let tasks_to_run = request.tasks_to_run;
253    let environment = request.environment;
254    let context = request.context;
255    let changed_files = request.changed_files;
256    let provider = request.provider;
257    let project_configs = request.project_configs;
258
259    let start_time = Utc::now();
260    let mut tasks_reports = Vec::new();
261    let mut pipeline_status = PipelineStatus::Success;
262    let mut task_errors: Vec<(String, cuenv_core::Error)> = Vec::new();
263    let project_display = project_path.display().to_string();
264
265    // Determine cache policy override based on context
266    let cache_policy_override = if is_fork_pr(context) {
267        Some(CachePolicy::Readonly)
268    } else {
269        None
270    };
271
272    // Create executor configuration with salt rotation support
273    let mut executor_config = CIExecutorConfig::new(project_path.to_path_buf())
274        .with_capture_output(cuenv_core::OutputCapture::Capture)
275        .with_dry_run(DryRun::No)
276        .with_secret_salt(std::env::var("CUENV_SECRET_SALT").unwrap_or_default());
277
278    // Add previous salt for rotation support
279    if let Ok(prev_salt) = std::env::var("CUENV_SECRET_SALT_PREV")
280        && !prev_salt.is_empty()
281    {
282        executor_config = executor_config.with_secret_salt_prev(prev_salt);
283    }
284
285    let _executor_config = if let Some(policy) = cache_policy_override {
286        executor_config.with_cache_policy_override(policy)
287    } else {
288        executor_config
289    };
290
291    // Register common CI secret patterns for redaction.
292    // These are typically passed via GitHub Actions secrets or similar.
293    register_ci_secrets();
294
295    // Merge static + hook-generated environment once per project, then reuse for all tasks.
296    let hook_env = build_hook_environment(project_path, config, project_configs).await?;
297
298    // Execute tasks
299    for task_name in tasks_to_run {
300        let inputs_matched =
301            matched_inputs_for_task(task_name, config, changed_files, project_path);
302        let outputs = config
303            .tasks
304            .get(task_name)
305            .and_then(|def| def.as_task())
306            .map(|task| task.outputs.clone())
307            .unwrap_or_default();
308
309        cuenv_events::emit_ci_task_executing!(&project_display, task_name);
310        let task_start = std::time::Instant::now();
311
312        // Execute the task with all dependencies (uses TaskGraph for proper ordering)
313        let result = execute_task_with_deps(
314            config,
315            task_name,
316            project_path,
317            cache_policy_override,
318            environment,
319            &hook_env,
320        )
321        .await;
322
323        let duration = u64::try_from(task_start.elapsed().as_millis()).unwrap_or(0);
324
325        let (status, exit_code, cache_key) = match result {
326            Ok(output) => {
327                if output.success {
328                    cuenv_events::emit_ci_task_result!(&project_display, task_name, true);
329                    (
330                        TaskStatus::Success,
331                        Some(output.exit_code),
332                        if output.from_cache {
333                            Some(format!("cached:{}", output.task_id))
334                        } else {
335                            Some(output.task_id)
336                        },
337                    )
338                } else {
339                    cuenv_events::emit_ci_task_result!(&project_display, task_name, false);
340                    pipeline_status = PipelineStatus::Failed;
341                    // Capture task failure with structured error
342                    task_errors.push((
343                        project_display.clone(),
344                        cuenv_core::Error::task_failed(
345                            task_name,
346                            output.exit_code,
347                            &output.stdout,
348                            &output.stderr,
349                        ),
350                    ));
351                    (TaskStatus::Failed, Some(output.exit_code), None)
352                }
353            }
354            Err(e) => {
355                tracing::error!(error = %e, task = task_name, "Task execution error");
356                cuenv_events::emit_ci_task_result!(&project_display, task_name, false);
357                pipeline_status = PipelineStatus::Failed;
358                // Capture execution error with structured error
359                task_errors.push((project_display.clone(), e.into()));
360                (TaskStatus::Failed, None, None)
361            }
362        };
363
364        tasks_reports.push(TaskReport {
365            name: task_name.clone(),
366            status,
367            duration_ms: duration,
368            exit_code,
369            cache_key,
370            inputs_matched,
371            outputs,
372        });
373    }
374
375    let completed_at = Utc::now();
376    #[allow(clippy::cast_sign_loss)]
377    let duration_ms = (completed_at - start_time).num_milliseconds() as u64;
378
379    // Generate report
380    let report = PipelineReport {
381        version: cuenv_core::VERSION.to_string(),
382        project: project_path.display().to_string(),
383        pipeline: pipeline_name.to_string(),
384        context: ContextReport {
385            provider: context.provider.clone(),
386            event: context.event.clone(),
387            ref_name: context.ref_name.clone(),
388            base_ref: context.base_ref.clone(),
389            sha: context.sha.clone(),
390            changed_files: changed_files
391                .iter()
392                .map(|p| p.to_string_lossy().to_string())
393                .collect(),
394        },
395        started_at: start_time,
396        completed_at: Some(completed_at),
397        duration_ms: Some(duration_ms),
398        status: pipeline_status,
399        tasks: tasks_reports,
400    };
401
402    // Write reports and notify provider
403    write_pipeline_report(&report, context, project_path);
404    notify_provider(provider, &report, pipeline_name).await;
405
406    Ok((pipeline_status, task_errors))
407}
408
409/// Write pipeline report to disk
410fn write_pipeline_report(
411    report: &PipelineReport,
412    context: &crate::context::CIContext,
413    project_path: &Path,
414) {
415    // Ensure report directory exists
416    let report_dir = Path::new(".cuenv/reports");
417    if let Err(e) = std::fs::create_dir_all(report_dir) {
418        tracing::warn!(error = %e, "Failed to create report directory");
419        return;
420    }
421
422    let sha_dir = report_dir.join(&context.sha);
423    let _ = std::fs::create_dir_all(&sha_dir);
424
425    let project_filename = project_path.display().to_string().replace(['/', '\\'], "-") + ".json";
426    let report_path = sha_dir.join(project_filename);
427
428    if let Err(e) = write_report(report, &report_path) {
429        tracing::warn!(error = %e, "Failed to write report");
430    } else {
431        cuenv_events::emit_ci_report!(report_path.display());
432    }
433
434    // Write GitHub Job Summary
435    if let Err(e) = crate::report::markdown::write_job_summary(report) {
436        tracing::warn!(error = %e, "Failed to write job summary");
437    }
438}
439
440/// Notify CI provider about pipeline results
441async fn notify_provider(provider: &dyn CIProvider, report: &PipelineReport, pipeline_name: &str) {
442    // Post results to CI provider
443    let check_name = format!("cuenv: {pipeline_name}");
444    match provider.create_check(&check_name).await {
445        Ok(handle) => {
446            if let Err(e) = provider.complete_check(&handle, report).await {
447                tracing::warn!(error = %e, "Failed to complete check run");
448            }
449        }
450        Err(e) => {
451            tracing::warn!(error = %e, "Failed to create check run");
452        }
453    }
454
455    // Post PR comment with report summary
456    if let Err(e) = provider.upload_report(report).await {
457        tracing::warn!(error = %e, "Failed to post PR comment");
458    }
459}
460
461/// Check if this is a fork PR (should use readonly cache)
462fn is_fork_pr(context: &crate::context::CIContext) -> bool {
463    // Fork PRs typically have a different head repo than base repo
464    // This is a simplified check - providers may need more sophisticated detection
465    context.event == "pull_request" && context.ref_name.starts_with("refs/pull/")
466}
467
468/// Register common CI secret environment variables for redaction.
469///
470/// This ensures that secrets passed via CI provider (GitHub Actions, etc.)
471/// are automatically redacted from task output.
472fn register_ci_secrets() {
473    // Common secret environment variable patterns
474    const SECRET_PATTERNS: &[&str] = &[
475        "GITHUB_TOKEN",
476        "GH_TOKEN",
477        "ACTIONS_RUNTIME_TOKEN",
478        "ACTIONS_ID_TOKEN_REQUEST_TOKEN",
479        "AWS_SECRET_ACCESS_KEY",
480        "AWS_SESSION_TOKEN",
481        "AZURE_CLIENT_SECRET",
482        "GCP_SERVICE_ACCOUNT_KEY",
483        "CACHIX_AUTH_TOKEN",
484        "CODECOV_TOKEN",
485        "CUE_REGISTRY_TOKEN",
486        "VSCE_PAT",
487        "NPM_TOKEN",
488        "CARGO_REGISTRY_TOKEN",
489        "PYPI_TOKEN",
490        "DOCKER_PASSWORD",
491        "CLOUDFLARE_API_TOKEN",
492        "OP_SERVICE_ACCOUNT_TOKEN",
493        "CUENV_SECRET_SALT",
494        "CUENV_SECRET_SALT_PREV",
495    ];
496
497    for pattern in SECRET_PATTERNS {
498        if let Ok(value) = std::env::var(pattern) {
499            cuenv_events::register_secret(value);
500        }
501    }
502}
503
/// Pick the environment name to use for a pipeline.
///
/// Precedence, first non-empty value wins:
/// 1. the environment given on the command line,
/// 2. the `CUENV_ENVIRONMENT` process environment variable,
/// 3. the environment declared on the pipeline.
///
/// Returns `None` when none of the three yields a non-empty name.
fn resolve_environment(
    cli_environment: Option<&str>,
    pipeline_environment: Option<&str>,
) -> Option<String> {
    fn non_empty(name: &&str) -> bool {
        !name.is_empty()
    }

    cli_environment
        .filter(non_empty)
        .map(str::to_string)
        .or_else(|| {
            std::env::var("CUENV_ENVIRONMENT")
                .ok()
                .filter(|name| !name.is_empty())
        })
        .or_else(|| pipeline_environment.filter(non_empty).map(str::to_string))
}
522
523/// Build the project environment by merging static env with hook-generated values.
524async fn build_hook_environment(
525    project_root: &Path,
526    config: &Project,
527    project_configs: &HashMap<PathBuf, Project>,
528) -> Result<BTreeMap<String, String>> {
529    let static_env = extract_static_env_vars(config);
530    let hooks = collect_hooks_from_ancestors(project_root, config, project_configs)?;
531
532    if hooks.is_empty() {
533        return Ok(static_env);
534    }
535
536    let config_hash = cuenv_hooks::compute_execution_hash(&hooks, project_root);
537    let instance_hash = compute_instance_hash(project_root, &config_hash);
538
539    let state_dir = if let Ok(dir) = std::env::var("CUENV_STATE_DIR") {
540        PathBuf::from(dir)
541    } else {
542        StateManager::default_state_dir()?
543    };
544    let state_manager = StateManager::new(state_dir);
545
546    let hook_config = HookExecutionConfig {
547        default_timeout_seconds: 600,
548        fail_fast: true,
549        state_dir: None,
550    };
551
552    let mut state = HookExecutionState::new(
553        project_root.to_path_buf(),
554        instance_hash,
555        config_hash,
556        hooks.clone(),
557    );
558
559    execute_hooks(
560        hooks,
561        project_root,
562        &hook_config,
563        &state_manager,
564        &mut state,
565    )
566    .await?;
567
568    match state.status {
569        ExecutionStatus::Completed | ExecutionStatus::Failed => {
570            Ok(collect_all_env_vars(config, &state.environment_vars))
571        }
572        ExecutionStatus::Running | ExecutionStatus::Cancelled => Ok(static_env),
573    }
574}
575
576/// Collect `onEnter` hooks from ancestor env.cue files (root-to-leaf order).
577fn collect_hooks_from_ancestors(
578    project_root: &Path,
579    config: &Project,
580    project_configs: &HashMap<PathBuf, Project>,
581) -> Result<Vec<cuenv_hooks::Hook>> {
582    let ancestors = find_ancestor_env_files(project_root, "cuenv")?;
583    let ancestors_len = ancestors.len();
584    let mut all_hooks = Vec::new();
585
586    for (idx, ancestor_dir) in ancestors.into_iter().enumerate() {
587        let is_current_dir = idx + 1 == ancestors_len;
588        let source_config = if is_current_dir {
589            Some(config)
590        } else {
591            project_configs.get(&ancestor_dir).or_else(|| {
592                ancestor_dir
593                    .canonicalize()
594                    .ok()
595                    .and_then(|canonical| project_configs.get(&canonical))
596            })
597        };
598
599        let Some(source_config) = source_config else {
600            continue;
601        };
602
603        let mut hooks = source_config.on_enter_hooks();
604        for hook in &mut hooks {
605            resolve_hook_dir(hook, &ancestor_dir);
606        }
607
608        // Ancestor hooks only run when propagate=true.
609        if !is_current_dir {
610            hooks.retain(|hook| hook.propagate);
611        }
612
613        all_hooks.extend(hooks);
614    }
615
616    Ok(all_hooks)
617}
618
619/// Resolve hook.dir relative to the env.cue directory where the hook is defined.
620fn resolve_hook_dir(hook: &mut cuenv_hooks::Hook, env_cue_dir: &Path) {
621    let relative_dir = hook.dir.as_deref().unwrap_or(".");
622    let absolute_dir = env_cue_dir.join(relative_dir);
623    let resolved = absolute_dir.canonicalize().unwrap_or(absolute_dir);
624    hook.dir = Some(resolved.to_string_lossy().to_string());
625}
626
627/// Extract static (non-secret) environment variables from config.
628fn extract_static_env_vars(config: &Project) -> BTreeMap<String, String> {
629    let mut env_vars = BTreeMap::new();
630    if let Some(env) = &config.env {
631        for (key, value) in &env.base {
632            if value.is_secret() {
633                continue;
634            }
635            env_vars.insert(key.clone(), value.to_string_value());
636        }
637    }
638    env_vars
639}
640
641/// Merge static config env vars with hook-generated values (hooks win).
642fn collect_all_env_vars(
643    config: &Project,
644    hook_env: &std::collections::HashMap<String, String>,
645) -> BTreeMap<String, String> {
646    let mut merged = extract_static_env_vars(config);
647    for (key, value) in hook_env {
648        merged.insert(key.clone(), value.clone());
649    }
650    merged
651}
652
653/// Execute a task with all its dependencies in correct order.
654///
655/// Uses TaskIndex to flatten nested tasks and TaskGraph to resolve dependencies,
656/// ensuring tasks run in proper topological order (same as CLI).
657async fn execute_task_with_deps(
658    config: &Project,
659    task_name: &str,
660    project_root: &Path,
661    cache_policy_override: Option<CachePolicy>,
662    environment: Option<&str>,
663    hook_env: &BTreeMap<String, String>,
664) -> std::result::Result<TaskOutput, ExecutorError> {
665    // 1. Build TaskIndex (same flattening as CLI)
666    let index =
667        TaskIndex::build(&config.tasks).map_err(|e| ExecutorError::Compilation(e.to_string()))?;
668
669    // 2. Resolve to canonical name
670    let entry = index
671        .resolve(task_name)
672        .map_err(|e| ExecutorError::Compilation(e.to_string()))?;
673    let canonical_name = entry.name.clone();
674
675    // 3. Get flattened tasks where all names are top-level
676    let flattened_tasks = index.to_tasks();
677
678    // 4. Build TaskGraph (respects dependsOn!)
679    let mut graph = TaskGraph::new();
680    graph
681        .build_for_task(&canonical_name, &flattened_tasks)
682        .map_err(|e| ExecutorError::Compilation(e.to_string()))?;
683
684    // 5. Get topological execution order
685    let execution_order = graph
686        .topological_sort()
687        .map_err(|e| ExecutorError::Compilation(e.to_string()))?;
688
689    tracing::info!(
690        task = task_name,
691        canonical = %canonical_name,
692        execution_order = ?execution_order.iter().map(|n| &n.name).collect::<Vec<_>>(),
693        "Resolved task dependencies"
694    );
695
696    // 6. Execute each task in dependency order
697    let mut final_output = None;
698    for node in execution_order {
699        let output = compile_and_execute_ir(
700            config,
701            &node.name,
702            project_root,
703            cache_policy_override,
704            environment,
705            hook_env,
706        )
707        .await?;
708
709        if !output.success {
710            return Ok(output); // Stop on first failure
711        }
712        final_output = Some(output);
713    }
714
715    final_output.ok_or_else(|| ExecutorError::Compilation("No tasks to execute".into()))
716}
717
718/// Compile a single task to IR and execute it.
719///
720/// This is the inner execution loop - it does NOT handle dependencies.
721/// Dependencies are resolved by the outer loop using TaskGraph.
722/// Uses the Compiler to convert task definitions to IR.
723async fn compile_and_execute_ir(
724    config: &Project,
725    task_name: &str,
726    project_root: &Path,
727    cache_policy_override: Option<CachePolicy>,
728    environment: Option<&str>,
729    hook_env: &BTreeMap<String, String>,
730) -> std::result::Result<TaskOutput, ExecutorError> {
731    let start = std::time::Instant::now();
732
733    // Use the Compiler to compile the task (handles both single tasks and groups)
734    let options = crate::compiler::CompilerOptions {
735        project_root: Some(project_root.to_path_buf()),
736        ..Default::default()
737    };
738    let compiler = Compiler::with_options(config.clone(), options);
739    let ir = compiler
740        .compile_task(task_name)
741        .map_err(|e| ExecutorError::Compilation(e.to_string()))?;
742
743    if ir.tasks.is_empty() {
744        return Err(ExecutorError::Compilation(format!(
745            "Task '{task_name}' produced no executable tasks"
746        )));
747    }
748
749    // Resolve secrets from project environment (same as CLI).
750    // Prefer an explicit environment name, then fall back to CUENV_ENVIRONMENT.
751    let env_name = environment
752        .filter(|name| !name.is_empty())
753        .map(str::to_string)
754        .or_else(|| {
755            std::env::var("CUENV_ENVIRONMENT")
756                .ok()
757                .filter(|name| !name.is_empty())
758        });
759    let project_env_vars = config
760        .env
761        .as_ref()
762        .map(|env| match env_name.as_deref() {
763            Some(name) => env.for_environment(name),
764            None => env.base.clone(),
765        })
766        .unwrap_or_default();
767    let (resolved_env, secrets) =
768        cuenv_core::environment::Environment::resolve_for_task_with_secrets(
769            task_name,
770            &project_env_vars,
771        )
772        .await
773        .map_err(|e| ExecutorError::Compilation(format!("Secret resolution failed: {e}")))?;
774
775    // Register resolved secrets for redaction
776    cuenv_events::register_secrets(secrets.into_iter());
777
778    // Execute all compiled IR tasks sequentially
779    let runner = IRTaskRunner::new(
780        project_root.to_path_buf(),
781        cuenv_core::OutputCapture::Capture,
782    );
783    let mut combined_stdout = String::new();
784    let mut combined_stderr = String::new();
785    let mut all_success = true;
786    let mut last_exit_code = 0;
787
788    // Ensure tools are downloaded before getting activation paths.
789    // Tool activation failures are fatal.
790    ensure_tools_downloaded(project_root).await?;
791    let activation_steps = resolve_tool_activation_steps(project_root)?;
792    if !activation_steps.is_empty() {
793        tracing::debug!(
794            steps = activation_steps.len(),
795            "Applying configured tool activation operations for CI task execution"
796        );
797    }
798
799    for ir_task in &ir.tasks {
800        // Build environment with explicit precedence:
801        // task env > resolved project env > hook/static env.
802        let mut env: BTreeMap<String, String> = hook_env.clone();
803        for (key, value) in &resolved_env {
804            env.insert(key.clone(), value.clone());
805        }
806        for (key, value) in &ir_task.env {
807            env.insert(key.clone(), value.clone());
808        }
809
810        for step in &activation_steps {
811            let current = env.get(&step.var).map(String::as_str);
812            if let Some(new_value) = apply_resolved_tool_activation(current, step) {
813                env.insert(step.var.clone(), new_value);
814            }
815        }
816
817        if !env.contains_key("HOME")
818            && let Ok(home) = std::env::var("HOME")
819        {
820            env.insert("HOME".to_string(), home);
821        }
822
823        // Apply cache policy override if specified
824        let mut task_to_run = ir_task.clone();
825        if let Some(policy) = cache_policy_override {
826            task_to_run.cache_policy = policy;
827        }
828
829        let output = runner.execute(&task_to_run, env).await?;
830
831        combined_stdout.push_str(&output.stdout);
832        combined_stderr.push_str(&output.stderr);
833        last_exit_code = output.exit_code;
834
835        if !output.success {
836            all_success = false;
837            break; // Stop on first failure
838        }
839    }
840
841    let duration = start.elapsed();
842    let duration_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
843
844    Ok(TaskOutput {
845        task_id: task_name.to_string(),
846        exit_code: last_exit_code,
847        stdout: combined_stdout,
848        stderr: combined_stderr,
849        success: all_success,
850        from_cache: false,
851        duration_ms,
852    })
853}
854
855// ============================================================================
856// Tool Activation Helpers
857// ============================================================================
858
859/// Find the lockfile starting from a directory.
860fn find_lockfile(start_dir: &Path) -> Option<PathBuf> {
861    let lockfile_path = start_dir.join(LOCKFILE_NAME);
862    if lockfile_path.exists() {
863        return Some(lockfile_path);
864    }
865
866    // Check parent directories
867    let mut current = start_dir.parent();
868    while let Some(dir) = current {
869        let lockfile_path = dir.join(LOCKFILE_NAME);
870        if lockfile_path.exists() {
871            return Some(lockfile_path);
872        }
873        current = dir.parent();
874    }
875
876    None
877}
878
879/// Create a tool registry with all available providers.
880fn create_tool_registry() -> ToolRegistry {
881    let mut registry = ToolRegistry::new();
882
883    registry.register(cuenv_tools_nix::NixToolProvider::new());
884    registry.register(cuenv_tools_github::GitHubToolProvider::new());
885    registry.register(cuenv_tools_rustup::RustupToolProvider::new());
886    registry.register(cuenv_tools_url::UrlToolProvider::new());
887
888    registry
889}
890
891/// Convert a lockfile entry to a `ToolSource`.
892fn lockfile_entry_to_source(locked: &LockedToolPlatform) -> Option<ToolSource> {
893    match locked.provider.as_str() {
894        "oci" => {
895            let image = locked
896                .source
897                .get("image")
898                .and_then(|v| v.as_str())
899                .unwrap_or_default();
900            let path = locked
901                .source
902                .get("path")
903                .and_then(|v| v.as_str())
904                .unwrap_or_default();
905            Some(ToolSource::Oci {
906                image: image.to_string(),
907                path: path.to_string(),
908            })
909        }
910        "github" => {
911            let repo = locked
912                .source
913                .get("repo")
914                .and_then(|v| v.as_str())
915                .unwrap_or_default();
916            let tag = locked
917                .source
918                .get("tag")
919                .and_then(|v| v.as_str())
920                .unwrap_or_default();
921            let asset = locked
922                .source
923                .get("asset")
924                .and_then(|v| v.as_str())
925                .unwrap_or_default();
926            let extract = parse_github_extract_list(&locked.source);
927            Some(ToolSource::GitHub {
928                repo: repo.to_string(),
929                tag: tag.to_string(),
930                asset: asset.to_string(),
931                extract,
932            })
933        }
934        "nix" => {
935            let flake = locked
936                .source
937                .get("flake")
938                .and_then(|v| v.as_str())
939                .unwrap_or_default();
940            let package = locked
941                .source
942                .get("package")
943                .and_then(|v| v.as_str())
944                .unwrap_or_default();
945            let output = locked
946                .source
947                .get("output")
948                .and_then(|v| v.as_str())
949                .map(String::from);
950            Some(ToolSource::Nix {
951                flake: flake.to_string(),
952                package: package.to_string(),
953                output,
954            })
955        }
956        "rustup" => {
957            let toolchain = locked
958                .source
959                .get("toolchain")
960                .and_then(|v| v.as_str())
961                .unwrap_or("stable");
962            let profile = locked
963                .source
964                .get("profile")
965                .and_then(|v| v.as_str())
966                .map(String::from);
967            let components: Vec<String> = locked
968                .source
969                .get("components")
970                .and_then(|v| v.as_array())
971                .map(|arr| {
972                    arr.iter()
973                        .filter_map(|v| v.as_str().map(String::from))
974                        .collect()
975                })
976                .unwrap_or_default();
977            let targets: Vec<String> = locked
978                .source
979                .get("targets")
980                .and_then(|v| v.as_array())
981                .map(|arr| {
982                    arr.iter()
983                        .filter_map(|v| v.as_str().map(String::from))
984                        .collect()
985                })
986                .unwrap_or_default();
987            Some(ToolSource::Rustup {
988                toolchain: toolchain.to_string(),
989                profile,
990                components,
991                targets,
992            })
993        }
994        "url" => {
995            let url = locked
996                .source
997                .get("url")
998                .and_then(|v| v.as_str())
999                .unwrap_or_default();
1000            let extract = parse_github_extract_list(&locked.source);
1001            Some(ToolSource::Url {
1002                url: url.to_string(),
1003                extract,
1004            })
1005        }
1006        _ => None,
1007    }
1008}
1009
1010fn parse_github_extract_list(source: &serde_json::Value) -> Vec<ToolExtract> {
1011    let mut extract = source
1012        .get("extract")
1013        .cloned()
1014        .and_then(|value| serde_json::from_value::<Vec<ToolExtract>>(value).ok())
1015        .unwrap_or_default();
1016
1017    if extract.is_empty()
1018        && let Some(path) = source.get("path").and_then(|v| v.as_str())
1019    {
1020        if path_looks_like_library(path) {
1021            extract.push(ToolExtract::Lib {
1022                path: path.to_string(),
1023                env: None,
1024            });
1025        } else {
1026            extract.push(ToolExtract::Bin {
1027                path: path.to_string(),
1028                as_name: None,
1029            });
1030        }
1031    }
1032
1033    extract
1034}
1035
/// Heuristically decide whether `path` names a shared library.
///
/// Returns `true` when the path's extension is `dylib`, `so` or `dll`
/// (compared ASCII case-insensitively), or when the lowercased path contains
/// `.so.` anywhere (as in `libfoo.so.1`).
fn path_looks_like_library(path: &str) -> bool {
    const LIBRARY_EXTENSIONS: [&str; 3] = ["dylib", "so", "dll"];

    let has_library_extension = std::path::Path::new(path)
        .extension()
        .and_then(|ext| ext.to_str())
        .is_some_and(|ext| {
            LIBRARY_EXTENSIONS
                .iter()
                .any(|candidate| ext.eq_ignore_ascii_case(candidate))
        });

    has_library_extension || path.to_ascii_lowercase().contains(".so.")
}
1045
1046/// Resolve activation steps from lockfile for CI execution.
1047fn resolve_tool_activation_steps(
1048    project_root: &Path,
1049) -> std::result::Result<Vec<ResolvedToolActivationStep>, ExecutorError> {
1050    let Some(lockfile_path) = find_lockfile(project_root) else {
1051        return Ok(Vec::new());
1052    };
1053
1054    let lockfile = match Lockfile::load(&lockfile_path) {
1055        Ok(Some(lf)) => lf,
1056        Ok(None) => return Ok(Vec::new()),
1057        Err(e) => {
1058            return Err(ExecutorError::Compilation(format!(
1059                "Failed to load lockfile: {e}"
1060            )));
1061        }
1062    };
1063
1064    let options = ToolActivationResolveOptions::new(&lockfile, &lockfile_path);
1065    resolve_tool_activation(&options).map_err(|e| {
1066        ExecutorError::Compilation(format!("Invalid tool activation configuration: {e}"))
1067    })
1068}
1069
1070/// Ensure all tools from the lockfile are downloaded for the current platform.
1071async fn ensure_tools_downloaded(project_root: &Path) -> std::result::Result<(), ExecutorError> {
1072    let Some(lockfile_path) = find_lockfile(project_root) else {
1073        tracing::debug!("No lockfile found - skipping tool download");
1074        return Ok(());
1075    };
1076
1077    let lockfile = match Lockfile::load(&lockfile_path) {
1078        Ok(Some(lf)) => lf,
1079        Ok(None) => {
1080            tracing::debug!("Empty lockfile - skipping tool download");
1081            return Ok(());
1082        }
1083        Err(e) => {
1084            return Err(ExecutorError::Compilation(format!(
1085                "Failed to load lockfile: {e}"
1086            )));
1087        }
1088    };
1089
1090    if lockfile.tools.is_empty() {
1091        tracing::debug!("No tools in lockfile - skipping download");
1092        return Ok(());
1093    }
1094
1095    let activation_options = ToolActivationResolveOptions::new(&lockfile, &lockfile_path);
1096    validate_tool_activation(&activation_options).map_err(|e| {
1097        ExecutorError::Compilation(format!("Invalid tool activation configuration: {e}"))
1098    })?;
1099
1100    let platform = Platform::current();
1101    let platform_str = platform.to_string();
1102    let options = ToolOptions::default();
1103    let registry = create_tool_registry();
1104
1105    // Check prerequisites for all providers we'll use
1106    let mut providers_used = HashSet::new();
1107    for tool in lockfile.tools.values() {
1108        if let Some(locked) = tool.platforms.get(&platform_str) {
1109            providers_used.insert(locked.provider.clone());
1110        }
1111    }
1112
1113    for provider_name in &providers_used {
1114        if let Some(provider) = registry.get(provider_name)
1115            && let Err(e) = provider.check_prerequisites().await
1116        {
1117            tracing::warn!(
1118                "Provider '{}' prerequisites check failed: {} - skipping tools from this provider",
1119                provider_name,
1120                e
1121            );
1122        }
1123    }
1124
1125    // Download tools that aren't cached
1126    let mut errors: Vec<String> = Vec::new();
1127
1128    for (name, tool) in &lockfile.tools {
1129        let Some(locked) = tool.platforms.get(&platform_str) else {
1130            continue;
1131        };
1132
1133        let Some(source) = lockfile_entry_to_source(locked) else {
1134            tracing::debug!(
1135                "Unknown provider '{}' for tool '{}' - skipping",
1136                locked.provider,
1137                name
1138            );
1139            continue;
1140        };
1141
1142        let Some(provider) = registry.find_for_source(&source) else {
1143            tracing::debug!("No provider found for tool '{}' - skipping", name);
1144            continue;
1145        };
1146
1147        let resolved = ResolvedTool {
1148            name: name.clone(),
1149            version: tool.version.clone(),
1150            platform: platform.clone(),
1151            source,
1152        };
1153
1154        // Check if already cached
1155        if provider.is_cached(&resolved, &options) {
1156            continue;
1157        }
1158
1159        // Fetch the tool
1160        tracing::info!("Downloading {} v{}...", name, tool.version);
1161        match provider.fetch(&resolved, &options).await {
1162            Ok(fetched) => {
1163                tracing::info!("Downloaded {} -> {}", name, fetched.binary_path.display());
1164            }
1165            Err(e) => {
1166                tracing::warn!("Failed to download tool '{}': {}", name, e);
1167                errors.push(format!("{}: {}", name, e));
1168            }
1169        }
1170    }
1171
1172    if !errors.is_empty() {
1173        return Err(ExecutorError::Compilation(format!(
1174            "Failed to download tools: {}",
1175            errors.join(", ")
1176        )));
1177    }
1178
1179    Ok(())
1180}