cuenv_ci/executor/
orchestrator.rs

1//! CI Pipeline Orchestrator
2//!
3//! Main entry point for CI pipeline execution, integrating with the provider
4//! system for context detection, file change tracking, and reporting.
5
6use crate::affected::{compute_affected_tasks, matched_inputs_for_task};
7use crate::discovery::discover_projects;
8use crate::ir::CachePolicy;
9use crate::provider::CIProvider;
10use crate::report::json::write_report;
11use crate::report::{ContextReport, PipelineReport, PipelineStatus, TaskReport, TaskStatus};
12use chrono::Utc;
13use cuenv_core::Result;
14use cuenv_core::manifest::Project;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use super::ExecutorError;
19use super::config::CIExecutorConfig;
20use super::runner::{IRTaskRunner, TaskOutput};
21
22/// Run the CI pipeline logic
23///
24/// This is the main entry point for CI execution, integrating with the provider
25/// system for context detection, file change tracking, and reporting.
26///
27/// # Arguments
28///
29/// * `provider` - The CI provider to use for changed files detection and reporting
30/// * `dry_run` - If true, don't actually run tasks
31/// * `specific_pipeline` - If set, only run tasks from this pipeline
32///
33/// # Errors
34/// Returns error if IO errors occur or tasks fail
35#[allow(clippy::too_many_lines)]
36pub async fn run_ci(
37    provider: Arc<dyn CIProvider>,
38    dry_run: bool,
39    specific_pipeline: Option<String>,
40) -> Result<()> {
41    let context = provider.context();
42    cuenv_events::emit_ci_context!(&context.provider, &context.event, &context.ref_name);
43
44    // Get changed files
45    let changed_files = provider.changed_files().await?;
46    cuenv_events::emit_ci_changed_files!(changed_files.len());
47
48    // Discover projects
49    let projects = discover_projects()?;
50    if projects.is_empty() {
51        return Err(cuenv_core::Error::configuration(
52            "No cuenv projects found. Ensure env.cue files declare 'package cuenv'",
53        ));
54    }
55    cuenv_events::emit_ci_projects_discovered!(projects.len());
56
57    // Build project map for cross-project dependency resolution
58    let mut project_map = std::collections::HashMap::new();
59    for project in &projects {
60        let name = project.config.name.trim();
61        if !name.is_empty() {
62            project_map.insert(name.to_string(), project.clone());
63        }
64    }
65
66    // Track if any project failed
67    let mut any_failed = false;
68
69    // Process each project
70    for project in &projects {
71        let config = &project.config;
72
73        // Determine pipeline to run
74        let pipeline_name = specific_pipeline
75            .clone()
76            .unwrap_or_else(|| "default".to_string());
77
78        // Find pipeline in config
79        let Some(ci) = &config.ci else {
80            return Err(cuenv_core::Error::configuration(format!(
81                "Project {} has no CI configuration",
82                project.path.display()
83            )));
84        };
85
86        let available_pipelines: Vec<&str> = ci.pipelines.iter().map(|p| p.name.as_str()).collect();
87        let Some(pipeline) = ci.pipelines.iter().find(|p| p.name == pipeline_name) else {
88            return Err(cuenv_core::Error::configuration(format!(
89                "Pipeline '{}' not found in project {}. Available pipelines: {}",
90                pipeline_name,
91                project.path.display(),
92                available_pipelines.join(", ")
93            )));
94        };
95
96        // Get the directory containing the env.cue file
97        let project_root = project.path.parent().map_or_else(
98            || std::path::Path::new("."),
99            |p| {
100                if p.as_os_str().is_empty() {
101                    std::path::Path::new(".")
102                } else {
103                    p
104                }
105            },
106        );
107
108        // For release events, run all tasks unconditionally (no affected-file filtering)
109        let tasks_to_run = if context.event == "release" {
110            pipeline.tasks.clone()
111        } else {
112            compute_affected_tasks(
113                &changed_files,
114                &pipeline.tasks,
115                project_root,
116                config,
117                &project_map,
118            )
119        };
120
121        if tasks_to_run.is_empty() {
122            cuenv_events::emit_ci_project_skipped!(project.path.display(), "No affected tasks");
123            continue;
124        }
125
126        tracing::info!(
127            project = %project.path.display(),
128            tasks = ?tasks_to_run,
129            "Running tasks for project"
130        );
131
132        if !dry_run {
133            let result = execute_project_pipeline(
134                project,
135                config,
136                &pipeline.name,
137                &tasks_to_run,
138                project_root,
139                context,
140                &changed_files,
141                provider.as_ref(),
142            )
143            .await;
144
145            if let Err(e) = result {
146                tracing::error!(error = %e, "Pipeline execution error");
147                any_failed = true;
148            } else if result.is_ok_and(|status| status == PipelineStatus::Failed) {
149                any_failed = true;
150            }
151        }
152    }
153
154    if any_failed {
155        return Err(cuenv_core::Error::configuration(
156            "One or more CI tasks failed",
157        ));
158    }
159
160    Ok(())
161}
162
163/// Execute a project's pipeline and handle reporting
164#[allow(clippy::too_many_arguments)] // Pipeline execution requires many context params
165#[allow(clippy::too_many_lines)] // Complex orchestration logic
166async fn execute_project_pipeline(
167    project: &crate::discovery::DiscoveredCIProject,
168    config: &Project,
169    pipeline_name: &str,
170    tasks_to_run: &[String],
171    project_root: &std::path::Path,
172    context: &crate::context::CIContext,
173    changed_files: &[std::path::PathBuf],
174    provider: &dyn CIProvider,
175) -> Result<PipelineStatus> {
176    let start_time = Utc::now();
177    let mut tasks_reports = Vec::new();
178    let mut pipeline_status = PipelineStatus::Success;
179
180    // Determine cache policy override based on context
181    let cache_policy_override = if is_fork_pr(context) {
182        Some(CachePolicy::Readonly)
183    } else {
184        None
185    };
186
187    // Create executor configuration with salt rotation support
188    let mut executor_config = CIExecutorConfig::new(project_root.to_path_buf())
189        .with_capture_output(true)
190        .with_dry_run(false)
191        .with_secret_salt(std::env::var("CUENV_SECRET_SALT").unwrap_or_default());
192
193    // Add previous salt for rotation support
194    if let Ok(prev_salt) = std::env::var("CUENV_SECRET_SALT_PREV")
195        && !prev_salt.is_empty()
196    {
197        executor_config = executor_config.with_secret_salt_prev(prev_salt);
198    }
199
200    let _executor_config = if let Some(policy) = cache_policy_override {
201        executor_config.with_cache_policy_override(policy)
202    } else {
203        executor_config
204    };
205
206    // Execute tasks
207    for task_name in tasks_to_run {
208        let inputs_matched =
209            matched_inputs_for_task(task_name, config, changed_files, project_root);
210        let outputs = config
211            .tasks
212            .get(task_name)
213            .and_then(|def| def.as_single())
214            .map(|task| task.outputs.clone())
215            .unwrap_or_default();
216
217        let project_display = project.path.display().to_string();
218        cuenv_events::emit_ci_task_executing!(&project_display, task_name);
219        let task_start = std::time::Instant::now();
220
221        // Execute the task using the runner
222        let result =
223            execute_single_task_by_name(config, task_name, project_root, cache_policy_override)
224                .await;
225
226        let duration = u64::try_from(task_start.elapsed().as_millis()).unwrap_or(0);
227
228        let (status, exit_code, cache_key) = match result {
229            Ok(output) => {
230                if output.success {
231                    cuenv_events::emit_ci_task_result!(&project_display, task_name, true);
232                    (
233                        TaskStatus::Success,
234                        Some(output.exit_code),
235                        if output.from_cache {
236                            Some(format!("cached:{}", output.task_id))
237                        } else {
238                            None
239                        },
240                    )
241                } else {
242                    cuenv_events::emit_ci_task_result!(&project_display, task_name, false);
243                    pipeline_status = PipelineStatus::Failed;
244                    (TaskStatus::Failed, Some(output.exit_code), None)
245                }
246            }
247            Err(e) => {
248                cuenv_events::emit_ci_task_result!(
249                    &project_display,
250                    task_name,
251                    false,
252                    e.to_string()
253                );
254                pipeline_status = PipelineStatus::Failed;
255                (TaskStatus::Failed, Some(1), None)
256            }
257        };
258
259        tasks_reports.push(TaskReport {
260            name: task_name.clone(),
261            status,
262            duration_ms: duration,
263            exit_code,
264            inputs_matched,
265            cache_key,
266            outputs,
267        });
268    }
269
270    let completed_at = Utc::now();
271    #[allow(clippy::cast_sign_loss)]
272    let duration_ms = (completed_at - start_time).num_milliseconds() as u64;
273
274    // Generate report
275    let report = PipelineReport {
276        version: cuenv_core::VERSION.to_string(),
277        project: project.path.display().to_string(),
278        pipeline: pipeline_name.to_string(),
279        context: ContextReport {
280            provider: context.provider.clone(),
281            event: context.event.clone(),
282            ref_name: context.ref_name.clone(),
283            base_ref: context.base_ref.clone(),
284            sha: context.sha.clone(),
285            changed_files: changed_files
286                .iter()
287                .map(|p| p.to_string_lossy().to_string())
288                .collect(),
289        },
290        started_at: start_time,
291        completed_at: Some(completed_at),
292        duration_ms: Some(duration_ms),
293        status: pipeline_status,
294        tasks: tasks_reports,
295    };
296
297    // Write reports and notify provider
298    write_pipeline_report(&report, context, project);
299    notify_provider(provider, &report, pipeline_name).await;
300
301    Ok(pipeline_status)
302}
303
304/// Write pipeline report to disk
305fn write_pipeline_report(
306    report: &PipelineReport,
307    context: &crate::context::CIContext,
308    project: &crate::discovery::DiscoveredCIProject,
309) {
310    // Ensure report directory exists
311    let report_dir = std::path::Path::new(".cuenv/reports");
312    if let Err(e) = std::fs::create_dir_all(report_dir) {
313        tracing::warn!(error = %e, "Failed to create report directory");
314        return;
315    }
316
317    let sha_dir = report_dir.join(&context.sha);
318    let _ = std::fs::create_dir_all(&sha_dir);
319
320    let project_filename = project.path.display().to_string().replace(['/', '\\'], "-") + ".json";
321    let report_path = sha_dir.join(project_filename);
322
323    if let Err(e) = write_report(report, &report_path) {
324        tracing::warn!(error = %e, "Failed to write report");
325    } else {
326        cuenv_events::emit_ci_report!(report_path.display());
327    }
328
329    // Write GitHub Job Summary
330    if let Err(e) = crate::report::markdown::write_job_summary(report) {
331        tracing::warn!(error = %e, "Failed to write job summary");
332    }
333}
334
335/// Notify CI provider about pipeline results
336async fn notify_provider(provider: &dyn CIProvider, report: &PipelineReport, pipeline_name: &str) {
337    // Post results to CI provider
338    let check_name = format!("cuenv: {pipeline_name}");
339    match provider.create_check(&check_name).await {
340        Ok(handle) => {
341            if let Err(e) = provider.complete_check(&handle, report).await {
342                tracing::warn!(error = %e, "Failed to complete check run");
343            }
344        }
345        Err(e) => {
346            tracing::warn!(error = %e, "Failed to create check run");
347        }
348    }
349
350    // Post PR comment with report summary
351    if let Err(e) = provider.upload_report(report).await {
352        tracing::warn!(error = %e, "Failed to post PR comment");
353    }
354}
355
356/// Check if this is a fork PR (should use readonly cache)
357fn is_fork_pr(context: &crate::context::CIContext) -> bool {
358    // Fork PRs typically have a different head repo than base repo
359    // This is a simplified check - providers may need more sophisticated detection
360    context.event == "pull_request" && context.ref_name.starts_with("refs/pull/")
361}
362
363/// Execute a single task by name using the existing project config
364///
365/// This bridges the gap between the task name-based execution in `run_ci`
366/// and the IR-based execution in `CIExecutor`.
367async fn execute_single_task_by_name(
368    config: &Project,
369    task_name: &str,
370    project_root: &std::path::Path,
371    cache_policy_override: Option<CachePolicy>,
372) -> std::result::Result<TaskOutput, ExecutorError> {
373    // Get task definition
374    let Some(task_def) = config.tasks.get(task_name) else {
375        return Err(ExecutorError::Compilation(format!(
376            "Task '{task_name}' not found in project config"
377        )));
378    };
379
380    let Some(task) = task_def.as_single() else {
381        return Err(ExecutorError::Compilation(format!(
382            "Task '{task_name}' is a group, not a single task"
383        )));
384    };
385
386    // Build a minimal IR task for execution
387    let ir_task = crate::ir::Task {
388        id: task_name.to_string(),
389        runtime: None, // TODO: Support runtime from task config
390        command: if task.command.is_empty() {
391            // If no command, use script
392            vec![task.script.clone().unwrap_or_default()]
393        } else {
394            vec![task.command.clone()]
395        },
396        shell: task.script.is_some() || !task.command.is_empty(),
397        env: task
398            .env
399            .iter()
400            .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
401            .collect(),
402        secrets: HashMap::new(), // Secrets handled separately
403        resources: None,
404        concurrency_group: None,
405        inputs: task
406            .inputs
407            .iter()
408            .filter_map(|i| i.as_path())
409            .cloned()
410            .collect(),
411        outputs: vec![],
412        depends_on: task.depends_on.clone(),
413        cache_policy: cache_policy_override.unwrap_or(CachePolicy::Normal),
414        deployment: false,
415        manual_approval: false,
416    };
417
418    // Build environment
419    let mut env: HashMap<String, String> = task
420        .env
421        .iter()
422        .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
423        .collect();
424
425    // Add PATH and HOME
426    if let Ok(path) = std::env::var("PATH") {
427        env.insert("PATH".to_string(), path);
428    }
429    if let Ok(home) = std::env::var("HOME") {
430        env.insert("HOME".to_string(), home);
431    }
432
433    // Execute using runner directly
434    let runner = IRTaskRunner::new(project_root.to_path_buf(), true);
435    let output = runner.execute(&ir_task, env).await?;
436
437    Ok(output)
438}