// opal/executor/orchestrator.rs
1use super::{core::ExecutorCore, job_runner};
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::model::ArtifactSourceOutcome;
4use crate::pipeline::{
5    self, HaltKind, JobEvent, JobFailureKind, JobStatus, JobSummary, ResourceGroupManager, RuleWhen,
6};
7use crate::runtime;
8use crate::ui::{UiBridge, UiCommand, UiJobStatus};
9use anyhow::{Context, Result, anyhow};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::Arc;
12use tokio::{
13    sync::{Semaphore, mpsc},
14    task, time as tokio_time,
15};
16
17fn interruptible_running_jobs(plan: &ExecutionPlan, running: &HashSet<String>) -> Vec<String> {
18    let mut names = running
19        .iter()
20        .filter(|name| {
21            plan.nodes
22                .get(*name)
23                .map(|planned| planned.instance.interruptible)
24                .unwrap_or(false)
25        })
26        .cloned()
27        .collect::<Vec<_>>();
28    names.sort_by_key(|name| plan.order_index.get(name).copied().unwrap_or(usize::MAX));
29    names
30}
31
/// Runs every job in `plan` to completion or halt, respecting dependencies,
/// `when` rules, retries, resource groups, and the configured parallelism
/// limit.
///
/// Returns the per-job summaries together with `Ok(())` on success or the
/// first halting error (job failure, deadlock, user abort, or unexpectedly
/// closed worker channel). Jobs that never ran are appended as `Skipped`
/// summaries before returning.
pub(crate) async fn execute_plan(
    exec: &ExecutorCore,
    plan: Arc<ExecutionPlan>,
    ui: Option<Arc<UiBridge>>,
    mut commands: Option<&mut mpsc::UnboundedReceiver<UiCommand>>,
) -> (Vec<JobSummary>, Result<()>) {
    // TODO(refactor): this function mixes semaphore gating, channel
    // signalling, and scheduler bookkeeping in one loop; extract a scheduler
    // state struct so the select! below only dispatches events.
    let total = plan.ordered.len();
    if total == 0 {
        return (Vec::new(), Ok(()));
    }

    // Outstanding-dependency counters: a job becomes eligible when its
    // count reaches zero (via release_dependents).
    let mut remaining: HashMap<String, usize> = plan
        .nodes
        .iter()
        .map(|(name, job)| (name.clone(), job.instance.dependencies.len()))
        .collect();
    // Jobs eligible to start right now.
    let mut ready: VecDeque<String> = VecDeque::new();
    // `on_failure` jobs parked until the pipeline actually fails.
    let mut waiting_on_failure: VecDeque<String> = VecDeque::new();
    // `delayed` jobs whose start timer is currently running.
    let mut delayed_pending: HashSet<String> = HashSet::new();
    // Jobs re-queued on a timer because their resource group was busy.
    let mut resource_retry_pending: HashSet<String> = HashSet::new();
    // Manual jobs waiting for a UiCommand::StartManual.
    let mut manual_waiting: HashSet<String> = HashSet::new();
    let mut running = HashSet::new();
    let mut abort_requested = false;
    let mut completed = 0usize;
    let mut pipeline_failed = false;
    let mut halt_kind = HaltKind::None;
    // First halting error wins; later failures never overwrite it.
    let mut halt_error: Option<anyhow::Error> = None;
    let mut summaries: Vec<JobSummary> = Vec::new();
    // Per-job attempt counts, used to enforce retry limits.
    let mut attempts: HashMap<String, u32> = HashMap::new();
    let mut resource_waiting: HashMap<String, VecDeque<String>> = HashMap::new();
    let resource_groups = ResourceGroupManager::new(runtime::resource_group_root());
    // Without a command channel, manual jobs can never be started by a user.
    let mut manual_input_available = commands.is_some();

    // Bounds concurrently running jobs (always at least one permit).
    let semaphore = Arc::new(Semaphore::new(exec.config.max_parallel_jobs.max(1)));
    let exec = Arc::new(exec.clone());
    // Workers report completion over `tx`; timers report over `delay_tx`.
    let (tx, mut rx) = mpsc::unbounded_channel::<JobEvent>();
    let (delay_tx, mut delay_rx) = mpsc::unbounded_channel::<String>();

    // Routes a dependency-free job to the queue matching its `when` rule.
    let enqueue_ready = |job_name: &str,
                         pipeline_failed_flag: bool,
                         ready_queue: &mut VecDeque<String>,
                         wait_failure_queue: &mut VecDeque<String>,
                         delayed_set: &mut HashSet<String>| {
        let Some(planned) = plan.nodes.get(job_name) else {
            return;
        };
        match planned.instance.rule.when {
            RuleWhen::OnFailure => {
                if pipeline_failed_flag {
                    ready_queue.push_back(job_name.to_string());
                } else {
                    wait_failure_queue.push_back(job_name.to_string());
                }
            }
            RuleWhen::Delayed => {
                if pipeline_failed_flag {
                    return;
                }
                if let Some(delay) = planned.instance.rule.start_in {
                    // Arm the start timer once; the sleep task reports back
                    // over `delay_tx` when the job may run.
                    if delayed_set.insert(job_name.to_string()) {
                        let tx_clone = delay_tx.clone();
                        let name = job_name.to_string();
                        task::spawn(async move {
                            tokio_time::sleep(delay).await;
                            let _ = tx_clone.send(name);
                        });
                    }
                } else {
                    // Delayed rule without a duration starts immediately.
                    ready_queue.push_back(job_name.to_string());
                }
            }
            RuleWhen::Manual | RuleWhen::OnSuccess => {
                if pipeline_failed_flag && planned.instance.rule.when.requires_success() {
                    return;
                }
                ready_queue.push_back(job_name.to_string());
            }
            RuleWhen::Always => {
                ready_queue.push_back(job_name.to_string());
            }
            RuleWhen::Never => {}
        }
    };

    // Seed the queues with jobs that have no dependencies.
    for name in &plan.ordered {
        if remaining.get(name).copied().unwrap_or(0) == 0 && !abort_requested {
            enqueue_ready(
                name,
                pipeline_failed,
                &mut ready,
                &mut waiting_on_failure,
                &mut delayed_pending,
            );
        }
    }

    while completed < total {
        // Dispatch everything currently runnable.
        while let Some(name) = ready.pop_front() {
            if abort_requested {
                break;
            }
            let planned = match plan.nodes.get(&name).cloned() {
                Some(job) => job,
                None => continue,
            };
            if pipeline_failed && planned.instance.rule.when.requires_success() {
                continue;
            }

            // Manual jobs either wait for a UI start command or, when no
            // command channel exists, are skipped immediately.
            if matches!(planned.instance.rule.when, RuleWhen::Manual)
                && !planned.instance.rule.manual_auto_run
            {
                if manual_input_available {
                    if manual_waiting.insert(name.clone())
                        && let Some(ui_ref) = ui.as_deref()
                    {
                        ui_ref.job_manual_pending(&name);
                    }
                } else {
                    let reason = planned
                        .instance
                        .rule
                        .manual_reason
                        .clone()
                        .unwrap_or_else(|| "manual job not run".to_string());
                    if let Some(ui_ref) = ui.as_deref() {
                        ui_ref.job_finished(
                            &planned.instance.job.name,
                            UiJobStatus::Skipped,
                            0.0,
                            Some(reason.clone()),
                        );
                    }
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Skipped(reason.clone()),
                        log_path: None,
                        log_hash: planned.log_hash.clone(),
                        allow_failure: planned.instance.rule.allow_failure,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    completed += 1;
                    // A skipped manual job still unblocks its dependents.
                    release_dependents(
                        &plan,
                        &name,
                        &mut remaining,
                        abort_requested,
                        pipeline_failed,
                        &mut ReadyQueues {
                            ready: &mut ready,
                            waiting_on_failure: &mut waiting_on_failure,
                            delayed_pending: &mut delayed_pending,
                        },
                        &enqueue_ready,
                    );
                }
                continue;
            }

            // Resource-group gating: only one holder per group at a time.
            if let Some(group) = &planned.instance.resource_group {
                let acquired =
                    match resource_groups.try_acquire(group, &planned.instance.job.name) {
                        Ok(acquired) => acquired,
                        Err(err) => {
                            // Hard acquisition failure halts the pipeline.
                            halt_kind = HaltKind::JobFailure;
                            pipeline_failed = true;
                            if halt_error.is_none() {
                                halt_error = Some(err.context(format!(
                                    "failed to acquire resource group '{}'",
                                    group
                                )));
                            }
                            break;
                        }
                    };
                if !acquired {
                    if running.is_empty() && ready.is_empty() {
                        // Nothing else can make progress: poll until the
                        // group frees up (it may be held externally —
                        // TODO confirm cross-process holders are possible)
                        // or acquisition fails hard.
                        loop {
                            tokio_time::sleep(std::time::Duration::from_millis(500)).await;
                            match resource_groups.try_acquire(group, &planned.instance.job.name) {
                                Ok(true) => break,
                                Ok(false) => continue,
                                Err(err) => {
                                    halt_kind = HaltKind::JobFailure;
                                    pipeline_failed = true;
                                    if halt_error.is_none() {
                                        halt_error = Some(err.context(format!(
                                            "failed to acquire resource group '{}'",
                                            group
                                        )));
                                    }
                                    break;
                                }
                            }
                        }
                        if pipeline_failed {
                            break;
                        }
                    } else {
                        // Other work is in flight; re-queue this job on a
                        // short timer instead of blocking the scheduler.
                        if resource_retry_pending.insert(name.clone()) {
                            let tx_clone = delay_tx.clone();
                            let retry_name = name.clone();
                            task::spawn(async move {
                                tokio_time::sleep(std::time::Duration::from_millis(500)).await;
                                let _ = tx_clone.send(retry_name);
                            });
                        }
                        continue;
                    }
                }
            }

            let entry = attempts.entry(name.clone()).or_insert(0);
            *entry += 1;

            let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
                Ok(info) => info,
                Err(err) => {
                    // Failing to start job logging is fatal for the whole
                    // pipeline: record the failure and return immediately.
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Failed(err.to_string()),
                        log_path: Some(planned.log_path.clone()),
                        log_hash: planned.log_hash.clone(),
                        allow_failure: false,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    return (summaries, Err(err));
                }
            };
            running.insert(name.clone());
            pipeline::spawn_job(
                exec.clone(),
                plan.clone(),
                planned,
                run_info,
                semaphore.clone(),
                tx.clone(),
                ui.clone(),
            );
        }

        if completed >= total {
            break;
        }

        // Failed pipeline with nothing left in flight: stop scheduling.
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && pipeline_failed
            && waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            break;
        }

        // Healthy pipeline with nothing runnable but jobs still blocked on
        // dependencies: that is a deadlock (likely a dependency cycle).
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && !pipeline_failed
            && waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            let remaining_jobs: Vec<_> = remaining
                .iter()
                .filter_map(|(name, &count)| if count > 0 { Some(name.clone()) } else { None })
                .collect();
            if !remaining_jobs.is_empty() {
                halt_kind = HaltKind::Deadlock;
                halt_error = Some(anyhow!(
                    "no runnable jobs, potential dependency cycle involving: {:?}",
                    remaining_jobs
                ));
            }
            break;
        }

        // Only on_failure jobs remain and the pipeline succeeded: nothing
        // more will ever run, so finish.
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && !pipeline_failed
            && !waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            break;
        }

        // Unifies the three event sources the scheduler reacts to.
        enum SchedulerEvent {
            Job(JobEvent),
            Delay(String),
            Command(UiCommand),
        }

        // Wait for the next job completion, timer expiry, or UI command.
        // A closed command channel disables manual input permanently.
        let next_event = tokio::select! {
            Some(event) = rx.recv() => Some(SchedulerEvent::Job(event)),
            Some(name) = delay_rx.recv() => Some(SchedulerEvent::Delay(name)),
            cmd = async {
                if let Some(rx) = commands.as_mut() {
                    (*rx).recv().await
                } else {
                    std::future::pending().await
                }
            } => {
                match cmd {
                    Some(command) => Some(SchedulerEvent::Command(command)),
                    None => {
                        manual_input_available = false;
                        commands = None;
                        None
                    }
                }
            }
            else => None,
        };

        // Manual input just went away: skip all parked manual jobs and
        // unblock their dependents.
        if !manual_input_available && !manual_waiting.is_empty() {
            let pending: Vec<String> = manual_waiting.drain().collect();
            for name in pending {
                if let Some(planned) = plan.nodes.get(&name) {
                    let reason = planned
                        .instance
                        .rule
                        .manual_reason
                        .clone()
                        .unwrap_or_else(|| "manual job not run".to_string());
                    if let Some(ui_ref) = ui.as_deref() {
                        ui_ref.job_finished(
                            &planned.instance.job.name,
                            UiJobStatus::Skipped,
                            0.0,
                            Some(reason.clone()),
                        );
                    }
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Skipped(reason),
                        log_path: None,
                        log_hash: planned.log_hash.clone(),
                        allow_failure: planned.instance.rule.allow_failure,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    completed += 1;
                    release_dependents(
                        &plan,
                        &name,
                        &mut remaining,
                        abort_requested,
                        pipeline_failed,
                        &mut ReadyQueues {
                            ready: &mut ready,
                            waiting_on_failure: &mut waiting_on_failure,
                            delayed_pending: &mut delayed_pending,
                        },
                        &enqueue_ready,
                    );
                }
            }
        }

        // No event arrived (all sources drained/closed). If nothing is in
        // flight or pending, the worker channel died while work remained.
        let Some(event) = next_event else {
            if running.is_empty()
                && ready.is_empty()
                && delayed_pending.is_empty()
                && resource_retry_pending.is_empty()
            {
                halt_kind = HaltKind::ChannelClosed;
                halt_error = Some(anyhow!(
                    "job worker channel closed unexpectedly while {} jobs remained",
                    total - completed
                ));
                break;
            }
            continue;
        };

        match event {
            // A timer fired: either a resource-group retry or a delayed
            // job whose start_in elapsed.
            SchedulerEvent::Delay(name) => {
                if abort_requested {
                    continue;
                }
                if resource_retry_pending.remove(&name) {
                    ready.push_back(name);
                    continue;
                }
                delayed_pending.remove(&name);
                if pipeline_failed
                    && let Some(planned) = plan.nodes.get(&name)
                    && planned.instance.rule.when.requires_success()
                {
                    continue;
                }
                ready.push_back(name);
            }
            SchedulerEvent::Command(cmd) => match cmd {
                UiCommand::StartManual { name } => {
                    if manual_waiting.remove(&name) {
                        ready.push_back(name);
                    }
                }
                UiCommand::CancelJob { name } => {
                    exec.cancel_running_job(&name);
                }
                UiCommand::AbortPipeline => {
                    // Stop scheduling, cancel interruptible running jobs,
                    // and drop everything still queued or waiting.
                    abort_requested = true;
                    pipeline_failed = true;
                    halt_kind = HaltKind::Aborted;
                    if halt_error.is_none() {
                        halt_error = Some(anyhow!("pipeline aborted by user"));
                    }
                    for name in interruptible_running_jobs(plan.as_ref(), &running) {
                        exec.cancel_running_job(&name);
                    }
                    ready.clear();
                    waiting_on_failure.clear();
                    delayed_pending.clear();
                    manual_waiting.clear();
                }
                // Restarts are handled after the run (handle_restart_commands).
                UiCommand::RestartJob { .. } => {}
            },
            SchedulerEvent::Job(event) => {
                running.remove(&event.name);
                // A completed job missing from the plan indicates an
                // internal inconsistency; treat it as a pipeline failure.
                let Some(planned) = plan.nodes.get(&event.name) else {
                    let message = format!(
                        "completed job '{}' was not found in execution plan",
                        event.name
                    );
                    if !pipeline_failed {
                        pipeline_failed = true;
                        halt_kind = HaltKind::JobFailure;
                        if halt_error.is_none() {
                            halt_error = Some(anyhow!(message.clone()));
                        }
                    }
                    summaries.push(JobSummary {
                        name: event.name.clone(),
                        stage_name: event.stage_name.clone(),
                        duration: event.duration,
                        status: JobStatus::Failed(message),
                        log_path: event.log_path.clone(),
                        log_hash: event.log_hash.clone(),
                        allow_failure: false,
                        environment: None,
                    });
                    completed += 1;
                    continue;
                };
                match event.result {
                    Ok(_) => {
                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Success);
                        release_resource_lock(
                            planned,
                            &mut ready,
                            &resource_groups,
                            &mut resource_waiting,
                        );
                        release_dependents(
                            &plan,
                            &event.name,
                            &mut remaining,
                            abort_requested,
                            pipeline_failed,
                            &mut ReadyQueues {
                                ready: &mut ready,
                                waiting_on_failure: &mut waiting_on_failure,
                                delayed_pending: &mut delayed_pending,
                            },
                            &enqueue_ready,
                        );
                        summaries.push(JobSummary {
                            name: event.name.clone(),
                            stage_name: event.stage_name.clone(),
                            duration: event.duration,
                            status: JobStatus::Success,
                            log_path: event.log_path.clone(),
                            log_hash: event.log_hash.clone(),
                            allow_failure: planned.instance.rule.allow_failure,
                            environment: exec.expanded_environment(&planned.instance.job),
                        });
                        completed += 1;
                    }
                    Err(err) => {
                        // Cancelled jobs are recorded as skipped, never
                        // retried, and do not fail the pipeline.
                        if event.cancelled {
                            exec.record_completed_job(&event.name, ArtifactSourceOutcome::Skipped);
                            release_resource_lock(
                                planned,
                                &mut ready,
                                &resource_groups,
                                &mut resource_waiting,
                            );
                            summaries.push(JobSummary {
                                name: event.name.clone(),
                                stage_name: event.stage_name.clone(),
                                duration: event.duration,
                                status: JobStatus::Skipped("aborted by user".to_string()),
                                log_path: event.log_path.clone(),
                                log_hash: event.log_hash.clone(),
                                allow_failure: true,
                                environment: exec.expanded_environment(&planned.instance.job),
                            });
                            completed += 1;
                            continue;
                        }
                        let err_msg = err.to_string();
                        // Retry budget: attempt N means N-1 retries used.
                        let attempts_so_far = attempts.get(&event.name).copied().unwrap_or(1);
                        let retries_used = attempts_so_far.saturating_sub(1);
                        if retries_used < planned.instance.retry.max
                            && retry_allowed(
                                &planned.instance.retry.when,
                                &planned.instance.retry.exit_codes,
                                event.failure_kind,
                                event.exit_code,
                            )
                        {
                            // Re-queue for another attempt; no summary yet.
                            release_resource_lock(
                                planned,
                                &mut ready,
                                &resource_groups,
                                &mut resource_waiting,
                            );
                            ready.push_back(event.name.clone());
                            continue;
                        }
                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Failed);
                        release_resource_lock(
                            planned,
                            &mut ready,
                            &resource_groups,
                            &mut resource_waiting,
                        );
                        if !planned.instance.rule.allow_failure && !pipeline_failed {
                            // First hard failure: mark the pipeline failed
                            // and release all parked on_failure jobs.
                            pipeline_failed = true;
                            halt_kind = HaltKind::JobFailure;
                            if halt_error.is_none() {
                                halt_error =
                                    Some(anyhow!("job '{}' failed: {}", event.name, err_msg));
                            }
                            while let Some(name) = waiting_on_failure.pop_front() {
                                ready.push_back(name);
                            }
                        }
                        release_dependents(
                            &plan,
                            &event.name,
                            &mut remaining,
                            abort_requested,
                            pipeline_failed,
                            &mut ReadyQueues {
                                ready: &mut ready,
                                waiting_on_failure: &mut waiting_on_failure,
                                delayed_pending: &mut delayed_pending,
                            },
                            &enqueue_ready,
                        );
                        summaries.push(JobSummary {
                            name: event.name.clone(),
                            stage_name: event.stage_name.clone(),
                            duration: event.duration,
                            status: JobStatus::Failed(err_msg),
                            log_path: event.log_path.clone(),
                            log_hash: event.log_hash.clone(),
                            allow_failure: planned.instance.rule.allow_failure,
                            environment: exec.expanded_environment(&planned.instance.job),
                        });
                        completed += 1;
                    }
                }
            }
        }
    }

    // Human-readable reason attached to jobs that never got to run.
    let skip_reason = match halt_kind {
        HaltKind::JobFailure => Some("not run (pipeline stopped after failure)".to_string()),
        HaltKind::Deadlock => Some("not run (dependency cycle detected)".to_string()),
        HaltKind::ChannelClosed => {
            Some("not run (executor channel closed unexpectedly)".to_string())
        }
        HaltKind::Aborted => Some("not run (pipeline aborted by user)".to_string()),
        HaltKind::None => None,
    };

    // Emit Skipped summaries for every planned job with no summary yet.
    let mut recorded: HashSet<String> = summaries.iter().map(|entry| entry.name.clone()).collect();
    for job_name in &plan.ordered {
        if recorded.contains(job_name) {
            continue;
        }
        let Some(planned) = plan.nodes.get(job_name) else {
            continue;
        };
        let reason = if let Some(reason) = skip_reason.clone() {
            Some(reason)
        } else if planned.instance.rule.when == RuleWhen::OnFailure {
            Some("skipped (rules: on_failure and pipeline succeeded)".to_string())
        } else {
            None
        };

        if let Some(reason) = reason {
            if let Some(ui_ref) = ui.as_deref() {
                ui_ref.job_finished(job_name, UiJobStatus::Skipped, 0.0, Some(reason.clone()));
            }
            summaries.push(JobSummary {
                name: job_name.clone(),
                stage_name: planned.instance.stage_name.clone(),
                duration: 0.0,
                status: JobStatus::Skipped(reason.clone()),
                log_path: Some(planned.log_path.clone()),
                log_hash: planned.log_hash.clone(),
                allow_failure: planned.instance.rule.allow_failure,
                environment: exec.expanded_environment(&planned.instance.job),
            });
            recorded.insert(job_name.clone());
        }
    }

    let result = halt_error.map_or(Ok(()), Err);
    (summaries, result)
}
656
657fn retry_allowed(
658    conditions: &[String],
659    exit_codes: &[i32],
660    failure_kind: Option<JobFailureKind>,
661    exit_code: Option<i32>,
662) -> bool {
663    if conditions.is_empty() && exit_codes.is_empty() {
664        return true;
665    }
666    let when_matches = failure_kind.is_some_and(|kind| {
667        conditions
668            .iter()
669            .any(|condition| retry_condition_matches(condition, kind))
670    });
671    let exit_code_matches = exit_code.is_some_and(|code| exit_codes.contains(&code));
672    when_matches || exit_code_matches
673}
674
675fn retry_condition_matches(condition: &str, failure_kind: JobFailureKind) -> bool {
676    match condition {
677        "always" => true,
678        "unknown_failure" => failure_kind == JobFailureKind::UnknownFailure,
679        "script_failure" => failure_kind == JobFailureKind::ScriptFailure,
680        "api_failure" => failure_kind == JobFailureKind::ApiFailure,
681        "job_execution_timeout" => failure_kind == JobFailureKind::JobExecutionTimeout,
682        "runner_system_failure" => failure_kind == JobFailureKind::RunnerSystemFailure,
683        "runner_unsupported" => failure_kind == JobFailureKind::RunnerUnsupported,
684        "stale_schedule" => failure_kind == JobFailureKind::StaleSchedule,
685        "archived_failure" => failure_kind == JobFailureKind::ArchivedFailure,
686        "unmet_prerequisites" => failure_kind == JobFailureKind::UnmetPrerequisites,
687        "scheduler_failure" => failure_kind == JobFailureKind::SchedulerFailure,
688        "data_integrity_failure" => failure_kind == JobFailureKind::DataIntegrityFailure,
689        "stuck_or_timeout_failure" => {
690            matches!(
691                failure_kind,
692                JobFailureKind::StuckOrTimeoutFailure | JobFailureKind::JobExecutionTimeout
693            )
694        }
695        _ => false,
696    }
697}
698
699pub(crate) async fn handle_restart_commands(
700    exec: &ExecutorCore,
701    plan: Arc<ExecutionPlan>,
702    ui: Option<Arc<UiBridge>>,
703    commands: &mut mpsc::UnboundedReceiver<UiCommand>,
704    summaries: &mut Vec<JobSummary>,
705) -> Result<()> {
706    while let Some(command) = commands.recv().await {
707        match command {
708            UiCommand::RestartJob { name } => {
709                let Some(planned) = plan.nodes.get(&name).cloned() else {
710                    continue;
711                };
712
713                if let Some(ui_ref) = ui.as_deref() {
714                    ui_ref.job_restarted(&name);
715                }
716
717                let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
718                    Ok(info) => info,
719                    Err(err) => {
720                        summaries.push(JobSummary {
721                            name: planned.instance.job.name.clone(),
722                            stage_name: planned.instance.stage_name.clone(),
723                            duration: 0.0,
724                            status: JobStatus::Failed(err.to_string()),
725                            log_path: Some(planned.log_path.clone()),
726                            log_hash: planned.log_hash.clone(),
727                            allow_failure: false,
728                            environment: exec.expanded_environment(&planned.instance.job),
729                        });
730                        return Err(err);
731                    }
732                };
733                let restart_exec = exec.clone();
734                let ui_clone = ui.clone();
735                let run_info_clone = run_info.clone();
736                let job_plan = plan.clone();
737                let event = task::spawn_blocking(move || {
738                    job_runner::run_planned_job(
739                        &restart_exec,
740                        job_plan,
741                        planned,
742                        run_info_clone,
743                        ui_clone,
744                    )
745                })
746                .await
747                .context("job restart task failed")?;
748                update_summaries_from_event(exec, plan.as_ref(), event, summaries);
749            }
750            UiCommand::StartManual { .. } => {}
751            UiCommand::CancelJob { .. } => {}
752            UiCommand::AbortPipeline => break,
753        }
754    }
755    Ok(())
756}
757
758fn update_summaries_from_event(
759    exec: &ExecutorCore,
760    plan: &ExecutionPlan,
761    event: JobEvent,
762    summaries: &mut Vec<JobSummary>,
763) {
764    let JobEvent {
765        name,
766        stage_name,
767        duration,
768        log_path,
769        log_hash,
770        result,
771        failure_kind: _,
772        exit_code: _,
773        cancelled,
774    } = event;
775
776    let allow_failure = plan
777        .nodes
778        .get(&name)
779        .map(|planned| planned.instance.rule.allow_failure)
780        .unwrap_or(false);
781    let environment = plan
782        .nodes
783        .get(&name)
784        .and_then(|planned| exec.expanded_environment(&planned.instance.job));
785
786    let status = match result {
787        Ok(_) => JobStatus::Success,
788        Err(err) => {
789            if cancelled {
790                JobStatus::Skipped("aborted by user".to_string())
791            } else {
792                JobStatus::Failed(err.to_string())
793            }
794        }
795    };
796    let outcome = match &status {
797        JobStatus::Success => ArtifactSourceOutcome::Success,
798        JobStatus::Failed(_) => ArtifactSourceOutcome::Failed,
799        JobStatus::Skipped(_) => ArtifactSourceOutcome::Skipped,
800    };
801    exec.record_completed_job(&name, outcome);
802
803    summaries.retain(|entry| entry.name != name);
804    summaries.push(JobSummary {
805        name,
806        stage_name,
807        duration,
808        status,
809        log_path,
810        log_hash,
811        allow_failure,
812        environment,
813    });
814}
815
/// Mutable views over the scheduler's readiness queues, bundled so helper
/// functions can accept them as a single argument instead of three
/// separate `&mut` parameters.
struct ReadyQueues<'a> {
    // Jobs whose dependencies are all satisfied and may start now.
    ready: &'a mut VecDeque<String>,
    // NOTE(review): presumably jobs gated on an on-failure rule that only
    // become eligible once the pipeline has failed — confirm against the
    // enqueue_ready callers.
    waiting_on_failure: &'a mut VecDeque<String>,
    // NOTE(review): appears to track jobs with a deferred/delayed start
    // still pending — confirm against execute_plan's scheduling loop.
    delayed_pending: &'a mut HashSet<String>,
}
821
822fn release_dependents<F>(
823    plan: &ExecutionPlan,
824    name: &str,
825    remaining: &mut HashMap<String, usize>,
826    abort_requested: bool,
827    pipeline_failed: bool,
828    queues: &mut ReadyQueues<'_>,
829    enqueue_ready: &F,
830) where
831    F: Fn(&str, bool, &mut VecDeque<String>, &mut VecDeque<String>, &mut HashSet<String>),
832{
833    if let Some(children) = plan.dependents.get(name) {
834        for child in children {
835            if let Some(count) = remaining.get_mut(child)
836                && *count > 0
837            {
838                *count -= 1;
839                if *count == 0 && !abort_requested {
840                    enqueue_ready(
841                        child,
842                        pipeline_failed,
843                        queues.ready,
844                        queues.waiting_on_failure,
845                        queues.delayed_pending,
846                    );
847                }
848            }
849        }
850    }
851}
852
853fn release_resource_lock(
854    planned: &ExecutableJob,
855    ready: &mut VecDeque<String>,
856    resource_groups: &ResourceGroupManager,
857    resource_waiting: &mut HashMap<String, VecDeque<String>>,
858) {
859    if let Some(group) = &planned.instance.resource_group {
860        let _ = resource_groups.release(group);
861        if let Some(queue) = resource_waiting.get_mut(group)
862            && let Some(next) = queue.pop_front()
863        {
864            ready.push_back(next);
865        }
866    }
867}
868
#[cfg(test)]
mod tests {
    //! Unit tests for the scheduling helpers: resource-group hand-off,
    //! retry-condition matching, and interruptible-job selection.
    use super::{interruptible_running_jobs, release_resource_lock, retry_allowed};
    use crate::compiler::JobInstance;
    use crate::execution_plan::{ExecutableJob, ExecutionPlan};
    use crate::model::{ArtifactSpec, JobSpec, RetryPolicySpec};
    use crate::pipeline::{JobFailureKind, ResourceGroupManager, RuleEvaluation, RuleWhen};
    use std::collections::{HashMap, HashSet, VecDeque};
    use std::path::PathBuf;
    use tempfile::tempdir;

