1use super::core::ExecutorCore;
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::pipeline::{JobEvent, JobFailureKind, JobRunInfo};
4use crate::runner::ExecuteContext;
5use crate::ui::{UiBridge, UiJobStatus};
6use anyhow::{Result, anyhow};
7use std::sync::Arc;
8use std::time::Instant;
9
10pub(crate) fn run_planned_job(
11 exec: &ExecutorCore,
12 plan: Arc<ExecutionPlan>,
13 planned: ExecutableJob,
14 run_info: JobRunInfo,
15 ui: Option<Arc<UiBridge>>,
16) -> JobEvent {
17 let ExecutableJob {
18 instance,
19 log_path,
20 log_hash,
21 } = planned;
22 let job = instance.job;
23 let stage_name = instance.stage_name;
24 let job_name = job.name.clone();
25 let job_start = Instant::now();
26 let ui_ref = ui.as_deref();
27
28 let result = (|| -> Result<()> {
31 let mut prepared = exec.prepare_job_run(plan.as_ref(), &job)?;
32 let container_name = run_info.container_name.clone();
33 let exec_result = exec.execute(ExecuteContext {
34 host_workdir: &prepared.host_workdir,
35 script_path: &prepared.script_path,
36 log_path: &log_path,
37 mounts: &prepared.mounts,
38 image: &prepared.job_image.name,
39 image_platform: prepared.job_image.docker_platform.as_deref(),
40 image_user: prepared.job_image.docker_user.as_deref(),
41 image_entrypoint: &prepared.job_image.entrypoint,
42 container_name: &container_name,
43 job: &job,
44 ui: ui_ref,
45 env_vars: &prepared.env_vars,
46 network: prepared
47 .service_runtime
48 .as_ref()
49 .map(|runtime| runtime.network_name()),
50 preserve_runtime_objects: exec.config.settings.preserve_runtime_objects(),
51 arch: prepared.arch.as_deref(),
52 privileged: prepared.privileged,
53 cap_add: &prepared.cap_add,
54 cap_drop: &prepared.cap_drop,
55 });
56 let service_network = prepared
57 .service_runtime
58 .as_ref()
59 .map(|runtime| runtime.network_name().to_string());
60 let service_containers = prepared
61 .service_runtime
62 .as_ref()
63 .map(|runtime| runtime.container_names().to_vec())
64 .unwrap_or_default();
65 let runtime_summary_path = exec.write_runtime_summary(
66 &job.name,
67 &container_name,
68 prepared
69 .service_runtime
70 .as_ref()
71 .map(|runtime| runtime.network_name()),
72 &service_containers,
73 )?;
74 exec.record_runtime_objects(
75 &job.name,
76 container_name.clone(),
77 service_network,
78 service_containers,
79 runtime_summary_path,
80 );
81 if !exec.config.settings.preserve_runtime_objects() {
82 exec.cleanup_finished_container(&container_name);
83 }
84 if let Some(mut runtime) = prepared.service_runtime.take()
85 && !exec.config.settings.preserve_runtime_objects()
86 {
87 runtime.cleanup();
88 }
89 exec.collect_declared_artifacts(&job, &prepared.host_workdir)?;
90 exec.collect_untracked_artifacts(&job, &prepared.host_workdir)?;
91 exec.collect_dotenv_artifacts(&job, &prepared.host_workdir)?;
92 exec_result?;
93 exec.print_job_completion(
94 &stage_name,
95 &prepared.script_path,
96 &log_path,
97 job_start.elapsed().as_secs_f32(),
98 );
99 Ok(())
100 })();
101
102 let duration = job_start.elapsed().as_secs_f32();
103 let cancelled = exec.take_cancelled_job(&job_name);
104 let final_result = completion_result(result, cancelled);
105 if let Some(ui) = ui_ref {
106 match &final_result {
107 Ok(_) => ui.job_finished(&job_name, UiJobStatus::Success, duration, None),
108 Err(err) => {
109 if cancelled {
110 ui.job_finished(
111 &job_name,
112 UiJobStatus::Skipped,
113 duration,
114 Some("aborted by user".to_string()),
115 );
116 } else {
117 ui.job_finished(
118 &job_name,
119 UiJobStatus::Failed,
120 duration,
121 Some(err.to_string()),
122 );
123 }
124 }
125 }
126 }
127
128 exec.clear_running_container(&job_name);
129
130 let exit_code = extract_exit_code(&final_result, cancelled);
131 let failure_kind = classify_failure(&job_name, &final_result, cancelled);
132
133 JobEvent {
134 name: job_name,
135 stage_name,
136 duration,
137 log_path: Some(log_path.clone()),
138 log_hash,
139 result: final_result,
140 failure_kind,
141 exit_code,
142 cancelled,
143 }
144}
145
146fn completion_result(result: Result<()>, cancelled: bool) -> Result<()> {
147 if cancelled {
148 Err(anyhow!("job cancelled by user"))
149 } else {
150 result
151 }
152}
153
154fn extract_exit_code(result: &Result<()>, cancelled: bool) -> Option<i32> {
155 if cancelled {
156 return None;
157 }
158 let message = result.as_ref().err()?.to_string();
159 let marker = "container command exited with status Some(";
160 let start = message.find(marker)? + marker.len();
161 let rest = &message[start..];
162 let end = rest.find(')')?;
163 rest[..end].parse::<i32>().ok()
164}
165
166fn classify_failure(
167 job_name: &str,
168 result: &Result<()>,
169 cancelled: bool,
170) -> Option<JobFailureKind> {
171 if cancelled {
172 return None;
173 }
174 let err = result.as_ref().err()?;
175 let message = err.to_string();
176 let normalized = message.to_ascii_lowercase();
177 if normalized.contains("job exceeded timeout") {
178 return Some(JobFailureKind::JobExecutionTimeout);
179 }
180 if is_api_failure(&normalized) {
181 return Some(JobFailureKind::ApiFailure);
182 }
183 if is_runner_unsupported(&normalized) {
184 return Some(JobFailureKind::RunnerUnsupported);
185 }
186 if is_stale_schedule_failure(&normalized) {
187 return Some(JobFailureKind::StaleSchedule);
188 }
189 if is_archived_failure(&normalized) {
190 return Some(JobFailureKind::ArchivedFailure);
191 }
192 if is_unmet_prerequisites(&normalized) {
193 return Some(JobFailureKind::UnmetPrerequisites);
194 }
195 if is_scheduler_failure(&normalized) {
196 return Some(JobFailureKind::SchedulerFailure);
197 }
198 if is_runner_system_failure(&normalized) {
199 return Some(JobFailureKind::RunnerSystemFailure);
200 }
201 if is_stuck_or_timeout_failure(&normalized) {
202 return Some(JobFailureKind::StuckOrTimeoutFailure);
203 }
204 if is_data_integrity_failure(&normalized) {
205 return Some(JobFailureKind::DataIntegrityFailure);
206 }
207 if is_script_failure(&normalized) {
208 return Some(JobFailureKind::ScriptFailure);
209 }
210 let _ = job_name;
211 Some(JobFailureKind::UnknownFailure)
212}
213
/// Returns `true` when `message` contains one of the artifact-download
/// failure phrases. Matching is case-sensitive against lowercase phrases.
fn is_api_failure(message: &str) -> bool {
    const NEEDLES: [&str; 3] = [
        "failed to invoke curl to download artifacts",
        "curl failed to download artifacts",
        "failed to download artifacts for",
    ];
    NEEDLES.iter().any(|&needle| message.contains(needle))
}
219
/// Returns `true` when `message` contains the "services are only supported
/// when using" phrase. Matching is case-sensitive.
fn is_runner_unsupported(message: &str) -> bool {
    const NEEDLE: &str = "services are only supported when using";
    message.contains(NEEDLE)
}
223
/// Returns `true` when `message` contains the phrase "stale schedule".
/// Matching is case-sensitive.
fn is_stale_schedule_failure(message: &str) -> bool {
    const NEEDLE: &str = "stale schedule";
    message.contains(NEEDLE)
}
227
/// Returns `true` when `message` contains one of the "archived" phrases.
/// Matching is case-sensitive against lowercase phrases.
fn is_archived_failure(message: &str) -> bool {
    const NEEDLES: [&str; 3] = [
        "archived failure",
        "project is archived",
        "repository is archived",
    ];
    NEEDLES.iter().any(|&needle| message.contains(needle))
}
233
/// Returns `true` when `message` contains one of the phrases used for a job
/// whose prerequisites (image, upstream artifacts, token, dependencies) are
/// missing. Matching is case-sensitive against lowercase phrases.
fn is_unmet_prerequisites(message: &str) -> bool {
    const NEEDLES: [&str; 6] = [
        "has no image",
        "requires artifacts from project",
        "requires artifacts from '",
        "but it did not run",
        "no gitlab token is configured",
        "depends on unknown job",
    ];
    NEEDLES.iter().any(|&needle| message.contains(needle))
}
242
/// Returns `true` when `message` contains "failed to acquire job slot".
/// Matching is case-sensitive.
fn is_scheduler_failure(message: &str) -> bool {
    const NEEDLE: &str = "failed to acquire job slot";
    message.contains(NEEDLE)
}
246
/// Returns `true` when `message` contains one of the phrases used for
/// failures of the runner's own machinery (services, networks, container
/// engine commands, process pipes, log creation, artifact extraction tool).
/// Matching is case-sensitive against lowercase phrases.
fn is_runner_system_failure(message: &str) -> bool {
    const NEEDLES: [&str; 13] = [
        "failed to start service",
        "failed readiness check",
        "failed to create network",
        "job task panicked",
        "failed to run docker command",
        "failed to run podman command",
        "failed to run nerdctl command",
        "failed to run containercli command",
        "failed to run orbstack command",
        "missing stdout from container process",
        "missing stderr from container process",
        "failed to create log at",
        "failed to invoke python3 to extract artifacts",
    ];
    NEEDLES.iter().any(|&needle| message.contains(needle))
}
262
/// Returns `true` when `message` contains the phrase "timed out".
/// Matching is case-sensitive.
fn is_stuck_or_timeout_failure(message: &str) -> bool {
    const NEEDLE: &str = "timed out";
    message.contains(NEEDLE)
}
266
/// Returns `true` when `message` contains "unable to extract artifacts
/// archive". Matching is case-sensitive.
fn is_data_integrity_failure(message: &str) -> bool {
    const NEEDLE: &str = "unable to extract artifacts archive";
    message.contains(NEEDLE)
}
270
/// Returns `true` when `message` contains "container command exited with
/// status". Matching is case-sensitive.
fn is_script_failure(message: &str) -> bool {
    const NEEDLE: &str = "container command exited with status";
    message.contains(NEEDLE)
}
274
/// Unit tests for the result post-processing helpers: cancellation override,
/// failure classification and exit-code extraction.
#[cfg(test)]
mod tests {
    use super::{classify_failure, completion_result, extract_exit_code};
    use crate::pipeline::JobFailureKind;
    use anyhow::anyhow;

