// opal/executor/orchestrator.rs

1use super::{core::ExecutorCore, job_runner};
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::model::ArtifactSourceOutcome;
4use crate::pipeline::{
5    self, HaltKind, JobEvent, JobFailureKind, JobStatus, JobSummary, ResourceGroupManager, RuleWhen,
6};
7use crate::runtime;
8use crate::ui::{UiBridge, UiCommand, UiJobStatus};
9use anyhow::{Context, Result, anyhow};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::Arc;
12use tokio::{
13    sync::{Semaphore, mpsc},
14    task, time as tokio_time,
15};
16
17fn interruptible_running_jobs(plan: &ExecutionPlan, running: &HashSet<String>) -> Vec<String> {
18    let mut names = running
19        .iter()
20        .filter(|name| {
21            plan.nodes
22                .get(*name)
23                .map(|planned| planned.instance.interruptible)
24                .unwrap_or(false)
25        })
26        .cloned()
27        .collect::<Vec<_>>();
28    names.sort_by_key(|name| plan.order_index.get(name).copied().unwrap_or(usize::MAX));
29    names
30}
31
32pub(crate) async fn execute_plan(
33    exec: &ExecutorCore,
34    plan: Arc<ExecutionPlan>,
35    ui: Option<Arc<UiBridge>>,
36    mut commands: Option<&mut mpsc::UnboundedReceiver<UiCommand>>,
37) -> (Vec<JobSummary>, Result<()>) {
38    //TODO: too complicated function, does semaphores, expects signals over channels, fucking no
39    //refactor this trash and structure properly
40    let total = plan.ordered.len();
41    if total == 0 {
42        return (Vec::new(), Ok(()));
43    }
44
45    let mut remaining: HashMap<String, usize> = plan
46        .nodes
47        .iter()
48        .map(|(name, job)| (name.clone(), job.instance.dependencies.len()))
49        .collect();
50    let mut ready: VecDeque<String> = VecDeque::new();
51    let mut waiting_on_failure: VecDeque<String> = VecDeque::new();
52    let mut delayed_pending: HashSet<String> = HashSet::new();
53    let mut resource_retry_pending: HashSet<String> = HashSet::new();
54    let mut manual_waiting: HashSet<String> = HashSet::new();
55    let mut running = HashSet::new();
56    let mut abort_requested = false;
57    let mut completed = 0usize;
58    let mut pipeline_failed = false;
59    let mut halt_kind = HaltKind::None;
60    let mut halt_error: Option<anyhow::Error> = None;
61    let mut summaries: Vec<JobSummary> = Vec::new();
62    let mut attempts: HashMap<String, u32> = HashMap::new();
63    let mut resource_waiting: HashMap<String, VecDeque<String>> = HashMap::new();
64    let resource_groups = ResourceGroupManager::new(runtime::resource_group_root());
65    let mut manual_input_available = commands.is_some();
66
67    let semaphore = Arc::new(Semaphore::new(exec.config.max_parallel_jobs.max(1)));
68    let exec = Arc::new(exec.clone());
69    let (tx, mut rx) = mpsc::unbounded_channel::<JobEvent>();
70    let (delay_tx, mut delay_rx) = mpsc::unbounded_channel::<String>();
71
72    let enqueue_ready = |job_name: &str,
73                         pipeline_failed_flag: bool,
74                         ready_queue: &mut VecDeque<String>,
75                         wait_failure_queue: &mut VecDeque<String>,
76                         delayed_set: &mut HashSet<String>| {
77        let Some(planned) = plan.nodes.get(job_name) else {
78            return;
79        };
80        match planned.instance.rule.when {
81            RuleWhen::OnFailure => {
82                if pipeline_failed_flag {
83                    ready_queue.push_back(job_name.to_string());
84                } else {
85                    wait_failure_queue.push_back(job_name.to_string());
86                }
87            }
88            RuleWhen::Delayed => {
89                if pipeline_failed_flag {
90                    return;
91                }
92                if let Some(delay) = planned.instance.rule.start_in {
93                    if delayed_set.insert(job_name.to_string()) {
94                        let tx_clone = delay_tx.clone();
95                        let name = job_name.to_string();
96                        task::spawn(async move {
97                            tokio_time::sleep(delay).await;
98                            let _ = tx_clone.send(name);
99                        });
100                    }
101                } else {
102                    ready_queue.push_back(job_name.to_string());
103                }
104            }
105            RuleWhen::Manual | RuleWhen::OnSuccess => {
106                if pipeline_failed_flag && planned.instance.rule.when.requires_success() {
107                    return;
108                }
109                ready_queue.push_back(job_name.to_string());
110            }
111            RuleWhen::Always => {
112                ready_queue.push_back(job_name.to_string());
113            }
114            RuleWhen::Never => {}
115        }
116    };
117
118    for name in &plan.ordered {
119        if remaining.get(name).copied().unwrap_or(0) == 0 && !abort_requested {
120            enqueue_ready(
121                name,
122                pipeline_failed,
123                &mut ready,
124                &mut waiting_on_failure,
125                &mut delayed_pending,
126            );
127        }
128    }
129
130    while completed < total {
131        while let Some(name) = ready.pop_front() {
132            if abort_requested {
133                break;
134            }
135            let planned = match plan.nodes.get(&name).cloned() {
136                Some(job) => job,
137                None => continue,
138            };
139            if pipeline_failed && planned.instance.rule.when.requires_success() {
140                continue;
141            }
142
143            if matches!(planned.instance.rule.when, RuleWhen::Manual)
144                && !planned.instance.rule.manual_auto_run
145            {
146                if manual_input_available {
147                    if manual_waiting.insert(name.clone())
148                        && let Some(ui_ref) = ui.as_deref()
149                    {
150                        ui_ref.job_manual_pending(&name);
151                    }
152                } else {
153                    let reason = planned
154                        .instance
155                        .rule
156                        .manual_reason
157                        .clone()
158                        .unwrap_or_else(|| "manual job not run".to_string());
159                    if let Some(ui_ref) = ui.as_deref() {
160                        ui_ref.job_finished(
161                            &planned.instance.job.name,
162                            UiJobStatus::Skipped,
163                            0.0,
164                            Some(reason.clone()),
165                        );
166                    }
167                    summaries.push(JobSummary {
168                        name: planned.instance.job.name.clone(),
169                        stage_name: planned.instance.stage_name.clone(),
170                        duration: 0.0,
171                        status: JobStatus::Skipped(reason.clone()),
172                        log_path: None,
173                        log_hash: planned.log_hash.clone(),
174                        allow_failure: planned.instance.rule.allow_failure,
175                        environment: exec.expanded_environment(&planned.instance.job),
176                    });
177                    completed += 1;
178                    release_dependents(
179                        &plan,
180                        &name,
181                        &mut remaining,
182                        abort_requested,
183                        pipeline_failed,
184                        &mut ReadyQueues {
185                            ready: &mut ready,
186                            waiting_on_failure: &mut waiting_on_failure,
187                            delayed_pending: &mut delayed_pending,
188                        },
189                        &enqueue_ready,
190                    );
191                }
192                continue;
193            }
194
195            if let Some(group) = &planned.instance.resource_group {
196                let acquired =
197                    match resource_groups.try_acquire(group, &planned.instance.job.name) {
198                        Ok(acquired) => acquired,
199                        Err(err) => {
200                            halt_kind = HaltKind::JobFailure;
201                            pipeline_failed = true;
202                            if halt_error.is_none() {
203                                halt_error = Some(err.context(format!(
204                                    "failed to acquire resource group '{}'",
205                                    group
206                                )));
207                            }
208                            break;
209                        }
210                    };
211                if !acquired {
212                    if running.is_empty() && ready.is_empty() {
213                        loop {
214                            tokio_time::sleep(std::time::Duration::from_millis(500)).await;
215                            match resource_groups.try_acquire(group, &planned.instance.job.name) {
216                                Ok(true) => break,
217                                Ok(false) => continue,
218                                Err(err) => {
219                                    halt_kind = HaltKind::JobFailure;
220                                    pipeline_failed = true;
221                                    if halt_error.is_none() {
222                                        halt_error = Some(err.context(format!(
223                                            "failed to acquire resource group '{}'",
224                                            group
225                                        )));
226                                    }
227                                    break;
228                                }
229                            }
230                        }
231                        if pipeline_failed {
232                            break;
233                        }
234                    } else {
235                        if resource_retry_pending.insert(name.clone()) {
236                            let tx_clone = delay_tx.clone();
237                            let retry_name = name.clone();
238                            task::spawn(async move {
239                                tokio_time::sleep(std::time::Duration::from_millis(500)).await;
240                                let _ = tx_clone.send(retry_name);
241                            });
242                        }
243                        continue;
244                    }
245                }
246            }
247
248            let entry = attempts.entry(name.clone()).or_insert(0);
249            *entry += 1;
250
251            let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
252                Ok(info) => info,
253                Err(err) => {
254                    summaries.push(JobSummary {
255                        name: planned.instance.job.name.clone(),
256                        stage_name: planned.instance.stage_name.clone(),
257                        duration: 0.0,
258                        status: JobStatus::Failed(err.to_string()),
259                        log_path: Some(planned.log_path.clone()),
260                        log_hash: planned.log_hash.clone(),
261                        allow_failure: false,
262                        environment: exec.expanded_environment(&planned.instance.job),
263                    });
264                    return (summaries, Err(err));
265                }
266            };
267            running.insert(name.clone());
268            pipeline::spawn_job(
269                exec.clone(),
270                plan.clone(),
271                planned,
272                run_info,
273                semaphore.clone(),
274                tx.clone(),
275                ui.clone(),
276            );
277        }
278
279        if completed >= total {
280            break;
281        }
282
283        if running.is_empty()
284            && ready.is_empty()
285            && delayed_pending.is_empty()
286            && pipeline_failed
287            && waiting_on_failure.is_empty()
288            && manual_waiting.is_empty()
289        {
290            break;
291        }
292
293        if running.is_empty()
294            && ready.is_empty()
295            && delayed_pending.is_empty()
296            && !pipeline_failed
297            && waiting_on_failure.is_empty()
298            && manual_waiting.is_empty()
299        {
300            let remaining_jobs: Vec<_> = remaining
301                .iter()
302                .filter_map(|(name, &count)| if count > 0 { Some(name.clone()) } else { None })
303                .collect();
304            if !remaining_jobs.is_empty() {
305                halt_kind = HaltKind::Deadlock;
306                halt_error = Some(anyhow!(
307                    "no runnable jobs, potential dependency cycle involving: {:?}",
308                    remaining_jobs
309                ));
310            }
311            break;
312        }
313
314        if running.is_empty()
315            && ready.is_empty()
316            && delayed_pending.is_empty()
317            && !pipeline_failed
318            && !waiting_on_failure.is_empty()
319            && manual_waiting.is_empty()
320        {
321            break;
322        }
323
324        enum SchedulerEvent {
325            Job(JobEvent),
326            Delay(String),
327            Command(UiCommand),
328        }
329
330        let next_event = tokio::select! {
331            Some(event) = rx.recv() => Some(SchedulerEvent::Job(event)),
332            Some(name) = delay_rx.recv() => Some(SchedulerEvent::Delay(name)),
333            cmd = async {
334                if let Some(rx) = commands.as_mut() {
335                    (*rx).recv().await
336                } else {
337                    std::future::pending().await
338                }
339            } => {
340                match cmd {
341                    Some(command) => Some(SchedulerEvent::Command(command)),
342                    None => {
343                        manual_input_available = false;
344                        commands = None;
345                        None
346                    }
347                }
348            }
349            else => None,
350        };
351
352        if !manual_input_available && !manual_waiting.is_empty() {
353            let pending: Vec<String> = manual_waiting.drain().collect();
354            for name in pending {
355                if let Some(planned) = plan.nodes.get(&name) {
356                    let reason = planned
357                        .instance
358                        .rule
359                        .manual_reason
360                        .clone()
361                        .unwrap_or_else(|| "manual job not run".to_string());
362                    if let Some(ui_ref) = ui.as_deref() {
363                        ui_ref.job_finished(
364                            &planned.instance.job.name,
365                            UiJobStatus::Skipped,
366                            0.0,
367                            Some(reason.clone()),
368                        );
369                    }
370                    summaries.push(JobSummary {
371                        name: planned.instance.job.name.clone(),
372                        stage_name: planned.instance.stage_name.clone(),
373                        duration: 0.0,
374                        status: JobStatus::Skipped(reason),
375                        log_path: None,
376                        log_hash: planned.log_hash.clone(),
377                        allow_failure: planned.instance.rule.allow_failure,
378                        environment: exec.expanded_environment(&planned.instance.job),
379                    });
380                    completed += 1;
381                    release_dependents(
382                        &plan,
383                        &name,
384                        &mut remaining,
385                        abort_requested,
386                        pipeline_failed,
387                        &mut ReadyQueues {
388                            ready: &mut ready,
389                            waiting_on_failure: &mut waiting_on_failure,
390                            delayed_pending: &mut delayed_pending,
391                        },
392                        &enqueue_ready,
393                    );
394                }
395            }
396        }
397
398        let Some(event) = next_event else {
399            if running.is_empty()
400                && ready.is_empty()
401                && delayed_pending.is_empty()
402                && resource_retry_pending.is_empty()
403            {
404                halt_kind = HaltKind::ChannelClosed;
405                halt_error = Some(anyhow!(
406                    "job worker channel closed unexpectedly while {} jobs remained",
407                    total - completed
408                ));
409                break;
410            }
411            continue;
412        };
413
414        match event {
415            SchedulerEvent::Delay(name) => {
416                if abort_requested {
417                    continue;
418                }
419                if resource_retry_pending.remove(&name) {
420                    ready.push_back(name);
421                    continue;
422                }
423                delayed_pending.remove(&name);
424                if pipeline_failed
425                    && let Some(planned) = plan.nodes.get(&name)
426                    && planned.instance.rule.when.requires_success()
427                {
428                    continue;
429                }
430                ready.push_back(name);
431            }
432            SchedulerEvent::Command(cmd) => match cmd {
433                UiCommand::StartManual { name } => {
434                    if manual_waiting.remove(&name) {
435                        ready.push_back(name);
436                    }
437                }
438                UiCommand::CancelJob { name } => {
439                    exec.cancel_running_job(&name);
440                }
441                UiCommand::AnalyzeJob { name, source_name } => {
442                    spawn_analysis(exec.clone(), plan.clone(), ui.clone(), name, source_name);
443                }
444                UiCommand::PreviewAiPrompt { name, source_name } => {
445                    spawn_prompt_preview(exec.clone(), plan.clone(), ui.clone(), name, source_name);
446                }
447                UiCommand::AbortPipeline => {
448                    abort_requested = true;
449                    pipeline_failed = true;
450                    halt_kind = HaltKind::Aborted;
451                    if halt_error.is_none() {
452                        halt_error = Some(anyhow!("pipeline aborted by user"));
453                    }
454                    for name in interruptible_running_jobs(plan.as_ref(), &running) {
455                        exec.cancel_running_job(&name);
456                    }
457                    ready.clear();
458                    waiting_on_failure.clear();
459                    delayed_pending.clear();
460                    manual_waiting.clear();
461                }
462                UiCommand::RestartJob { .. } => {}
463            },
464            SchedulerEvent::Job(event) => {
465                running.remove(&event.name);
466                let Some(planned) = plan.nodes.get(&event.name) else {
467                    let message = format!(
468                        "completed job '{}' was not found in execution plan",
469                        event.name
470                    );
471                    if !pipeline_failed {
472                        pipeline_failed = true;
473                        halt_kind = HaltKind::JobFailure;
474                        if halt_error.is_none() {
475                            halt_error = Some(anyhow!(message.clone()));
476                        }
477                    }
478                    summaries.push(JobSummary {
479                        name: event.name.clone(),
480                        stage_name: event.stage_name.clone(),
481                        duration: event.duration,
482                        status: JobStatus::Failed(message),
483                        log_path: event.log_path.clone(),
484                        log_hash: event.log_hash.clone(),
485                        allow_failure: false,
486                        environment: None,
487                    });
488                    completed += 1;
489                    continue;
490                };
491                match event.result {
492                    Ok(_) => {
493                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Success);
494                        release_resource_lock(
495                            planned,
496                            &mut ready,
497                            &resource_groups,
498                            &mut resource_waiting,
499                        );
500                        release_dependents(
501                            &plan,
502                            &event.name,
503                            &mut remaining,
504                            abort_requested,
505                            pipeline_failed,
506                            &mut ReadyQueues {
507                                ready: &mut ready,
508                                waiting_on_failure: &mut waiting_on_failure,
509                                delayed_pending: &mut delayed_pending,
510                            },
511                            &enqueue_ready,
512                        );
513                        summaries.push(JobSummary {
514                            name: event.name.clone(),
515                            stage_name: event.stage_name.clone(),
516                            duration: event.duration,
517                            status: JobStatus::Success,
518                            log_path: event.log_path.clone(),
519                            log_hash: event.log_hash.clone(),
520                            allow_failure: planned.instance.rule.allow_failure,
521                            environment: exec.expanded_environment(&planned.instance.job),
522                        });
523                        completed += 1;
524                    }
525                    Err(err) => {
526                        if event.cancelled {
527                            exec.record_completed_job(&event.name, ArtifactSourceOutcome::Skipped);
528                            release_resource_lock(
529                                planned,
530                                &mut ready,
531                                &resource_groups,
532                                &mut resource_waiting,
533                            );
534                            summaries.push(JobSummary {
535                                name: event.name.clone(),
536                                stage_name: event.stage_name.clone(),
537                                duration: event.duration,
538                                status: JobStatus::Skipped("aborted by user".to_string()),
539                                log_path: event.log_path.clone(),
540                                log_hash: event.log_hash.clone(),
541                                allow_failure: true,
542                                environment: exec.expanded_environment(&planned.instance.job),
543                            });
544                            completed += 1;
545                            continue;
546                        }
547                        let err_msg = err.to_string();
548                        let attempts_so_far = attempts.get(&event.name).copied().unwrap_or(1);
549                        let retries_used = attempts_so_far.saturating_sub(1);
550                        if retries_used < planned.instance.retry.max
551                            && retry_allowed(
552                                &planned.instance.retry.when,
553                                &planned.instance.retry.exit_codes,
554                                event.failure_kind,
555                                event.exit_code,
556                            )
557                        {
558                            release_resource_lock(
559                                planned,
560                                &mut ready,
561                                &resource_groups,
562                                &mut resource_waiting,
563                            );
564                            ready.push_back(event.name.clone());
565                            continue;
566                        }
567                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Failed);
568                        release_resource_lock(
569                            planned,
570                            &mut ready,
571                            &resource_groups,
572                            &mut resource_waiting,
573                        );
574                        if !planned.instance.rule.allow_failure && !pipeline_failed {
575                            pipeline_failed = true;
576                            halt_kind = HaltKind::JobFailure;
577                            if halt_error.is_none() {
578                                halt_error =
579                                    Some(anyhow!("job '{}' failed: {}", event.name, err_msg));
580                            }
581                            while let Some(name) = waiting_on_failure.pop_front() {
582                                ready.push_back(name);
583                            }
584                        }
585                        release_dependents(
586                            &plan,
587                            &event.name,
588                            &mut remaining,
589                            abort_requested,
590                            pipeline_failed,
591                            &mut ReadyQueues {
592                                ready: &mut ready,
593                                waiting_on_failure: &mut waiting_on_failure,
594                                delayed_pending: &mut delayed_pending,
595                            },
596                            &enqueue_ready,
597                        );
598                        summaries.push(JobSummary {
599                            name: event.name.clone(),
600                            stage_name: event.stage_name.clone(),
601                            duration: event.duration,
602                            status: JobStatus::Failed(err_msg),
603                            log_path: event.log_path.clone(),
604                            log_hash: event.log_hash.clone(),
605                            allow_failure: planned.instance.rule.allow_failure,
606                            environment: exec.expanded_environment(&planned.instance.job),
607                        });
608                        completed += 1;
609                    }
610                }
611            }
612        }
613    }
614
615    let skip_reason = match halt_kind {
616        HaltKind::JobFailure => Some("not run (pipeline stopped after failure)".to_string()),
617        HaltKind::Deadlock => Some("not run (dependency cycle detected)".to_string()),
618        HaltKind::ChannelClosed => {
619            Some("not run (executor channel closed unexpectedly)".to_string())
620        }
621        HaltKind::Aborted => Some("not run (pipeline aborted by user)".to_string()),
622        HaltKind::None => None,
623    };
624
625    let mut recorded: HashSet<String> = summaries.iter().map(|entry| entry.name.clone()).collect();
626    for job_name in &plan.ordered {
627        if recorded.contains(job_name) {
628            continue;
629        }
630        let Some(planned) = plan.nodes.get(job_name) else {
631            continue;
632        };
633        let reason = if let Some(reason) = skip_reason.clone() {
634            Some(reason)
635        } else if planned.instance.rule.when == RuleWhen::OnFailure {
636            Some("skipped (rules: on_failure and pipeline succeeded)".to_string())
637        } else {
638            None
639        };
640
641        if let Some(reason) = reason {
642            if let Some(ui_ref) = ui.as_deref() {
643                ui_ref.job_finished(job_name, UiJobStatus::Skipped, 0.0, Some(reason.clone()));
644            }
645            summaries.push(JobSummary {
646                name: job_name.clone(),
647                stage_name: planned.instance.stage_name.clone(),
648                duration: 0.0,
649                status: JobStatus::Skipped(reason.clone()),
650                log_path: Some(planned.log_path.clone()),
651                log_hash: planned.log_hash.clone(),
652                allow_failure: planned.instance.rule.allow_failure,
653                environment: exec.expanded_environment(&planned.instance.job),
654            });
655            recorded.insert(job_name.clone());
656        }
657    }
658
659    let result = halt_error.map_or(Ok(()), Err);
660    (summaries, result)
661}
662
663fn retry_allowed(
664    conditions: &[String],
665    exit_codes: &[i32],
666    failure_kind: Option<JobFailureKind>,
667    exit_code: Option<i32>,
668) -> bool {
669    if conditions.is_empty() && exit_codes.is_empty() {
670        return true;
671    }
672    let when_matches = failure_kind.is_some_and(|kind| {
673        conditions
674            .iter()
675            .any(|condition| retry_condition_matches(condition, kind))
676    });
677    let exit_code_matches = exit_code.is_some_and(|code| exit_codes.contains(&code));
678    when_matches || exit_code_matches
679}
680
681fn retry_condition_matches(condition: &str, failure_kind: JobFailureKind) -> bool {
682    match condition {
683        "always" => true,
684        "unknown_failure" => failure_kind == JobFailureKind::UnknownFailure,
685        "script_failure" => failure_kind == JobFailureKind::ScriptFailure,
686        "api_failure" => failure_kind == JobFailureKind::ApiFailure,
687        "job_execution_timeout" => failure_kind == JobFailureKind::JobExecutionTimeout,
688        "runner_system_failure" => failure_kind == JobFailureKind::RunnerSystemFailure,
689        "runner_unsupported" => failure_kind == JobFailureKind::RunnerUnsupported,
690        "stale_schedule" => failure_kind == JobFailureKind::StaleSchedule,
691        "archived_failure" => failure_kind == JobFailureKind::ArchivedFailure,
692        "unmet_prerequisites" => failure_kind == JobFailureKind::UnmetPrerequisites,
693        "scheduler_failure" => failure_kind == JobFailureKind::SchedulerFailure,
694        "data_integrity_failure" => failure_kind == JobFailureKind::DataIntegrityFailure,
695        "stuck_or_timeout_failure" => {
696            matches!(
697                failure_kind,
698                JobFailureKind::StuckOrTimeoutFailure | JobFailureKind::JobExecutionTimeout
699            )
700        }
701        _ => false,
702    }
703}
704
705pub(crate) async fn handle_restart_commands(
706    exec: &ExecutorCore,
707    plan: Arc<ExecutionPlan>,
708    ui: Option<Arc<UiBridge>>,
709    commands: &mut mpsc::UnboundedReceiver<UiCommand>,
710    summaries: &mut Vec<JobSummary>,
711) -> Result<()> {
712    while let Some(command) = commands.recv().await {
713        match command {
714            UiCommand::RestartJob { name } => {
715                let Some(planned) = plan.nodes.get(&name).cloned() else {
716                    continue;
717                };
718
719                if let Some(ui_ref) = ui.as_deref() {
720                    ui_ref.job_restarted(&name);
721                }
722
723                let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
724                    Ok(info) => info,
725                    Err(err) => {
726                        summaries.push(JobSummary {
727                            name: planned.instance.job.name.clone(),
728                            stage_name: planned.instance.stage_name.clone(),
729                            duration: 0.0,
730                            status: JobStatus::Failed(err.to_string()),
731                            log_path: Some(planned.log_path.clone()),
732                            log_hash: planned.log_hash.clone(),
733                            allow_failure: false,
734                            environment: exec.expanded_environment(&planned.instance.job),
735                        });
736                        return Err(err);
737                    }
738                };
739                let restart_exec = exec.clone();
740                let ui_clone = ui.clone();
741                let run_info_clone = run_info.clone();
742                let job_plan = plan.clone();
743                let event = task::spawn_blocking(move || {
744                    job_runner::run_planned_job(
745                        &restart_exec,
746                        job_plan,
747                        planned,
748                        run_info_clone,
749                        ui_clone,
750                    )
751                })
752                .await
753                .context("job restart task failed")?;
754                update_summaries_from_event(exec, plan.as_ref(), event, summaries);
755            }
756            UiCommand::AnalyzeJob { name, source_name } => {
757                spawn_analysis(
758                    Arc::new(exec.clone()),
759                    plan.clone(),
760                    ui.clone(),
761                    name,
762                    source_name,
763                );
764            }
765            UiCommand::PreviewAiPrompt { name, source_name } => {
766                spawn_prompt_preview(
767                    Arc::new(exec.clone()),
768                    plan.clone(),
769                    ui.clone(),
770                    name,
771                    source_name,
772                );
773            }
774            UiCommand::StartManual { .. } => {}
775            UiCommand::CancelJob { .. } => {}
776            UiCommand::AbortPipeline => break,
777        }
778    }
779    Ok(())
780}
781
782fn spawn_analysis(
783    exec: Arc<ExecutorCore>,
784    plan: Arc<ExecutionPlan>,
785    ui: Option<Arc<UiBridge>>,
786    name: String,
787    source_name: String,
788) {
789    tokio::task::spawn_blocking(move || {
790        exec.analyze_job_with_default_provider(&plan, &name, &source_name, ui.as_deref());
791    });
792}
793
794fn spawn_prompt_preview(
795    exec: Arc<ExecutorCore>,
796    plan: Arc<ExecutionPlan>,
797    ui: Option<Arc<UiBridge>>,
798    name: String,
799    source_name: String,
800) {
801    tokio::task::spawn_blocking(move || {
802        if let Some(ui) = ui.as_deref()
803            && let Ok(prompt) = exec.render_ai_prompt(&plan, &name, &source_name)
804        {
805            ui.ai_prompt_ready(&name, prompt);
806        }
807    });
808}
809
810fn update_summaries_from_event(
811    exec: &ExecutorCore,
812    plan: &ExecutionPlan,
813    event: JobEvent,
814    summaries: &mut Vec<JobSummary>,
815) {
816    let JobEvent {
817        name,
818        stage_name,
819        duration,
820        log_path,
821        log_hash,
822        result,
823        failure_kind: _,
824        exit_code: _,
825        cancelled,
826    } = event;
827
828    let allow_failure = plan
829        .nodes
830        .get(&name)
831        .map(|planned| planned.instance.rule.allow_failure)
832        .unwrap_or(false);
833    let environment = plan
834        .nodes
835        .get(&name)
836        .and_then(|planned| exec.expanded_environment(&planned.instance.job));
837
838    let status = match result {
839        Ok(_) => JobStatus::Success,
840        Err(err) => {
841            if cancelled {
842                JobStatus::Skipped("aborted by user".to_string())
843            } else {
844                JobStatus::Failed(err.to_string())
845            }
846        }
847    };
848    let outcome = match &status {
849        JobStatus::Success => ArtifactSourceOutcome::Success,
850        JobStatus::Failed(_) => ArtifactSourceOutcome::Failed,
851        JobStatus::Skipped(_) => ArtifactSourceOutcome::Skipped,
852    };
853    exec.record_completed_job(&name, outcome);
854
855    summaries.retain(|entry| entry.name != name);
856    summaries.push(JobSummary {
857        name,
858        stage_name,
859        duration,
860        status,
861        log_path,
862        log_hash,
863        allow_failure,
864        environment,
865    });
866}
867
/// Mutable borrows of the scheduler queues, bundled so they can be passed to
/// helpers as a single argument.
struct ReadyQueues<'q> {
    /// Job names queued as ready.
    ready: &'q mut VecDeque<String>,
    /// Job names queued separately from `ready` as waiting on failure.
    waiting_on_failure: &'q mut VecDeque<String>,
    /// Job names tracked as delayed and still pending.
    delayed_pending: &'q mut HashSet<String>,
}
873
874fn release_dependents<F>(
875    plan: &ExecutionPlan,
876    name: &str,
877    remaining: &mut HashMap<String, usize>,
878    abort_requested: bool,
879    pipeline_failed: bool,
880    queues: &mut ReadyQueues<'_>,
881    enqueue_ready: &F,
882) where
883    F: Fn(&str, bool, &mut VecDeque<String>, &mut VecDeque<String>, &mut HashSet<String>),
884{
885    if let Some(children) = plan.dependents.get(name) {
886        for child in children {
887            if let Some(count) = remaining.get_mut(child)
888                && *count > 0
889            {
890                *count -= 1;
891                if *count == 0 && !abort_requested {
892                    enqueue_ready(
893                        child,
894                        pipeline_failed,
895                        queues.ready,
896                        queues.waiting_on_failure,
897                        queues.delayed_pending,
898                    );
899                }
900            }
901        }
902    }
903}
904
905fn release_resource_lock(
906    planned: &ExecutableJob,
907    ready: &mut VecDeque<String>,
908    resource_groups: &ResourceGroupManager,
909    resource_waiting: &mut HashMap<String, VecDeque<String>>,
910) {
911    if let Some(group) = &planned.instance.resource_group {
912        let _ = resource_groups.release(group);
913        if let Some(queue) = resource_waiting.get_mut(group)
914            && let Some(next) = queue.pop_front()
915        {
916            ready.push_back(next);
917        }
918    }
919}
920
#[cfg(test)]
mod tests {
    use super::{interruptible_running_jobs, release_resource_lock, retry_allowed};
    use crate::compiler::JobInstance;
    use crate::execution_plan::{ExecutableJob, ExecutionPlan};
    use crate::model::{ArtifactSpec, JobSpec, RetryPolicySpec};
    use crate::pipeline::{JobFailureKind, ResourceGroupManager, RuleEvaluation, RuleWhen};
    use std::collections::{HashMap, HashSet, VecDeque};
    use std::path::PathBuf;
    use tempfile::tempdir;