    // Releasing a held group lock should pop the next waiter for that
    // group and push it onto the ready queue, leaving the lock free to be
    // re-acquired by the promoted job.
    #[test]
    fn release_resource_lock_requeues_next_waiting_job() {
        let planned = ExecutableJob {
            instance: JobInstance {
                job: job("build"),
                stage_name: "build".into(),
                dependencies: Vec::new(),
                rule: RuleEvaluation {
                    included: true,
                    when: RuleWhen::OnSuccess,
                    ..Default::default()
                },
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible: false,
                resource_group: Some("builder".into()),
            },
            log_path: PathBuf::from("/tmp/build.log"),
            log_hash: "hash".into(),
        };
        let mut ready = VecDeque::new();
        let temp = tempdir().expect("tempdir");
        let manager = ResourceGroupManager::new(temp.path().join("locks"));
        // Hold the lock up front so release_resource_lock has one to drop.
        manager
            .try_acquire("builder", "build")
            .expect("lock acquires");
        let mut resource_waiting = HashMap::from([(
            "builder".to_string(),
            VecDeque::from(["package".to_string()]),
        )]);

        release_resource_lock(&planned, &mut ready, &manager, &mut resource_waiting);

        assert_eq!(ready, VecDeque::from(["package".to_string()]));
        // The lock must be re-acquirable by the promoted waiter.
        assert!(
            manager
                .try_acquire("builder", "package")
                .expect("lock re-acquires")
        );
        assert!(resource_waiting["builder"].is_empty());
    }

