1use super::core::ExecutorCore;
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::pipeline::{JobEvent, JobFailureKind, JobRunInfo};
4use crate::runner::ExecuteContext;
5use crate::ui::{UiBridge, UiJobStatus};
6use anyhow::{Result, anyhow};
7use std::fs;
8use std::path::Path;
9use std::sync::Arc;
10use std::time::Instant;
11
/// Runs one planned job end to end and reports the outcome as a [`JobEvent`].
///
/// The job is prepared and executed through `exec`. Afterwards its runtime objects are
/// summarised and recorded, cleaned up unless the settings ask to preserve them, and the
/// job's artifacts are collected. When a UI bridge is supplied, the final status is
/// pushed to it.
///
/// A cancellation reported by `exec.take_cancelled_job` overrides whatever the run
/// itself returned: the event then carries a "job cancelled by user" error, no exit
/// code and no failure kind.
pub(crate) fn run_planned_job(
    exec: &ExecutorCore,
    plan: Arc<ExecutionPlan>,
    planned: ExecutableJob,
    run_info: JobRunInfo,
    ui: Option<Arc<UiBridge>>,
) -> JobEvent {
    let ExecutableJob {
        instance,
        log_path,
        log_hash,
    } = planned;
    let job = instance.job;
    let stage_name = instance.stage_name;
    let job_name = job.name.clone();
    // The clock starts before preparation, so the reported duration covers preparation,
    // execution, cleanup and artifact collection.
    let job_start = Instant::now();
    let ui_ref = ui.as_deref();

    // Immediately-invoked closure: `?` inside it short-circuits the run without
    // returning from `run_planned_job`, so the reporting below always happens.
    let result = (|| -> Result<()> {
        let mut prepared = exec.prepare_job_run(plan.as_ref(), &job)?;
        let container_name = run_info.container_name.clone();
        // The execution outcome is held rather than propagated here, so the bookkeeping,
        // cleanup and artifact collection below still run when the command fails.
        let exec_result = exec.execute(ExecuteContext {
            host_workdir: &prepared.host_workdir,
            script_path: &prepared.script_path,
            log_path: &log_path,
            mounts: &prepared.mounts,
            image: &prepared.job_image.name,
            image_platform: prepared.job_image.docker_platform.as_deref(),
            image_user: prepared.job_image.docker_user.as_deref(),
            image_entrypoint: &prepared.job_image.entrypoint,
            container_name: &container_name,
            job: &job,
            ui: ui_ref,
            env_vars: &prepared.env_vars,
            network: prepared
                .service_runtime
                .as_ref()
                .map(|runtime| runtime.network_name()),
            preserve_runtime_objects: exec.config.settings.preserve_runtime_objects(),
            arch: prepared.arch.as_deref(),
            privileged: prepared.privileged,
            cap_add: &prepared.cap_add,
            cap_drop: &prepared.cap_drop,
        });
        // Owned copies of the service network and container names, taken before the
        // service runtime is moved out of `prepared` further down.
        let service_network = prepared
            .service_runtime
            .as_ref()
            .map(|runtime| runtime.network_name().to_string());
        let service_containers = prepared
            .service_runtime
            .as_ref()
            .map(|runtime| runtime.container_names().to_vec())
            .unwrap_or_default();
        // NOTE(review): a failure here returns before the cleanup calls below are
        // reached and discards `exec_result`; whether dropping `prepared` cleans up on
        // its own is not visible from this function — confirm.
        let runtime_summary_path = exec.write_runtime_summary(
            &job.name,
            &container_name,
            prepared
                .service_runtime
                .as_ref()
                .map(|runtime| runtime.network_name()),
            &service_containers,
        )?;
        exec.record_runtime_objects(
            &job.name,
            container_name.clone(),
            service_network,
            service_containers,
            runtime_summary_path,
        );
        if !exec.config.settings.preserve_runtime_objects() {
            exec.cleanup_finished_container(&container_name);
        }
        // The service runtime is taken out of `prepared` either way; `cleanup` is only
        // called when runtime objects are not being preserved.
        if let Some(mut runtime) = prepared.service_runtime.take()
            && !exec.config.settings.preserve_runtime_objects()
        {
            runtime.cleanup();
        }
        // Artifacts are collected before the execution result is propagated, so they are
        // gathered for failed commands too.
        // NOTE(review): an error from any of these collectors is returned instead of the
        // execution error — confirm that precedence is intended.
        exec.collect_declared_artifacts(&job, &prepared.host_workdir)?;
        exec.collect_untracked_artifacts(&job, &prepared.host_workdir)?;
        exec.collect_dotenv_artifacts(&job, &prepared.host_workdir)?;
        exec_result?;
        // Only reached when every step above succeeded.
        exec.print_job_completion(
            &stage_name,
            &prepared.script_path,
            &log_path,
            job_start.elapsed().as_secs_f32(),
        );
        Ok(())
    })();

    let duration = job_start.elapsed().as_secs_f32();
    let cancelled = exec.take_cancelled_job(&job_name);
    // Cancellation replaces the run's own result; otherwise a failure may be enriched
    // with a hint derived from the job log.
    let final_result = completion_result(result, cancelled, &log_path);
    if let Some(ui) = ui_ref {
        match &final_result {
            Ok(_) => ui.job_finished(&job_name, UiJobStatus::Success, duration, None),
            Err(err) => {
                if cancelled {
                    // Cancelled jobs are shown as skipped rather than failed.
                    ui.job_finished(
                        &job_name,
                        UiJobStatus::Skipped,
                        duration,
                        Some("aborted by user".to_string()),
                    );
                } else {
                    ui.job_finished(
                        &job_name,
                        UiJobStatus::Failed,
                        duration,
                        Some(err.to_string()),
                    );
                }
            }
        }
    }

    exec.clear_running_container(&job_name);

    // Both values are derived from the final error message and are `None` for
    // successful or cancelled jobs.
    let exit_code = extract_exit_code(&final_result, cancelled);
    let failure_kind = classify_failure(&job_name, &final_result, cancelled);

    JobEvent {
        name: job_name,
        stage_name,
        duration,
        log_path: Some(log_path.clone()),
        log_hash,
        result: final_result,
        failure_kind,
        exit_code,
        cancelled,
    }
}
147
148fn completion_result(result: Result<()>, cancelled: bool, log_path: &Path) -> Result<()> {
149 if cancelled {
150 Err(anyhow!("job cancelled by user"))
151 } else {
152 enrich_failure_with_log_hint(result, log_path)
153 }
154}
155
156fn enrich_failure_with_log_hint(result: Result<()>, log_path: &Path) -> Result<()> {
157 let err = match result {
158 Ok(()) => return Ok(()),
159 Err(err) => err,
160 };
161 let message = err.to_string();
162 if !message.contains("container command exited with status") {
163 return Err(err);
164 }
165 let Some(hint) = failure_hint_from_log(log_path) else {
166 return Err(err);
167 };
168 Err(anyhow!("{message}; hint: {hint}"))
169}
170
171fn failure_hint_from_log(log_path: &Path) -> Option<&'static str> {
172 let tail = read_log_tail(log_path, 32 * 1024)?;
173 if tail.contains("error: rustup could not choose a version of rustc to run")
174 && tail.contains("no default is configured")
175 {
176 return Some(
177 "the job log shows an empty rustup home. A workspace-local `RUSTUP_HOME` can mask the Rust toolchain bundled in the image, especially after a cold branch/tag cache key. Prefer leaving `RUSTUP_HOME` unset in Rust images, or bootstrap the toolchain explicitly before running `rustc`/`cargo`",
178 );
179 }
180 None
181}
182
/// Returns at most the last `max_bytes` bytes of the file at `log_path`, decoded
/// lossily as UTF-8.
///
/// Only the tail is read from disk: the file is opened, positioned `max_bytes` before
/// its end (or at the start when it is shorter) and read from there, so a large job log
/// is never loaded into memory in full. A window that starts in the middle of a
/// multi-byte character yields U+FFFD replacement characters instead of an error.
///
/// Returns `None` when the file cannot be opened, inspected, positioned or read.
fn read_log_tail(log_path: &Path, max_bytes: usize) -> Option<String> {
    use std::io::{Read, Seek, SeekFrom};

    // `usize` is never wider than 64 bits, so this widening cast cannot truncate.
    let window = max_bytes as u64;
    let mut file = fs::File::open(log_path).ok()?;
    let len = file.metadata().ok()?.len();
    file.seek(SeekFrom::Start(len.saturating_sub(window))).ok()?;

    let mut tail = Vec::new();
    // `take` keeps the result bounded even if the log grows while it is being read.
    file.take(window).read_to_end(&mut tail).ok()?;
    Some(String::from_utf8_lossy(&tail).into_owned())
}
188
189fn extract_exit_code(result: &Result<()>, cancelled: bool) -> Option<i32> {
190 if cancelled {
191 return None;
192 }
193 let message = result.as_ref().err()?.to_string();
194 let marker = "container command exited with status Some(";
195 let start = message.find(marker)? + marker.len();
196 let rest = &message[start..];
197 let end = rest.find(')')?;
198 rest[..end].parse::<i32>().ok()
199}
200
201fn classify_failure(
202 job_name: &str,
203 result: &Result<()>,
204 cancelled: bool,
205) -> Option<JobFailureKind> {
206 if cancelled {
207 return None;
208 }
209 let err = result.as_ref().err()?;
210 let message = err.to_string();
211 let normalized = message.to_ascii_lowercase();
212 if normalized.contains("job exceeded timeout") {
213 return Some(JobFailureKind::JobExecutionTimeout);
214 }
215 if is_api_failure(&normalized) {
216 return Some(JobFailureKind::ApiFailure);
217 }
218 if is_runner_unsupported(&normalized) {
219 return Some(JobFailureKind::RunnerUnsupported);
220 }
221 if is_stale_schedule_failure(&normalized) {
222 return Some(JobFailureKind::StaleSchedule);
223 }
224 if is_archived_failure(&normalized) {
225 return Some(JobFailureKind::ArchivedFailure);
226 }
227 if is_unmet_prerequisites(&normalized) {
228 return Some(JobFailureKind::UnmetPrerequisites);
229 }
230 if is_scheduler_failure(&normalized) {
231 return Some(JobFailureKind::SchedulerFailure);
232 }
233 if is_runner_system_failure(&normalized) {
234 return Some(JobFailureKind::RunnerSystemFailure);
235 }
236 if is_stuck_or_timeout_failure(&normalized) {
237 return Some(JobFailureKind::StuckOrTimeoutFailure);
238 }
239 if is_data_integrity_failure(&normalized) {
240 return Some(JobFailureKind::DataIntegrityFailure);
241 }
242 if is_script_failure(&normalized) {
243 return Some(JobFailureKind::ScriptFailure);
244 }
245 let _ = job_name;
246 Some(JobFailureKind::UnknownFailure)
247}
248
/// Reports whether `message` contains one of the artifact-download failure phrases.
///
/// Matching is case-sensitive against lowercase phrases, so `message` is expected to
/// be lowercased already.
fn is_api_failure(message: &str) -> bool {
    const PHRASES: [&str; 3] = [
        "failed to invoke curl to download artifacts",
        "curl failed to download artifacts",
        "failed to download artifacts for",
    ];
    PHRASES.iter().any(|&phrase| message.contains(phrase))
}
254
/// Reports whether `message` contains the "services are only supported when using"
/// phrase.
///
/// Matching is case-sensitive, so `message` is expected to be lowercased already.
fn is_runner_unsupported(message: &str) -> bool {
    const PHRASE: &str = "services are only supported when using";
    message.contains(PHRASE)
}
258
/// Reports whether `message` contains the phrase "stale schedule".
///
/// Matching is case-sensitive, so `message` is expected to be lowercased already.
fn is_stale_schedule_failure(message: &str) -> bool {
    const PHRASE: &str = "stale schedule";
    message.contains(PHRASE)
}
262
/// Reports whether `message` contains one of the "archived" failure phrases.
///
/// Matching is case-sensitive against lowercase phrases, so `message` is expected to
/// be lowercased already.
fn is_archived_failure(message: &str) -> bool {
    const PHRASES: [&str; 3] = [
        "archived failure",
        "project is archived",
        "repository is archived",
    ];
    PHRASES.iter().any(|&phrase| message.contains(phrase))
}
268
/// Reports whether `message` contains one of the phrases that indicate a job could not
/// start because something it needs is missing.
///
/// The phrases cover a missing image, missing upstream artifacts, an upstream job that
/// did not run, a missing GitLab token and an unknown dependency. Matching is
/// case-sensitive against lowercase phrases, so `message` is expected to be lowercased
/// already.
fn is_unmet_prerequisites(message: &str) -> bool {
    const PHRASES: [&str; 6] = [
        "has no image",
        "requires artifacts from project",
        "requires artifacts from '",
        "but it did not run",
        "no gitlab token is configured",
        "depends on unknown job",
    ];
    PHRASES.iter().any(|&phrase| message.contains(phrase))
}
277
/// Reports whether `message` contains the phrase "failed to acquire job slot".
///
/// Matching is case-sensitive, so `message` is expected to be lowercased already.
fn is_scheduler_failure(message: &str) -> bool {
    const PHRASE: &str = "failed to acquire job slot";
    message.contains(PHRASE)
}
281
/// Reports whether `message` contains one of the phrases that point at the runner
/// infrastructure rather than the job script.
///
/// The phrases cover service start-up and readiness, network creation, a panicked job
/// task, failures to run the container CLI, missing container output streams, log
/// creation and artifact extraction tooling. Matching is case-sensitive against
/// lowercase phrases, so `message` is expected to be lowercased already.
fn is_runner_system_failure(message: &str) -> bool {
    const PHRASES: [&str; 13] = [
        "failed to start service",
        "failed readiness check",
        "failed to create network",
        "job task panicked",
        "failed to run docker command",
        "failed to run podman command",
        "failed to run nerdctl command",
        "failed to run containercli command",
        "failed to run orbstack command",
        "missing stdout from container process",
        "missing stderr from container process",
        "failed to create log at",
        "failed to invoke python3 to extract artifacts",
    ];
    PHRASES.iter().any(|&phrase| message.contains(phrase))
}
297
/// Reports whether `message` contains the phrase "timed out".
///
/// Matching is case-sensitive, so `message` is expected to be lowercased already.
fn is_stuck_or_timeout_failure(message: &str) -> bool {
    const PHRASE: &str = "timed out";
    message.contains(PHRASE)
}
301
/// Reports whether `message` contains the phrase "unable to extract artifacts archive".
///
/// Matching is case-sensitive, so `message` is expected to be lowercased already.
fn is_data_integrity_failure(message: &str) -> bool {
    const PHRASE: &str = "unable to extract artifacts archive";
    message.contains(PHRASE)
}
305
/// Reports whether `message` contains the phrase "container command exited with status".
///
/// Matching is case-sensitive, so `message` is expected to be lowercased already.
fn is_script_failure(message: &str) -> bool {
    const PHRASE: &str = "container command exited with status";
    message.contains(PHRASE)
}
309
#[cfg(test)]
mod tests {
    use super::{classify_failure, completion_result, extract_exit_code};
    use crate::pipeline::JobFailureKind;
    use anyhow::anyhow;
    use std::fs;
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Builds a path under the system temp directory that embeds `prefix` and the
    /// current time in nanoseconds; nothing is created on disk.
    fn temp_path(prefix: &str) -> PathBuf {
        let since_epoch = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time before epoch");
        let file_name = format!("opal-{prefix}-{}", since_epoch.as_nanos());
        std::env::temp_dir().join(file_name)
    }