    /// Builds a failed job result carrying `message` as its error text.
    fn failed(message: &'static str) -> anyhow::Result<()> {
        Err(anyhow!(message))
    }

    #[test]
    fn completion_result_prefers_cancelled_state() {
        let result = completion_result(Ok(()), true).expect_err("cancelled job should fail");
        assert_eq!(result.to_string(), "job cancelled by user");
    }

    #[test]
    fn completion_result_passes_through_when_not_cancelled() {
        assert!(completion_result(Ok(()), false).is_ok());
        let err = completion_result(failed("boom"), false).expect_err("error should be kept");
        assert_eq!(err.to_string(), "boom");
    }

    #[test]
    fn classify_failure_distinguishes_timeout() {
        let result = failed("job exceeded timeout of 5m");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::JobExecutionTimeout)
        );
    }

    #[test]
    fn classify_failure_detects_script_failure() {
        let result = failed("container command exited with status Some(1)");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::ScriptFailure)
        );
    }

    #[test]
    fn classify_failure_detects_api_failure() {
        let result = failed(
            "failed to download artifacts for 'build' from project 'group/project': curl failed to download artifacts from https://gitlab.example/api/v4/projects/group%2Fproject/jobs/artifacts/main/download?job=build (status 404)",
        );
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::ApiFailure)
        );
    }

    #[test]
    fn classify_failure_detects_unmet_prerequisites() {
        let result =
            failed("job 'build' has no image (use --base-image or set image in pipeline/job)");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::UnmetPrerequisites)
        );
    }

    #[test]
    fn classify_failure_falls_back_to_unknown_failure() {
        let result = failed("unexpected executor error");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::UnknownFailure)
        );
    }

    #[test]
    fn classify_failure_detects_stale_schedule() {
        let result = failed("stale schedule prevented delayed job execution");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::StaleSchedule)
        );
    }

    #[test]
    fn classify_failure_detects_archived_failure() {
        let result = failed("project is archived");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::ArchivedFailure)
        );
    }

    #[test]
    fn classify_failure_detects_data_integrity_failure() {
        let result = failed("unable to extract artifacts archive /tmp/archive.zip");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::DataIntegrityFailure)
        );
    }

    #[test]
    fn classify_failure_is_case_insensitive() {
        let result = failed("Project Is Archived");
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::ArchivedFailure)
        );
    }

    #[test]
    fn classify_failure_ignores_cancelled_and_successful_jobs() {
        let result = failed("job exceeded timeout of 5m");
        assert_eq!(classify_failure("job", &result, true), None);
        assert_eq!(classify_failure("job", &Ok(()), false), None);
    }

    #[test]
    fn extract_exit_code_reads_container_exit_status() {
        let result = failed("container command exited with status Some(137)");
        assert_eq!(extract_exit_code(&result, false), Some(137));
    }

    #[test]
    fn extract_exit_code_reads_negative_status() {
        let result = failed("container command exited with status Some(-1)");
        assert_eq!(extract_exit_code(&result, false), Some(-1));
    }

    #[test]
    fn extract_exit_code_is_none_without_a_usable_status() {
        // Cancelled jobs never report an exit code, even if one is present.
        let with_status = failed("container command exited with status Some(1)");
        assert_eq!(extract_exit_code(&with_status, true), None);
        // Successful jobs have no error message to parse.
        assert_eq!(extract_exit_code(&Ok(()), false), None);
        // The marker requires the `Some(` form.
        let no_status = failed("container command exited with status None");
        assert_eq!(extract_exit_code(&no_status, false), None);
        // Non-numeric content between the parentheses does not parse.
        let garbage = failed("container command exited with status Some(abc)");
        assert_eq!(extract_exit_code(&garbage, false), None);
        // Unrelated errors carry no exit code.
        let unrelated = failed("unexpected executor error");
        assert_eq!(extract_exit_code(&unrelated, false), None);
    }
}