1use crate::affected::{compute_affected_tasks, matched_inputs_for_task};
7use crate::discovery::discover_projects;
8use crate::ir::CachePolicy;
9use crate::provider::CIProvider;
10use crate::report::json::write_report;
11use crate::report::{ContextReport, PipelineReport, PipelineStatus, TaskReport, TaskStatus};
12use chrono::Utc;
13use cuenv_core::Result;
14use cuenv_core::manifest::Project;
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use super::ExecutorError;
19use super::config::CIExecutorConfig;
20use super::runner::{IRTaskRunner, TaskOutput};
21
22#[allow(clippy::too_many_lines)]
36pub async fn run_ci(
37 provider: Arc<dyn CIProvider>,
38 dry_run: bool,
39 specific_pipeline: Option<String>,
40) -> Result<()> {
41 let context = provider.context();
42 cuenv_events::emit_ci_context!(&context.provider, &context.event, &context.ref_name);
43
44 let changed_files = provider.changed_files().await?;
46 cuenv_events::emit_ci_changed_files!(changed_files.len());
47
48 let projects = discover_projects()?;
50 if projects.is_empty() {
51 return Err(cuenv_core::Error::configuration(
52 "No cuenv projects found. Ensure env.cue files declare 'package cuenv'",
53 ));
54 }
55 cuenv_events::emit_ci_projects_discovered!(projects.len());
56
57 let mut project_map = std::collections::HashMap::new();
59 for project in &projects {
60 let name = project.config.name.trim();
61 if !name.is_empty() {
62 project_map.insert(name.to_string(), project.clone());
63 }
64 }
65
66 let mut any_failed = false;
68
69 for project in &projects {
71 let config = &project.config;
72
73 let pipeline_name = specific_pipeline
75 .clone()
76 .unwrap_or_else(|| "default".to_string());
77
78 let Some(ci) = &config.ci else {
80 return Err(cuenv_core::Error::configuration(format!(
81 "Project {} has no CI configuration",
82 project.path.display()
83 )));
84 };
85
86 let available_pipelines: Vec<&str> = ci.pipelines.iter().map(|p| p.name.as_str()).collect();
87 let Some(pipeline) = ci.pipelines.iter().find(|p| p.name == pipeline_name) else {
88 return Err(cuenv_core::Error::configuration(format!(
89 "Pipeline '{}' not found in project {}. Available pipelines: {}",
90 pipeline_name,
91 project.path.display(),
92 available_pipelines.join(", ")
93 )));
94 };
95
96 let project_root = project.path.parent().map_or_else(
98 || std::path::Path::new("."),
99 |p| {
100 if p.as_os_str().is_empty() {
101 std::path::Path::new(".")
102 } else {
103 p
104 }
105 },
106 );
107
108 let tasks_to_run = if context.event == "release" {
110 pipeline.tasks.clone()
111 } else {
112 compute_affected_tasks(
113 &changed_files,
114 &pipeline.tasks,
115 project_root,
116 config,
117 &project_map,
118 )
119 };
120
121 if tasks_to_run.is_empty() {
122 cuenv_events::emit_ci_project_skipped!(project.path.display(), "No affected tasks");
123 continue;
124 }
125
126 tracing::info!(
127 project = %project.path.display(),
128 tasks = ?tasks_to_run,
129 "Running tasks for project"
130 );
131
132 if !dry_run {
133 let result = execute_project_pipeline(
134 project,
135 config,
136 &pipeline.name,
137 &tasks_to_run,
138 project_root,
139 context,
140 &changed_files,
141 provider.as_ref(),
142 )
143 .await;
144
145 if let Err(e) = result {
146 tracing::error!(error = %e, "Pipeline execution error");
147 any_failed = true;
148 } else if result.is_ok_and(|status| status == PipelineStatus::Failed) {
149 any_failed = true;
150 }
151 }
152 }
153
154 if any_failed {
155 return Err(cuenv_core::Error::configuration(
156 "One or more CI tasks failed",
157 ));
158 }
159
160 Ok(())
161}
162
163#[allow(clippy::too_many_arguments)] #[allow(clippy::too_many_lines)] async fn execute_project_pipeline(
167 project: &crate::discovery::DiscoveredCIProject,
168 config: &Project,
169 pipeline_name: &str,
170 tasks_to_run: &[String],
171 project_root: &std::path::Path,
172 context: &crate::context::CIContext,
173 changed_files: &[std::path::PathBuf],
174 provider: &dyn CIProvider,
175) -> Result<PipelineStatus> {
176 let start_time = Utc::now();
177 let mut tasks_reports = Vec::new();
178 let mut pipeline_status = PipelineStatus::Success;
179
180 let cache_policy_override = if is_fork_pr(context) {
182 Some(CachePolicy::Readonly)
183 } else {
184 None
185 };
186
187 let mut executor_config = CIExecutorConfig::new(project_root.to_path_buf())
189 .with_capture_output(true)
190 .with_dry_run(false)
191 .with_secret_salt(std::env::var("CUENV_SECRET_SALT").unwrap_or_default());
192
193 if let Ok(prev_salt) = std::env::var("CUENV_SECRET_SALT_PREV")
195 && !prev_salt.is_empty()
196 {
197 executor_config = executor_config.with_secret_salt_prev(prev_salt);
198 }
199
200 let _executor_config = if let Some(policy) = cache_policy_override {
201 executor_config.with_cache_policy_override(policy)
202 } else {
203 executor_config
204 };
205
206 for task_name in tasks_to_run {
208 let inputs_matched =
209 matched_inputs_for_task(task_name, config, changed_files, project_root);
210 let outputs = config
211 .tasks
212 .get(task_name)
213 .and_then(|def| def.as_single())
214 .map(|task| task.outputs.clone())
215 .unwrap_or_default();
216
217 let project_display = project.path.display().to_string();
218 cuenv_events::emit_ci_task_executing!(&project_display, task_name);
219 let task_start = std::time::Instant::now();
220
221 let result =
223 execute_single_task_by_name(config, task_name, project_root, cache_policy_override)
224 .await;
225
226 let duration = u64::try_from(task_start.elapsed().as_millis()).unwrap_or(0);
227
228 let (status, exit_code, cache_key) = match result {
229 Ok(output) => {
230 if output.success {
231 cuenv_events::emit_ci_task_result!(&project_display, task_name, true);
232 (
233 TaskStatus::Success,
234 Some(output.exit_code),
235 if output.from_cache {
236 Some(format!("cached:{}", output.task_id))
237 } else {
238 None
239 },
240 )
241 } else {
242 cuenv_events::emit_ci_task_result!(&project_display, task_name, false);
243 pipeline_status = PipelineStatus::Failed;
244 (TaskStatus::Failed, Some(output.exit_code), None)
245 }
246 }
247 Err(e) => {
248 cuenv_events::emit_ci_task_result!(
249 &project_display,
250 task_name,
251 false,
252 e.to_string()
253 );
254 pipeline_status = PipelineStatus::Failed;
255 (TaskStatus::Failed, Some(1), None)
256 }
257 };
258
259 tasks_reports.push(TaskReport {
260 name: task_name.clone(),
261 status,
262 duration_ms: duration,
263 exit_code,
264 inputs_matched,
265 cache_key,
266 outputs,
267 });
268 }
269
270 let completed_at = Utc::now();
271 #[allow(clippy::cast_sign_loss)]
272 let duration_ms = (completed_at - start_time).num_milliseconds() as u64;
273
274 let report = PipelineReport {
276 version: cuenv_core::VERSION.to_string(),
277 project: project.path.display().to_string(),
278 pipeline: pipeline_name.to_string(),
279 context: ContextReport {
280 provider: context.provider.clone(),
281 event: context.event.clone(),
282 ref_name: context.ref_name.clone(),
283 base_ref: context.base_ref.clone(),
284 sha: context.sha.clone(),
285 changed_files: changed_files
286 .iter()
287 .map(|p| p.to_string_lossy().to_string())
288 .collect(),
289 },
290 started_at: start_time,
291 completed_at: Some(completed_at),
292 duration_ms: Some(duration_ms),
293 status: pipeline_status,
294 tasks: tasks_reports,
295 };
296
297 write_pipeline_report(&report, context, project);
299 notify_provider(provider, &report, pipeline_name).await;
300
301 Ok(pipeline_status)
302}
303
304fn write_pipeline_report(
306 report: &PipelineReport,
307 context: &crate::context::CIContext,
308 project: &crate::discovery::DiscoveredCIProject,
309) {
310 let report_dir = std::path::Path::new(".cuenv/reports");
312 if let Err(e) = std::fs::create_dir_all(report_dir) {
313 tracing::warn!(error = %e, "Failed to create report directory");
314 return;
315 }
316
317 let sha_dir = report_dir.join(&context.sha);
318 let _ = std::fs::create_dir_all(&sha_dir);
319
320 let project_filename = project.path.display().to_string().replace(['/', '\\'], "-") + ".json";
321 let report_path = sha_dir.join(project_filename);
322
323 if let Err(e) = write_report(report, &report_path) {
324 tracing::warn!(error = %e, "Failed to write report");
325 } else {
326 cuenv_events::emit_ci_report!(report_path.display());
327 }
328
329 if let Err(e) = crate::report::markdown::write_job_summary(report) {
331 tracing::warn!(error = %e, "Failed to write job summary");
332 }
333}
334
335async fn notify_provider(provider: &dyn CIProvider, report: &PipelineReport, pipeline_name: &str) {
337 let check_name = format!("cuenv: {pipeline_name}");
339 match provider.create_check(&check_name).await {
340 Ok(handle) => {
341 if let Err(e) = provider.complete_check(&handle, report).await {
342 tracing::warn!(error = %e, "Failed to complete check run");
343 }
344 }
345 Err(e) => {
346 tracing::warn!(error = %e, "Failed to create check run");
347 }
348 }
349
350 if let Err(e) = provider.upload_report(report).await {
352 tracing::warn!(error = %e, "Failed to post PR comment");
353 }
354}
355
356fn is_fork_pr(context: &crate::context::CIContext) -> bool {
358 context.event == "pull_request" && context.ref_name.starts_with("refs/pull/")
361}
362
363async fn execute_single_task_by_name(
368 config: &Project,
369 task_name: &str,
370 project_root: &std::path::Path,
371 cache_policy_override: Option<CachePolicy>,
372) -> std::result::Result<TaskOutput, ExecutorError> {
373 let Some(task_def) = config.tasks.get(task_name) else {
375 return Err(ExecutorError::Compilation(format!(
376 "Task '{task_name}' not found in project config"
377 )));
378 };
379
380 let Some(task) = task_def.as_single() else {
381 return Err(ExecutorError::Compilation(format!(
382 "Task '{task_name}' is a group, not a single task"
383 )));
384 };
385
386 let ir_task = crate::ir::Task {
388 id: task_name.to_string(),
389 runtime: None, command: if task.command.is_empty() {
391 vec![task.script.clone().unwrap_or_default()]
393 } else {
394 vec![task.command.clone()]
395 },
396 shell: task.script.is_some() || !task.command.is_empty(),
397 env: task
398 .env
399 .iter()
400 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
401 .collect(),
402 secrets: HashMap::new(), resources: None,
404 concurrency_group: None,
405 inputs: task
406 .inputs
407 .iter()
408 .filter_map(|i| i.as_path())
409 .cloned()
410 .collect(),
411 outputs: vec![],
412 depends_on: task.depends_on.clone(),
413 cache_policy: cache_policy_override.unwrap_or(CachePolicy::Normal),
414 deployment: false,
415 manual_approval: false,
416 };
417
418 let mut env: HashMap<String, String> = task
420 .env
421 .iter()
422 .filter_map(|(k, v)| v.as_str().map(|s| (k.clone(), s.to_string())))
423 .collect();
424
425 if let Ok(path) = std::env::var("PATH") {
427 env.insert("PATH".to_string(), path);
428 }
429 if let Ok(home) = std::env::var("HOME") {
430 env.insert("HOME".to_string(), home);
431 }
432
433 let runner = IRTaskRunner::new(project_root.to_path_buf(), true);
435 let output = runner.execute(&ir_task, env).await?;
436
437 Ok(output)
438}