    /// Minimal job spec in the `build` stage that runs a single `true` command.
    fn job(name: &str) -> JobSpec {
        JobSpec {
            name: name.into(),
            stage: "build".into(),
            commands: vec!["true".into()],
            needs: Vec::new(),
            explicit_needs: false,
            dependencies: Vec::new(),
            before_script: None,
            after_script: None,
            inherit_default_before_script: true,
            inherit_default_after_script: true,
            inherit_default_image: true,
            inherit_default_cache: true,
            inherit_default_services: true,
            inherit_default_timeout: true,
            inherit_default_retry: true,
            inherit_default_interruptible: true,
            when: None,
            rules: Vec::new(),
            only: Vec::new(),
            except: Vec::new(),
            artifacts: ArtifactSpec::default(),
            cache: Vec::new(),
            image: None,
            variables: HashMap::new(),
            services: Vec::new(),
            timeout: None,
            retry: RetryPolicySpec::default(),
            interruptible: false,
            resource_group: None,
            parallel: None,
            tags: Vec::new(),
            environment: None,
        }
    }

    /// Wraps [`job`] in an executable node with default rules and no resource
    /// group; `order` only disambiguates the log path and hash.
    fn executable_job(name: &str, interruptible: bool, stage: &str, order: usize) -> ExecutableJob {
        ExecutableJob {
            instance: JobInstance {
                job: JobSpec {
                    interruptible,
                    ..job(name)
                },
                stage_name: stage.into(),
                dependencies: Vec::new(),
                rule: RuleEvaluation::default(),
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible,
                resource_group: None,
            },
            log_path: PathBuf::from(format!("/tmp/{name}-{order}.log")),
            log_hash: format!("hash-{name}-{order}"),
        }
    }