    /// Classifies a non-cancelled failure of the job "job" whose error message is
    /// `message`.
    fn classify_message(message: &str) -> Option<JobFailureKind> {
        let failure = Err(anyhow!("{message}"));
        classify_failure("job", &failure, false)
    }

    #[test]
    fn completion_result_prefers_cancelled_state() {
        let log_path = temp_path("job-run-cancelled").join("job.log");

        let err = completion_result(Ok(()), true, &log_path)
            .expect_err("cancelled job should fail");

        assert_eq!(err.to_string(), "job cancelled by user");
    }

    #[test]
    fn completion_result_adds_rustup_hint_from_log() {
        let temp_root = temp_path("job-run-rustup-hint");
        fs::create_dir_all(&temp_root).expect("create temp root");
        let log_path = temp_root.join("job.log");
        let log_contents = "[fetch-sources] error: rustup could not choose a version of rustc to run, because one wasn't specified explicitly, and no default is configured.\nhelp: run 'rustup default stable' to download the latest stable release of Rust and set it as your default toolchain.\n";
        fs::write(&log_path, log_contents).expect("write log");

        let script_failure = Err(anyhow!("container command exited with status Some(1)"));
        let message = completion_result(script_failure, false, &log_path)
            .expect_err("rustup failure should remain an error")
            .to_string();

        // The original message must survive and the hint must be appended to it.
        for fragment in [
            "container command exited with status Some(1)",
            "RUSTUP_HOME",
            "cold branch/tag cache key",
        ] {
            assert!(message.contains(fragment));
        }

        let _ = fs::remove_dir_all(temp_root);
    }