    // With no `when` conditions and no exit-code conditions configured,
    // every failure is retryable.
    #[test]
    fn retry_allowed_defaults_to_true_without_conditions() {
        assert!(retry_allowed(
            &[],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    // A `when` condition only matches its own failure kind.
    #[test]
    fn retry_allowed_matches_script_failure_condition() {
        assert!(retry_allowed(
            &["script_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
        assert!(!retry_allowed(
            &["runner_system_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    // `stuck_or_timeout_failure` is a superset that also covers
    // JobExecutionTimeout (see the matches! arm in the condition matcher).
    #[test]
    fn retry_allowed_treats_job_timeout_as_stuck_or_timeout_failure() {
        assert!(retry_allowed(
            &["stuck_or_timeout_failure".into()],
            &[],
            Some(JobFailureKind::JobExecutionTimeout),
            None
        ));
    }

    #[test]
    fn retry_allowed_matches_api_failure_condition() {
        assert!(retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::ApiFailure),
            None
        ));
        assert!(!retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::UnknownFailure),
            None
        ));
    }

    #[test]
    fn retry_allowed_matches_unmet_prerequisites_condition() {
        assert!(retry_allowed(
            &["unmet_prerequisites".into()],
            &[],
            Some(JobFailureKind::UnmetPrerequisites),
            None
        ));
    }

    // Exit-code conditions match on the exact code and nothing else.
    #[test]
    fn retry_allowed_matches_exit_code_condition() {
        assert!(retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137)
        ));
        assert!(!retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    // When both condition lists are present, matching either one is
    // sufficient (OR semantics).
    #[test]
    fn retry_allowed_matches_when_or_exit_code() {
        assert!(retry_allowed(
            &["runner_system_failure".into()],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137)
        ));
    }

    // Only nodes flagged interruptible are selected from the running set.
    #[test]
    fn interruptible_running_jobs_selects_only_interruptible_nodes() {
        let plan = ExecutionPlan {
            ordered: vec!["build".into(), "deploy".into()],
            nodes: HashMap::from([
                ("build".into(), executable_job("build", true, "build", 0)),
                (
                    "deploy".into(),
                    executable_job("deploy", false, "deploy", 1),
                ),
            ]),
            dependents: HashMap::new(),
            order_index: HashMap::from([("build".into(), 0), ("deploy".into(), 1)]),
            variants: HashMap::new(),
        };
        let running = HashSet::from(["build".to_string(), "deploy".to_string()]);

        assert_eq!(interruptible_running_jobs(&plan, &running), vec!["build"]);
    }

    // Results come back sorted by the plan's order_index, not by set
    // iteration order.
    #[test]
    fn interruptible_running_jobs_respects_plan_order() {
        let plan = ExecutionPlan {
            ordered: vec!["test".into(), "build".into()],
            nodes: HashMap::from([
                ("build".into(), executable_job("build", true, "build", 1)),
                ("test".into(), executable_job("test", true, "test", 0)),
            ]),
            dependents: HashMap::new(),
            order_index: HashMap::from([("test".into(), 0), ("build".into(), 1)]),
            variants: HashMap::new(),
        };
        let running = HashSet::from(["build".to_string(), "test".to_string()]);

        assert_eq!(
            interruptible_running_jobs(&plan, &running),
            vec!["test", "build"]
        );
    }

    // Builds a minimal ExecutableJob fixture; `order` only disambiguates
    // the log path/hash between fixtures.
    fn executable_job(name: &str, interruptible: bool, stage: &str, order: usize) -> ExecutableJob {
        let mut job = job(name);
        job.interruptible = interruptible;
        ExecutableJob {
            instance: JobInstance {
                job,
                stage_name: stage.into(),
                dependencies: Vec::new(),
                rule: RuleEvaluation::default(),
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible,
                resource_group: None,
            },
            log_path: PathBuf::from(format!("/tmp/{name}-{order}.log")),
            log_hash: format!("hash-{name}-{order}"),
        }
    }

    // Minimal JobSpec fixture with every field spelled out explicitly.
    fn job(name: &str) -> JobSpec {
        JobSpec {
            name: name.into(),
            stage: "build".into(),
            commands: vec!["true".into()],
            needs: Vec::new(),
            explicit_needs: false,
            dependencies: Vec::new(),
            before_script: None,
            after_script: None,
            inherit_default_before_script: true,
            inherit_default_after_script: true,
            inherit_default_image: true,
            inherit_default_cache: true,
            inherit_default_services: true,
            inherit_default_timeout: true,
            inherit_default_retry: true,
            inherit_default_interruptible: true,
            when: None,
            rules: Vec::new(),
            only: Vec::new(),
            except: Vec::new(),
            artifacts: ArtifactSpec::default(),
            cache: Vec::new(),
            image: None,
            variables: HashMap::new(),
            services: Vec::new(),
            timeout: None,
            retry: RetryPolicySpec::default(),
            interruptible: false,
            resource_group: None,
            parallel: None,
            tags: Vec::new(),
            environment: None,
        }
    }
}