Skip to main content

opal/executor/
job_runner.rs

1use super::core::ExecutorCore;
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::pipeline::{JobEvent, JobFailureKind, JobRunInfo};
4use crate::runner::ExecuteContext;
5use crate::ui::{UiBridge, UiJobStatus};
6use anyhow::{Result, anyhow};
7use std::fs;
8use std::path::Path;
9use std::sync::Arc;
10use std::time::Instant;
11
12pub(crate) fn run_planned_job(
13    exec: &ExecutorCore,
14    plan: Arc<ExecutionPlan>,
15    planned: ExecutableJob,
16    run_info: JobRunInfo,
17    ui: Option<Arc<UiBridge>>,
18) -> JobEvent {
19    let ExecutableJob {
20        instance,
21        log_path,
22        log_hash,
23    } = planned;
24    let job = instance.job;
25    let stage_name = instance.stage_name;
26    let job_name = job.name.clone();
27    let job_start = Instant::now();
28    let ui_ref = ui.as_deref();
29
30    // TODO: what the fuck is this, why is this a function here?
31    // the logic needs to be simplified, garbage
32    let result = (|| -> Result<()> {
33        let mut prepared = exec.prepare_job_run(plan.as_ref(), &job)?;
34        let container_name = run_info.container_name.clone();
35        let exec_result = exec.execute(ExecuteContext {
36            host_workdir: &prepared.host_workdir,
37            script_path: &prepared.script_path,
38            log_path: &log_path,
39            mounts: &prepared.mounts,
40            image: &prepared.job_image.name,
41            image_platform: prepared.job_image.docker_platform.as_deref(),
42            image_user: prepared.job_image.docker_user.as_deref(),
43            image_entrypoint: &prepared.job_image.entrypoint,
44            container_name: &container_name,
45            job: &job,
46            ui: ui_ref,
47            env_vars: &prepared.env_vars,
48            network: prepared
49                .service_runtime
50                .as_ref()
51                .map(|runtime| runtime.network_name()),
52            preserve_runtime_objects: exec.config.settings.preserve_runtime_objects(),
53            arch: prepared.arch.as_deref(),
54            privileged: prepared.privileged,
55            cap_add: &prepared.cap_add,
56            cap_drop: &prepared.cap_drop,
57        });
58        let service_network = prepared
59            .service_runtime
60            .as_ref()
61            .map(|runtime| runtime.network_name().to_string());
62        let service_containers = prepared
63            .service_runtime
64            .as_ref()
65            .map(|runtime| runtime.container_names().to_vec())
66            .unwrap_or_default();
67        let runtime_summary_path = exec.write_runtime_summary(
68            &job.name,
69            &container_name,
70            prepared
71                .service_runtime
72                .as_ref()
73                .map(|runtime| runtime.network_name()),
74            &service_containers,
75        )?;
76        exec.record_runtime_objects(
77            &job.name,
78            container_name.clone(),
79            service_network,
80            service_containers,
81            runtime_summary_path,
82        );
83        if !exec.config.settings.preserve_runtime_objects() {
84            exec.cleanup_finished_container(&container_name);
85        }
86        if let Some(mut runtime) = prepared.service_runtime.take()
87            && !exec.config.settings.preserve_runtime_objects()
88        {
89            runtime.cleanup();
90        }
91        exec.collect_declared_artifacts(&job, &prepared.host_workdir)?;
92        exec.collect_untracked_artifacts(&job, &prepared.host_workdir)?;
93        exec.collect_dotenv_artifacts(&job, &prepared.host_workdir)?;
94        exec_result?;
95        exec.print_job_completion(
96            &stage_name,
97            &prepared.script_path,
98            &log_path,
99            job_start.elapsed().as_secs_f32(),
100        );
101        Ok(())
102    })();
103
104    let duration = job_start.elapsed().as_secs_f32();
105    let cancelled = exec.take_cancelled_job(&job_name);
106    let final_result = completion_result(result, cancelled, &log_path);
107    if let Some(ui) = ui_ref {
108        match &final_result {
109            Ok(_) => ui.job_finished(&job_name, UiJobStatus::Success, duration, None),
110            Err(err) => {
111                if cancelled {
112                    ui.job_finished(
113                        &job_name,
114                        UiJobStatus::Skipped,
115                        duration,
116                        Some("aborted by user".to_string()),
117                    );
118                } else {
119                    ui.job_finished(
120                        &job_name,
121                        UiJobStatus::Failed,
122                        duration,
123                        Some(err.to_string()),
124                    );
125                }
126            }
127        }
128    }
129
130    exec.clear_running_container(&job_name);
131
132    let exit_code = extract_exit_code(&final_result, cancelled);
133    let failure_kind = classify_failure(&job_name, &final_result, cancelled);
134
135    JobEvent {
136        name: job_name,
137        stage_name,
138        duration,
139        log_path: Some(log_path.clone()),
140        log_hash,
141        result: final_result,
142        failure_kind,
143        exit_code,
144        cancelled,
145    }
146}
147
148fn completion_result(result: Result<()>, cancelled: bool, log_path: &Path) -> Result<()> {
149    if cancelled {
150        Err(anyhow!("job cancelled by user"))
151    } else {
152        enrich_failure_with_log_hint(result, log_path)
153    }
154}
155
156fn enrich_failure_with_log_hint(result: Result<()>, log_path: &Path) -> Result<()> {
157    let err = match result {
158        Ok(()) => return Ok(()),
159        Err(err) => err,
160    };
161    let message = err.to_string();
162    if !message.contains("container command exited with status") {
163        return Err(err);
164    }
165    let Some(hint) = failure_hint_from_log(log_path) else {
166        return Err(err);
167    };
168    Err(anyhow!("{message}; hint: {hint}"))
169}
170
171fn failure_hint_from_log(log_path: &Path) -> Option<&'static str> {
172    let tail = read_log_tail(log_path, 32 * 1024)?;
173    if tail.contains("error: rustup could not choose a version of rustc to run")
174        && tail.contains("no default is configured")
175    {
176        return Some(
177            "the job log shows an empty rustup home. A workspace-local `RUSTUP_HOME` can mask the Rust toolchain bundled in the image, especially after a cold branch/tag cache key. Prefer leaving `RUSTUP_HOME` unset in Rust images, or bootstrap the toolchain explicitly before running `rustc`/`cargo`",
178        );
179    }
180    None
181}
182
/// Returns at most the last `max_bytes` bytes of the file at `log_path`,
/// decoded lossily as UTF-8.
///
/// Only the tail is read from disk: when the file is longer than `max_bytes`
/// the read starts `max_bytes` before its end, so a very large job log is not
/// loaded into memory just to inspect its final lines. Because the cut can
/// fall inside a multi-byte character, invalid sequences are replaced rather
/// than treated as an error.
///
/// Returns `None` when the file cannot be opened, measured, positioned or
/// read.
fn read_log_tail(log_path: &Path, max_bytes: usize) -> Option<String> {
    let mut file = fs::File::open(log_path).ok()?;
    let len = file.metadata().ok()?.len();
    let wanted = u64::try_from(max_bytes).unwrap_or(u64::MAX);
    let offset = len.saturating_sub(wanted);
    // Files that already fit within the limit are read from the start, which
    // also avoids seeking on files that report a zero length.
    if offset > 0 {
        std::io::Seek::seek(&mut file, std::io::SeekFrom::Start(offset)).ok()?;
    }
    let mut bytes = Vec::new();
    std::io::Read::read_to_end(&mut file, &mut bytes).ok()?;
    // The file may have grown since its length was sampled; trim again so the
    // `max_bytes` bound always holds.
    let start = bytes.len().saturating_sub(max_bytes);
    Some(String::from_utf8_lossy(&bytes[start..]).into_owned())
}
188
189fn extract_exit_code(result: &Result<()>, cancelled: bool) -> Option<i32> {
190    if cancelled {
191        return None;
192    }
193    let message = result.as_ref().err()?.to_string();
194    let marker = "container command exited with status Some(";
195    let start = message.find(marker)? + marker.len();
196    let rest = &message[start..];
197    let end = rest.find(')')?;
198    rest[..end].parse::<i32>().ok()
199}
200
201fn classify_failure(
202    job_name: &str,
203    result: &Result<()>,
204    cancelled: bool,
205) -> Option<JobFailureKind> {
206    if cancelled {
207        return None;
208    }
209    let err = result.as_ref().err()?;
210    let message = err.to_string();
211    let normalized = message.to_ascii_lowercase();
212    if normalized.contains("job exceeded timeout") {
213        return Some(JobFailureKind::JobExecutionTimeout);
214    }
215    if is_api_failure(&normalized) {
216        return Some(JobFailureKind::ApiFailure);
217    }
218    if is_runner_unsupported(&normalized) {
219        return Some(JobFailureKind::RunnerUnsupported);
220    }
221    if is_stale_schedule_failure(&normalized) {
222        return Some(JobFailureKind::StaleSchedule);
223    }
224    if is_archived_failure(&normalized) {
225        return Some(JobFailureKind::ArchivedFailure);
226    }
227    if is_unmet_prerequisites(&normalized) {
228        return Some(JobFailureKind::UnmetPrerequisites);
229    }
230    if is_scheduler_failure(&normalized) {
231        return Some(JobFailureKind::SchedulerFailure);
232    }
233    if is_runner_system_failure(&normalized) {
234        return Some(JobFailureKind::RunnerSystemFailure);
235    }
236    if is_stuck_or_timeout_failure(&normalized) {
237        return Some(JobFailureKind::StuckOrTimeoutFailure);
238    }
239    if is_data_integrity_failure(&normalized) {
240        return Some(JobFailureKind::DataIntegrityFailure);
241    }
242    if is_script_failure(&normalized) {
243        return Some(JobFailureKind::ScriptFailure);
244    }
245    let _ = job_name;
246    Some(JobFailureKind::UnknownFailure)
247}
248
/// Reports whether `message` describes a failed artifact download.
///
/// Matching is case-sensitive against lower-case markers, so `message` is
/// expected to be lower-cased by the caller.
fn is_api_failure(message: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "failed to invoke curl to download artifacts",
        "curl failed to download artifacts",
        "failed to download artifacts for",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
254
/// Reports whether `message` says that services were requested on a runner
/// that does not support them.
///
/// Matching is case-sensitive against a lower-case marker.
fn is_runner_unsupported(message: &str) -> bool {
    const MARKER: &str = "services are only supported when using";
    message.contains(MARKER)
}
258
/// Reports whether `message` mentions a stale schedule.
///
/// Matching is case-sensitive against a lower-case marker.
fn is_stale_schedule_failure(message: &str) -> bool {
    const MARKER: &str = "stale schedule";
    message.contains(MARKER)
}
262
/// Reports whether `message` points at an archived project or repository.
///
/// Matching is case-sensitive against lower-case markers.
fn is_archived_failure(message: &str) -> bool {
    const MARKERS: [&str; 3] = [
        "archived failure",
        "project is archived",
        "repository is archived",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
268
/// Reports whether `message` describes a prerequisite the job needed but did
/// not have: a missing image, missing upstream artifacts, a dependency that
/// did not run or is unknown, or a missing GitLab token.
///
/// Matching is case-sensitive against lower-case markers.
fn is_unmet_prerequisites(message: &str) -> bool {
    const MARKERS: [&str; 6] = [
        "has no image",
        "requires artifacts from project",
        "requires artifacts from '",
        "but it did not run",
        "no gitlab token is configured",
        "depends on unknown job",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
277
/// Reports whether `message` says that a job slot could not be acquired.
///
/// Matching is case-sensitive against a lower-case marker.
fn is_scheduler_failure(message: &str) -> bool {
    const MARKER: &str = "failed to acquire job slot";
    message.contains(MARKER)
}
281
/// Reports whether `message` describes a failure of the runner's own
/// machinery (services, networks, the container CLI, process pipes, log
/// creation, artifact extraction tooling) rather than of the job script.
///
/// Matching is case-sensitive against lower-case markers.
fn is_runner_system_failure(message: &str) -> bool {
    const MARKERS: [&str; 13] = [
        "failed to start service",
        "failed readiness check",
        "failed to create network",
        "job task panicked",
        "failed to run docker command",
        "failed to run podman command",
        "failed to run nerdctl command",
        "failed to run containercli command",
        "failed to run orbstack command",
        "missing stdout from container process",
        "missing stderr from container process",
        "failed to create log at",
        "failed to invoke python3 to extract artifacts",
    ];
    MARKERS.iter().any(|&marker| message.contains(marker))
}
297
/// Reports whether `message` mentions that something timed out.
///
/// Matching is case-sensitive against a lower-case marker.
fn is_stuck_or_timeout_failure(message: &str) -> bool {
    const MARKER: &str = "timed out";
    message.contains(MARKER)
}
301
/// Reports whether `message` says that an artifacts archive could not be
/// extracted.
///
/// Matching is case-sensitive against a lower-case marker.
fn is_data_integrity_failure(message: &str) -> bool {
    const MARKER: &str = "unable to extract artifacts archive";
    message.contains(MARKER)
}
305
/// Reports whether `message` says that the container command exited with a
/// status, i.e. the job's own script failed.
///
/// Matching is case-sensitive against a lower-case marker.
fn is_script_failure(message: &str) -> bool {
    const MARKER: &str = "container command exited with status";
    message.contains(MARKER)
}
309
#[cfg(test)]
mod tests {
    use super::{classify_failure, completion_result, extract_exit_code};
    use crate::pipeline::JobFailureKind;
    use anyhow::anyhow;
    use std::fs;
    use std::path::PathBuf;
    use std::time::{SystemTime, UNIX_EPOCH};

    /// Classifies a non-cancelled failure of job "job" carrying `message`.
    fn failure_kind_for(message: &str) -> Option<JobFailureKind> {
        classify_failure("job", &Err(anyhow!("{message}")), false)
    }

    /// Builds a temp-dir path that embeds `prefix` and the current time in
    /// nanoseconds. Nothing is created on disk.
    fn temp_path(prefix: &str) -> PathBuf {
        let nanos = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("system time before epoch")
            .as_nanos();
        std::env::temp_dir().join(format!("opal-{prefix}-{nanos}"))
    }

    #[test]
    fn completion_result_prefers_cancelled_state() {
        // The log file is never created: a cancelled job must not need it.
        let log_path = temp_path("job-run-cancelled").join("job.log");
        let err =
            completion_result(Ok(()), true, &log_path).expect_err("cancelled job should fail");
        assert_eq!(err.to_string(), "job cancelled by user");
    }

    #[test]
    fn completion_result_adds_rustup_hint_from_log() {
        let temp_root = temp_path("job-run-rustup-hint");
        fs::create_dir_all(&temp_root).expect("create temp root");
        let log_path = temp_root.join("job.log");
        let log_contents = "[fetch-sources] error: rustup could not choose a version of rustc to run, because one wasn't specified explicitly, and no default is configured.\nhelp: run 'rustup default stable' to download the latest stable release of Rust and set it as your default toolchain.\n";
        fs::write(&log_path, log_contents).expect("write log");

        let failure = Err(anyhow!("container command exited with status Some(1)"));
        let message = completion_result(failure, false, &log_path)
            .expect_err("rustup failure should remain an error")
            .to_string();
        for expected in [
            "container command exited with status Some(1)",
            "RUSTUP_HOME",
            "cold branch/tag cache key",
        ] {
            assert!(message.contains(expected));
        }

        let _ = fs::remove_dir_all(temp_root);
    }

    #[test]
    fn classify_failure_distinguishes_timeout() {
        assert_eq!(
            failure_kind_for("job exceeded timeout of 5m"),
            Some(JobFailureKind::JobExecutionTimeout)
        );
    }

    #[test]
    fn classify_failure_defaults_to_script_failure() {
        assert_eq!(
            failure_kind_for("container command exited with status Some(1)"),
            Some(JobFailureKind::ScriptFailure)
        );
    }

    #[test]
    fn classify_failure_detects_api_failure() {
        assert_eq!(
            failure_kind_for(
                "failed to download artifacts for 'build' from project 'group/project': curl failed to download artifacts from https://gitlab.example/api/v4/projects/group%2Fproject/jobs/artifacts/main/download?job=build (status 404)"
            ),
            Some(JobFailureKind::ApiFailure)
        );
    }

    #[test]
    fn classify_failure_detects_unmet_prerequisites() {
        assert_eq!(
            failure_kind_for(
                "job 'build' has no image (use --base-image or set image in pipeline/job)"
            ),
            Some(JobFailureKind::UnmetPrerequisites)
        );
    }

    #[test]
    fn classify_failure_falls_back_to_unknown_failure() {
        assert_eq!(
            failure_kind_for("unexpected executor error"),
            Some(JobFailureKind::UnknownFailure)
        );
    }

    #[test]
    fn classify_failure_detects_stale_schedule() {
        assert_eq!(
            failure_kind_for("stale schedule prevented delayed job execution"),
            Some(JobFailureKind::StaleSchedule)
        );
    }

    #[test]
    fn classify_failure_detects_archived_failure() {
        assert_eq!(
            failure_kind_for("project is archived"),
            Some(JobFailureKind::ArchivedFailure)
        );
    }

    #[test]
    fn classify_failure_detects_data_integrity_failure() {
        assert_eq!(
            failure_kind_for("unable to extract artifacts archive /tmp/archive.zip"),
            Some(JobFailureKind::DataIntegrityFailure)
        );
    }

    #[test]
    fn extract_exit_code_reads_container_exit_status() {
        let failure = Err(anyhow!("container command exited with status Some(137)"));
        assert_eq!(extract_exit_code(&failure, false), Some(137));
    }
}