Skip to main content

opal/executor/
job_runner.rs

1use super::core::ExecutorCore;
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::pipeline::{JobEvent, JobFailureKind, JobRunInfo};
4use crate::runner::ExecuteContext;
5use crate::ui::{UiBridge, UiJobStatus};
6use anyhow::{Result, anyhow};
7use std::sync::Arc;
8use std::time::Instant;
9
10pub(crate) fn run_planned_job(
11    exec: &ExecutorCore,
12    plan: Arc<ExecutionPlan>,
13    planned: ExecutableJob,
14    run_info: JobRunInfo,
15    ui: Option<Arc<UiBridge>>,
16) -> JobEvent {
17    let ExecutableJob {
18        instance,
19        log_path,
20        log_hash,
21    } = planned;
22    let job = instance.job;
23    let stage_name = instance.stage_name;
24    let job_name = job.name.clone();
25    let job_start = Instant::now();
26    let ui_ref = ui.as_deref();
27
28    // TODO: what the fuck is this, why is this a function here?
29    // the logic needs to be simplified, garbage
30    let result = (|| -> Result<()> {
31        let mut prepared = exec.prepare_job_run(plan.as_ref(), &job)?;
32        let container_name = run_info.container_name.clone();
33        let exec_result = exec.execute(ExecuteContext {
34            host_workdir: &prepared.host_workdir,
35            script_path: &prepared.script_path,
36            log_path: &log_path,
37            mounts: &prepared.mounts,
38            image: &prepared.job_image.name,
39            image_platform: prepared.job_image.docker_platform.as_deref(),
40            image_user: prepared.job_image.docker_user.as_deref(),
41            image_entrypoint: &prepared.job_image.entrypoint,
42            container_name: &container_name,
43            job: &job,
44            ui: ui_ref,
45            env_vars: &prepared.env_vars,
46            network: prepared
47                .service_runtime
48                .as_ref()
49                .map(|runtime| runtime.network_name()),
50            preserve_runtime_objects: exec.config.settings.preserve_runtime_objects(),
51            arch: prepared.arch.as_deref(),
52            privileged: prepared.privileged,
53            cap_add: &prepared.cap_add,
54            cap_drop: &prepared.cap_drop,
55        });
56        let service_network = prepared
57            .service_runtime
58            .as_ref()
59            .map(|runtime| runtime.network_name().to_string());
60        let service_containers = prepared
61            .service_runtime
62            .as_ref()
63            .map(|runtime| runtime.container_names().to_vec())
64            .unwrap_or_default();
65        let runtime_summary_path = exec.write_runtime_summary(
66            &job.name,
67            &container_name,
68            prepared
69                .service_runtime
70                .as_ref()
71                .map(|runtime| runtime.network_name()),
72            &service_containers,
73        )?;
74        exec.record_runtime_objects(
75            &job.name,
76            container_name.clone(),
77            service_network,
78            service_containers,
79            runtime_summary_path,
80        );
81        if !exec.config.settings.preserve_runtime_objects() {
82            exec.cleanup_finished_container(&container_name);
83        }
84        if let Some(mut runtime) = prepared.service_runtime.take()
85            && !exec.config.settings.preserve_runtime_objects()
86        {
87            runtime.cleanup();
88        }
89        exec.collect_declared_artifacts(&job, &prepared.host_workdir)?;
90        exec.collect_untracked_artifacts(&job, &prepared.host_workdir)?;
91        exec.collect_dotenv_artifacts(&job, &prepared.host_workdir)?;
92        exec_result?;
93        exec.print_job_completion(
94            &stage_name,
95            &prepared.script_path,
96            &log_path,
97            job_start.elapsed().as_secs_f32(),
98        );
99        Ok(())
100    })();
101
102    let duration = job_start.elapsed().as_secs_f32();
103    let cancelled = exec.take_cancelled_job(&job_name);
104    let final_result = completion_result(result, cancelled);
105    if let Some(ui) = ui_ref {
106        match &final_result {
107            Ok(_) => ui.job_finished(&job_name, UiJobStatus::Success, duration, None),
108            Err(err) => {
109                if cancelled {
110                    ui.job_finished(
111                        &job_name,
112                        UiJobStatus::Skipped,
113                        duration,
114                        Some("aborted by user".to_string()),
115                    );
116                } else {
117                    ui.job_finished(
118                        &job_name,
119                        UiJobStatus::Failed,
120                        duration,
121                        Some(err.to_string()),
122                    );
123                }
124            }
125        }
126    }
127
128    exec.clear_running_container(&job_name);
129
130    let exit_code = extract_exit_code(&final_result, cancelled);
131    let failure_kind = classify_failure(&job_name, &final_result, cancelled);
132
133    JobEvent {
134        name: job_name,
135        stage_name,
136        duration,
137        log_path: Some(log_path.clone()),
138        log_hash,
139        result: final_result,
140        failure_kind,
141        exit_code,
142        cancelled,
143    }
144}
145
146fn completion_result(result: Result<()>, cancelled: bool) -> Result<()> {
147    if cancelled {
148        Err(anyhow!("job cancelled by user"))
149    } else {
150        result
151    }
152}
153
154fn extract_exit_code(result: &Result<()>, cancelled: bool) -> Option<i32> {
155    if cancelled {
156        return None;
157    }
158    let message = result.as_ref().err()?.to_string();
159    let marker = "container command exited with status Some(";
160    let start = message.find(marker)? + marker.len();
161    let rest = &message[start..];
162    let end = rest.find(')')?;
163    rest[..end].parse::<i32>().ok()
164}
165
166fn classify_failure(
167    job_name: &str,
168    result: &Result<()>,
169    cancelled: bool,
170) -> Option<JobFailureKind> {
171    if cancelled {
172        return None;
173    }
174    let err = result.as_ref().err()?;
175    let message = err.to_string();
176    let normalized = message.to_ascii_lowercase();
177    if normalized.contains("job exceeded timeout") {
178        return Some(JobFailureKind::JobExecutionTimeout);
179    }
180    if is_api_failure(&normalized) {
181        return Some(JobFailureKind::ApiFailure);
182    }
183    if is_runner_unsupported(&normalized) {
184        return Some(JobFailureKind::RunnerUnsupported);
185    }
186    if is_stale_schedule_failure(&normalized) {
187        return Some(JobFailureKind::StaleSchedule);
188    }
189    if is_archived_failure(&normalized) {
190        return Some(JobFailureKind::ArchivedFailure);
191    }
192    if is_unmet_prerequisites(&normalized) {
193        return Some(JobFailureKind::UnmetPrerequisites);
194    }
195    if is_scheduler_failure(&normalized) {
196        return Some(JobFailureKind::SchedulerFailure);
197    }
198    if is_runner_system_failure(&normalized) {
199        return Some(JobFailureKind::RunnerSystemFailure);
200    }
201    if is_stuck_or_timeout_failure(&normalized) {
202        return Some(JobFailureKind::StuckOrTimeoutFailure);
203    }
204    if is_data_integrity_failure(&normalized) {
205        return Some(JobFailureKind::DataIntegrityFailure);
206    }
207    if is_script_failure(&normalized) {
208        return Some(JobFailureKind::ScriptFailure);
209    }
210    let _ = job_name;
211    Some(JobFailureKind::UnknownFailure)
212}
213
/// Returns `true` when `message` reports a failed artifact download.
///
/// Matching is case-sensitive against lower-case fragments.
fn is_api_failure(message: &str) -> bool {
    const MARKERS: &[&str] = &[
        "failed to invoke curl to download artifacts",
        "curl failed to download artifacts",
        "failed to download artifacts for",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
219
/// Returns `true` when `message` contains the
/// "services are only supported when using" fragment.
fn is_runner_unsupported(message: &str) -> bool {
    const MARKER: &str = "services are only supported when using";
    message.contains(MARKER)
}
223
/// Returns `true` when `message` mentions a stale schedule (case-sensitive).
fn is_stale_schedule_failure(message: &str) -> bool {
    const MARKER: &str = "stale schedule";
    message.contains(MARKER)
}
227
/// Returns `true` when `message` says the project or repository is archived.
///
/// Matching is case-sensitive against lower-case fragments.
fn is_archived_failure(message: &str) -> bool {
    const MARKERS: &[&str] = &[
        "archived failure",
        "project is archived",
        "repository is archived",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
233
/// Returns `true` when `message` reports a missing prerequisite.
///
/// Recognised cases are: no image, required upstream artifacts, a dependency
/// that did not run, a missing GitLab token, or an unknown dependency.
/// Matching is case-sensitive against lower-case fragments.
fn is_unmet_prerequisites(message: &str) -> bool {
    const MARKERS: &[&str] = &[
        "has no image",
        "requires artifacts from project",
        "requires artifacts from '",
        "but it did not run",
        "no gitlab token is configured",
        "depends on unknown job",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
242
/// Returns `true` when `message` reports that a job slot could not be acquired.
fn is_scheduler_failure(message: &str) -> bool {
    const MARKER: &str = "failed to acquire job slot";
    message.contains(MARKER)
}
246
/// Returns `true` when `message` points at an infrastructure-side problem.
///
/// Recognised cases include service start/readiness and network creation,
/// container-engine invocation, container I/O, log creation, artifact extraction
/// tooling, and a panicked job task. Matching is case-sensitive against
/// lower-case fragments.
fn is_runner_system_failure(message: &str) -> bool {
    const MARKERS: &[&str] = &[
        "failed to start service",
        "failed readiness check",
        "failed to create network",
        "job task panicked",
        "failed to run docker command",
        "failed to run podman command",
        "failed to run nerdctl command",
        "failed to run containercli command",
        "failed to run orbstack command",
        "missing stdout from container process",
        "missing stderr from container process",
        "failed to create log at",
        "failed to invoke python3 to extract artifacts",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
262
/// Returns `true` when `message` contains the phrase "timed out".
fn is_stuck_or_timeout_failure(message: &str) -> bool {
    const MARKER: &str = "timed out";
    message.contains(MARKER)
}
266
/// Returns `true` when `message` reports that an artifacts archive could not be
/// extracted.
fn is_data_integrity_failure(message: &str) -> bool {
    const MARKER: &str = "unable to extract artifacts archive";
    message.contains(MARKER)
}
270
/// Returns `true` when `message` reports the container command's exit status,
/// whether or not a numeric code is present.
fn is_script_failure(message: &str) -> bool {
    const MARKER: &str = "container command exited with status";
    message.contains(MARKER)
}
274
#[cfg(test)]
mod tests {
    use super::{classify_failure, completion_result, extract_exit_code};
    use crate::pipeline::JobFailureKind;
    use anyhow::anyhow;

    #[test]
    fn completion_result_prefers_cancelled_state() {
        let result = completion_result(Ok(()), true).expect_err("cancelled job should fail");
        assert_eq!(result.to_string(), "job cancelled by user");
    }

    #[test]
    fn completion_result_passes_through_when_not_cancelled() {
        assert!(completion_result(Ok(()), false).is_ok());
        let err = completion_result(Err(anyhow!("boom")), false)
            .expect_err("original failure should be preserved");
        assert_eq!(err.to_string(), "boom");
    }

    #[test]
    fn classify_failure_ignores_success_and_cancellation() {
        assert_eq!(classify_failure("job", &Ok(()), false), None);
        let result = Err(anyhow!("container command exited with status Some(1)"));
        assert_eq!(classify_failure("job", &result, true), None);
    }

    #[test]
    fn classify_failure_distinguishes_timeout() {
        let result = Err(anyhow!("job exceeded timeout of 5m"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::JobExecutionTimeout)
        );
    }

    #[test]
    fn classify_failure_is_case_insensitive() {
        let result = Err(anyhow!("Job Exceeded Timeout of 5m"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::JobExecutionTimeout)
        );
    }

    #[test]
    fn classify_failure_detects_script_failure() {
        let result = Err(anyhow!("container command exited with status Some(1)"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::ScriptFailure)
        );
    }

    #[test]
    fn classify_failure_detects_api_failure() {
        let result = Err(anyhow!(
            "failed to download artifacts for 'build' from project 'group/project': curl failed to download artifacts from https://gitlab.example/api/v4/projects/group%2Fproject/jobs/artifacts/main/download?job=build (status 404)"
        ));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::ApiFailure)
        );
    }

    #[test]
    fn classify_failure_detects_runner_unsupported() {
        let result = Err(anyhow!(
            "services are only supported when using the docker engine"
        ));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::RunnerUnsupported)
        );
    }

    #[test]
    fn classify_failure_detects_unmet_prerequisites() {
        let result = Err(anyhow!(
            "job 'build' has no image (use --base-image or set image in pipeline/job)"
        ));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::UnmetPrerequisites)
        );
    }

    #[test]
    fn classify_failure_detects_scheduler_failure() {
        let result = Err(anyhow!("failed to acquire job slot"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::SchedulerFailure)
        );
    }

    #[test]
    fn classify_failure_detects_runner_system_failure() {
        let result = Err(anyhow!("failed to run docker command: no such file"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::RunnerSystemFailure)
        );
    }

    #[test]
    fn classify_failure_detects_stuck_or_timeout_failure() {
        let result = Err(anyhow!("operation timed out"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::StuckOrTimeoutFailure)
        );
    }

    #[test]
    fn classify_failure_falls_back_to_unknown_failure() {
        let result = Err(anyhow!("unexpected executor error"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::UnknownFailure)
        );
    }

    #[test]
    fn classify_failure_detects_stale_schedule() {
        let result = Err(anyhow!("stale schedule prevented delayed job execution"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::StaleSchedule)
        );
    }

    #[test]
    fn classify_failure_detects_archived_failure() {
        let result = Err(anyhow!("project is archived"));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::ArchivedFailure)
        );
    }

    #[test]
    fn classify_failure_detects_data_integrity_failure() {
        let result = Err(anyhow!(
            "unable to extract artifacts archive /tmp/archive.zip"
        ));
        assert_eq!(
            classify_failure("job", &result, false),
            Some(JobFailureKind::DataIntegrityFailure)
        );
    }

    #[test]
    fn extract_exit_code_reads_container_exit_status() {
        let result = Err(anyhow!("container command exited with status Some(137)"));
        assert_eq!(extract_exit_code(&result, false), Some(137));
    }

    #[test]
    fn extract_exit_code_ignores_missing_or_cancelled_status() {
        let signalled = Err(anyhow!("container command exited with status None"));
        assert_eq!(extract_exit_code(&signalled, false), None);
        let exited = Err(anyhow!("container command exited with status Some(2)"));
        assert_eq!(extract_exit_code(&exited, true), None);
        assert_eq!(extract_exit_code(&Ok(()), false), None);
    }
}