    /// Builds a dependency-free plan whose jobs are ordered as listed; each
    /// entry is `(job name, interruptible)` and the stage is named after the job.
    fn plan_of(jobs: &[(&str, bool)]) -> ExecutionPlan {
        ExecutionPlan {
            ordered: jobs.iter().map(|&(name, _)| name.into()).collect(),
            nodes: jobs
                .iter()
                .enumerate()
                .map(|(order, &(name, interruptible))| {
                    (name.into(), executable_job(name, interruptible, name, order))
                })
                .collect(),
            dependents: HashMap::new(),
            order_index: jobs
                .iter()
                .enumerate()
                .map(|(order, &(name, _))| (name.into(), order))
                .collect(),
            variants: HashMap::new(),
        }
    }

    #[test]
    fn release_resource_lock_requeues_next_waiting_job() {
        let rule = RuleEvaluation {
            included: true,
            when: RuleWhen::OnSuccess,
            ..Default::default()
        };
        let planned = ExecutableJob {
            instance: JobInstance {
                job: job("build"),
                stage_name: "build".into(),
                dependencies: Vec::new(),
                rule,
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible: false,
                resource_group: Some("builder".into()),
            },
            log_path: PathBuf::from("/tmp/build.log"),
            log_hash: "hash".into(),
        };

        // `build` currently holds the group while `package` waits for it.
        let lock_dir = tempdir().expect("tempdir");
        let manager = ResourceGroupManager::new(lock_dir.path().join("locks"));
        manager
            .try_acquire("builder", "build")
            .expect("lock acquires");
        let mut ready = VecDeque::new();
        let mut resource_waiting = HashMap::from([(
            "builder".to_string(),
            VecDeque::from(["package".to_string()]),
        )]);

        release_resource_lock(&planned, &mut ready, &manager, &mut resource_waiting);

        assert_eq!(ready, VecDeque::from(["package".to_string()]));
        let reacquired = manager
            .try_acquire("builder", "package")
            .expect("lock re-acquires");
        assert!(reacquired);
        assert!(resource_waiting["builder"].is_empty());
    }