    #[test]
    fn classify_failure_distinguishes_timeout() {
        assert_eq!(
            classify_message("job exceeded timeout of 5m"),
            Some(JobFailureKind::JobExecutionTimeout)
        );
    }

    #[test]
    fn classify_failure_defaults_to_script_failure() {
        assert_eq!(
            classify_message("container command exited with status Some(1)"),
            Some(JobFailureKind::ScriptFailure)
        );
    }

    #[test]
    fn classify_failure_detects_api_failure() {
        let message = "failed to download artifacts for 'build' from project 'group/project': curl failed to download artifacts from https://gitlab.example/api/v4/projects/group%2Fproject/jobs/artifacts/main/download?job=build (status 404)";
        assert_eq!(
            classify_message(message),
            Some(JobFailureKind::ApiFailure)
        );
    }

    #[test]
    fn classify_failure_detects_unmet_prerequisites() {
        let message = "job 'build' has no image (use --base-image or set image in pipeline/job)";
        assert_eq!(
            classify_message(message),
            Some(JobFailureKind::UnmetPrerequisites)
        );
    }

    #[test]
    fn classify_failure_falls_back_to_unknown_failure() {
        assert_eq!(
            classify_message("unexpected executor error"),
            Some(JobFailureKind::UnknownFailure)
        );
    }

    #[test]
    fn classify_failure_detects_stale_schedule() {
        assert_eq!(
            classify_message("stale schedule prevented delayed job execution"),
            Some(JobFailureKind::StaleSchedule)
        );
    }

    #[test]
    fn classify_failure_detects_archived_failure() {
        assert_eq!(
            classify_message("project is archived"),
            Some(JobFailureKind::ArchivedFailure)
        );
    }

    #[test]
    fn classify_failure_detects_data_integrity_failure() {
        assert_eq!(
            classify_message("unable to extract artifacts archive /tmp/archive.zip"),
            Some(JobFailureKind::DataIntegrityFailure)
        );
    }

    #[test]
    fn extract_exit_code_reads_container_exit_status() {
        let failure = Err(anyhow!("container command exited with status Some(137)"));
        assert_eq!(extract_exit_code(&failure, false), Some(137));
    }
}