    #[test]
    fn retry_allowed_defaults_to_true_without_conditions() {
        let allowed = retry_allowed(&[], &[], Some(JobFailureKind::ScriptFailure), Some(1));
        assert!(allowed);
    }

    #[test]
    fn retry_allowed_matches_script_failure_condition() {
        let matching = retry_allowed(
            &["script_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1),
        );
        assert!(matching);

        let other_condition = retry_allowed(
            &["runner_system_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1),
        );
        assert!(!other_condition);
    }

    #[test]
    fn retry_allowed_treats_job_timeout_as_stuck_or_timeout_failure() {
        let allowed = retry_allowed(
            &["stuck_or_timeout_failure".into()],
            &[],
            Some(JobFailureKind::JobExecutionTimeout),
            None,
        );
        assert!(allowed);
    }

    #[test]
    fn retry_allowed_matches_api_failure_condition() {
        let matching = retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::ApiFailure),
            None,
        );
        assert!(matching);

        let other_kind = retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::UnknownFailure),
            None,
        );
        assert!(!other_kind);
    }

    #[test]
    fn retry_allowed_matches_unmet_prerequisites_condition() {
        let allowed = retry_allowed(
            &["unmet_prerequisites".into()],
            &[],
            Some(JobFailureKind::UnmetPrerequisites),
            None,
        );
        assert!(allowed);
    }

    #[test]
    fn retry_allowed_matches_exit_code_condition() {
        let listed_code = retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137),
        );
        assert!(listed_code);

        let unlisted_code = retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(1),
        );
        assert!(!unlisted_code);
    }

    #[test]
    fn retry_allowed_matches_when_or_exit_code() {
        // The `when` condition does not match, but the exit code does.
        let allowed = retry_allowed(
            &["runner_system_failure".into()],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137),
        );
        assert!(allowed);
    }

    #[test]
    fn interruptible_running_jobs_selects_only_interruptible_nodes() {
        let plan = plan_of(&[("build", true), ("deploy", false)]);
        let running = HashSet::from(["build".to_string(), "deploy".to_string()]);

        let selected = interruptible_running_jobs(&plan, &running);

        assert_eq!(selected, vec!["build"]);
    }

    #[test]
    fn interruptible_running_jobs_respects_plan_order() {
        let plan = plan_of(&[("test", true), ("build", true)]);
        let running = HashSet::from(["build".to_string(), "test".to_string()]);

        let selected = interruptible_running_jobs(&plan, &running);

        assert_eq!(selected, vec!["test", "build"]);
    }
}