1use super::{core::ExecutorCore, job_runner};
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::model::ArtifactSourceOutcome;
4use crate::pipeline::{
5 self, HaltKind, JobEvent, JobFailureKind, JobStatus, JobSummary, ResourceGroupManager, RuleWhen,
6};
7use crate::runtime;
8use crate::ui::{UiBridge, UiCommand, UiJobStatus};
9use anyhow::{Context, Result, anyhow};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::Arc;
12use tokio::{
13 sync::{Semaphore, mpsc},
14 task, time as tokio_time,
15};
16
17fn interruptible_running_jobs(plan: &ExecutionPlan, running: &HashSet<String>) -> Vec<String> {
18 let mut names = running
19 .iter()
20 .filter(|name| {
21 plan.nodes
22 .get(*name)
23 .map(|planned| planned.instance.interruptible)
24 .unwrap_or(false)
25 })
26 .cloned()
27 .collect::<Vec<_>>();
28 names.sort_by_key(|name| plan.order_index.get(name).copied().unwrap_or(usize::MAX));
29 names
30}
31
/// Drives `plan` to completion: launches jobs as their dependencies finish,
/// honours job rules (`on_success`, `on_failure`, `delayed`, `manual`, …),
/// resource groups, retries, and UI commands (manual start, cancel, abort).
///
/// Returns the per-job summaries plus the overall pipeline result; the
/// `Result` is `Err` when the pipeline halted (job failure, deadlock,
/// closed worker channel, or user abort).
pub(crate) async fn execute_plan(
    exec: &ExecutorCore,
    plan: Arc<ExecutionPlan>,
    ui: Option<Arc<UiBridge>>,
    mut commands: Option<&mut mpsc::UnboundedReceiver<UiCommand>>,
) -> (Vec<JobSummary>, Result<()>) {
    let total = plan.ordered.len();
    if total == 0 {
        return (Vec::new(), Ok(()));
    }

    // Unfinished-dependency count per job; a job becomes schedulable at zero.
    let mut remaining: HashMap<String, usize> = plan
        .nodes
        .iter()
        .map(|(name, job)| (name.clone(), job.instance.dependencies.len()))
        .collect();
    let mut ready: VecDeque<String> = VecDeque::new();
    // `when: on_failure` jobs parked until the pipeline actually fails.
    let mut waiting_on_failure: VecDeque<String> = VecDeque::new();
    // Jobs whose `start_in` delay timer has not yet fired.
    let mut delayed_pending: HashSet<String> = HashSet::new();
    // Jobs re-queued on a timer because their resource group was busy.
    let mut resource_retry_pending: HashSet<String> = HashSet::new();
    // Manual jobs waiting for an explicit StartManual command.
    let mut manual_waiting: HashSet<String> = HashSet::new();
    let mut running = HashSet::new();
    let mut abort_requested = false;
    let mut completed = 0usize;
    let mut pipeline_failed = false;
    let mut halt_kind = HaltKind::None;
    // First halt error wins; later failures never overwrite it.
    let mut halt_error: Option<anyhow::Error> = None;
    let mut summaries: Vec<JobSummary> = Vec::new();
    // Attempt counters used to enforce per-job retry budgets.
    let mut attempts: HashMap<String, u32> = HashMap::new();
    let mut resource_waiting: HashMap<String, VecDeque<String>> = HashMap::new();
    let resource_groups = ResourceGroupManager::new(runtime::resource_group_root());
    let mut manual_input_available = commands.is_some();

    // Concurrency cap; at least one permit even if the config says zero.
    let semaphore = Arc::new(Semaphore::new(exec.config.max_parallel_jobs.max(1)));
    let exec = Arc::new(exec.clone());
    let (tx, mut rx) = mpsc::unbounded_channel::<JobEvent>();
    // Timer channel shared by `start_in` delays and resource-group retries.
    let (delay_tx, mut delay_rx) = mpsc::unbounded_channel::<String>();

    // Routes a dependency-free job into the appropriate queue based on its rule.
    let enqueue_ready = |job_name: &str,
                         pipeline_failed_flag: bool,
                         ready_queue: &mut VecDeque<String>,
                         wait_failure_queue: &mut VecDeque<String>,
                         delayed_set: &mut HashSet<String>| {
        let Some(planned) = plan.nodes.get(job_name) else {
            return;
        };
        match planned.instance.rule.when {
            RuleWhen::OnFailure => {
                if pipeline_failed_flag {
                    ready_queue.push_back(job_name.to_string());
                } else {
                    wait_failure_queue.push_back(job_name.to_string());
                }
            }
            RuleWhen::Delayed => {
                if pipeline_failed_flag {
                    return;
                }
                if let Some(delay) = planned.instance.rule.start_in {
                    // Spawn a one-shot timer that sends the name back via delay_tx.
                    if delayed_set.insert(job_name.to_string()) {
                        let tx_clone = delay_tx.clone();
                        let name = job_name.to_string();
                        task::spawn(async move {
                            tokio_time::sleep(delay).await;
                            let _ = tx_clone.send(name);
                        });
                    }
                } else {
                    // Delayed rule without a duration runs immediately.
                    ready_queue.push_back(job_name.to_string());
                }
            }
            RuleWhen::Manual | RuleWhen::OnSuccess => {
                if pipeline_failed_flag && planned.instance.rule.when.requires_success() {
                    return;
                }
                ready_queue.push_back(job_name.to_string());
            }
            RuleWhen::Always => {
                ready_queue.push_back(job_name.to_string());
            }
            RuleWhen::Never => {}
        }
    };

    // Seed the queues with jobs that have no dependencies.
    for name in &plan.ordered {
        if remaining.get(name).copied().unwrap_or(0) == 0 && !abort_requested {
            enqueue_ready(
                name,
                pipeline_failed,
                &mut ready,
                &mut waiting_on_failure,
                &mut delayed_pending,
            );
        }
    }

    while completed < total {
        // Launch everything currently runnable.
        while let Some(name) = ready.pop_front() {
            if abort_requested {
                break;
            }
            let planned = match plan.nodes.get(&name).cloned() {
                Some(job) => job,
                None => continue,
            };
            if pipeline_failed && planned.instance.rule.when.requires_success() {
                continue;
            }

            // Manual jobs either wait for a StartManual command, or are skipped
            // outright when no command input is attached.
            if matches!(planned.instance.rule.when, RuleWhen::Manual)
                && !planned.instance.rule.manual_auto_run
            {
                if manual_input_available {
                    if manual_waiting.insert(name.clone())
                        && let Some(ui_ref) = ui.as_deref()
                    {
                        ui_ref.job_manual_pending(&name);
                    }
                } else {
                    let reason = planned
                        .instance
                        .rule
                        .manual_reason
                        .clone()
                        .unwrap_or_else(|| "manual job not run".to_string());
                    if let Some(ui_ref) = ui.as_deref() {
                        ui_ref.job_finished(
                            &planned.instance.job.name,
                            UiJobStatus::Skipped,
                            0.0,
                            Some(reason.clone()),
                        );
                    }
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Skipped(reason.clone()),
                        log_path: None,
                        log_hash: planned.log_hash.clone(),
                        allow_failure: planned.instance.rule.allow_failure,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    // A skipped manual job still counts as done and unblocks children.
                    completed += 1;
                    release_dependents(
                        &plan,
                        &name,
                        &mut remaining,
                        abort_requested,
                        pipeline_failed,
                        &mut ReadyQueues {
                            ready: &mut ready,
                            waiting_on_failure: &mut waiting_on_failure,
                            delayed_pending: &mut delayed_pending,
                        },
                        &enqueue_ready,
                    );
                }
                continue;
            }

            // Resource groups: at most one job per group at a time.
            if let Some(group) = &planned.instance.resource_group {
                let acquired =
                    match resource_groups.try_acquire(group, &planned.instance.job.name) {
                        Ok(acquired) => acquired,
                        Err(err) => {
                            halt_kind = HaltKind::JobFailure;
                            pipeline_failed = true;
                            if halt_error.is_none() {
                                halt_error = Some(err.context(format!(
                                    "failed to acquire resource group '{}'",
                                    group
                                )));
                            }
                            break;
                        }
                    };
                if !acquired {
                    if running.is_empty() && ready.is_empty() {
                        // Nothing else can make progress, so block here and poll
                        // the lock until it frees (or acquisition errors out).
                        loop {
                            tokio_time::sleep(std::time::Duration::from_millis(500)).await;
                            match resource_groups.try_acquire(group, &planned.instance.job.name) {
                                Ok(true) => break,
                                Ok(false) => continue,
                                Err(err) => {
                                    halt_kind = HaltKind::JobFailure;
                                    pipeline_failed = true;
                                    if halt_error.is_none() {
                                        halt_error = Some(err.context(format!(
                                            "failed to acquire resource group '{}'",
                                            group
                                        )));
                                    }
                                    break;
                                }
                            }
                        }
                        if pipeline_failed {
                            break;
                        }
                    } else {
                        // Other work is in flight; retry this job after a short delay
                        // via the shared timer channel.
                        if resource_retry_pending.insert(name.clone()) {
                            let tx_clone = delay_tx.clone();
                            let retry_name = name.clone();
                            task::spawn(async move {
                                tokio_time::sleep(std::time::Duration::from_millis(500)).await;
                                let _ = tx_clone.send(retry_name);
                            });
                        }
                        continue;
                    }
                }
            }

            let entry = attempts.entry(name.clone()).or_insert(0);
            *entry += 1;

            let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
                Ok(info) => info,
                Err(err) => {
                    // Failing to even start logging is fatal for the pipeline.
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Failed(err.to_string()),
                        log_path: Some(planned.log_path.clone()),
                        log_hash: planned.log_hash.clone(),
                        allow_failure: false,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    return (summaries, Err(err));
                }
            };
            running.insert(name.clone());
            pipeline::spawn_job(
                exec.clone(),
                plan.clone(),
                planned,
                run_info,
                semaphore.clone(),
                tx.clone(),
                ui.clone(),
            );
        }

        if completed >= total {
            break;
        }

        // Failed pipeline with nothing left in flight: stop.
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && pipeline_failed
            && waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            break;
        }

        // Healthy pipeline but nothing runnable and jobs still blocked:
        // report a likely dependency cycle.
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && !pipeline_failed
            && waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            let remaining_jobs: Vec<_> = remaining
                .iter()
                .filter_map(|(name, &count)| if count > 0 { Some(name.clone()) } else { None })
                .collect();
            if !remaining_jobs.is_empty() {
                halt_kind = HaltKind::Deadlock;
                halt_error = Some(anyhow!(
                    "no runnable jobs, potential dependency cycle involving: {:?}",
                    remaining_jobs
                ));
            }
            break;
        }

        // Only on_failure jobs remain and the pipeline succeeded: they never run.
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && !pipeline_failed
            && !waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            break;
        }

        enum SchedulerEvent {
            Job(JobEvent),
            Delay(String),
            Command(UiCommand),
        }

        // Wait for the next job completion, timer firing, or UI command.
        let next_event = tokio::select! {
            Some(event) = rx.recv() => Some(SchedulerEvent::Job(event)),
            Some(name) = delay_rx.recv() => Some(SchedulerEvent::Delay(name)),
            cmd = async {
                if let Some(rx) = commands.as_mut() {
                    (*rx).recv().await
                } else {
                    // No command channel: never resolve this branch.
                    std::future::pending().await
                }
            } => {
                match cmd {
                    Some(command) => Some(SchedulerEvent::Command(command)),
                    None => {
                        // Command sender dropped: manual input is gone for good.
                        manual_input_available = false;
                        commands = None;
                        None
                    }
                }
            }
            else => None,
        };

        // If manual input just became unavailable, skip all waiting manual jobs.
        if !manual_input_available && !manual_waiting.is_empty() {
            let pending: Vec<String> = manual_waiting.drain().collect();
            for name in pending {
                if let Some(planned) = plan.nodes.get(&name) {
                    let reason = planned
                        .instance
                        .rule
                        .manual_reason
                        .clone()
                        .unwrap_or_else(|| "manual job not run".to_string());
                    if let Some(ui_ref) = ui.as_deref() {
                        ui_ref.job_finished(
                            &planned.instance.job.name,
                            UiJobStatus::Skipped,
                            0.0,
                            Some(reason.clone()),
                        );
                    }
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Skipped(reason),
                        log_path: None,
                        log_hash: planned.log_hash.clone(),
                        allow_failure: planned.instance.rule.allow_failure,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    completed += 1;
                    release_dependents(
                        &plan,
                        &name,
                        &mut remaining,
                        abort_requested,
                        pipeline_failed,
                        &mut ReadyQueues {
                            ready: &mut ready,
                            waiting_on_failure: &mut waiting_on_failure,
                            delayed_pending: &mut delayed_pending,
                        },
                        &enqueue_ready,
                    );
                }
            }
        }

        let Some(event) = next_event else {
            // All event sources closed. If nothing can still wake us, this is a
            // hard failure; otherwise keep looping (a timer/retry is pending).
            if running.is_empty()
                && ready.is_empty()
                && delayed_pending.is_empty()
                && resource_retry_pending.is_empty()
            {
                halt_kind = HaltKind::ChannelClosed;
                halt_error = Some(anyhow!(
                    "job worker channel closed unexpectedly while {} jobs remained",
                    total - completed
                ));
                break;
            }
            continue;
        };

        match event {
            SchedulerEvent::Delay(name) => {
                if abort_requested {
                    continue;
                }
                // Resource-group retries take priority over start_in delays.
                if resource_retry_pending.remove(&name) {
                    ready.push_back(name);
                    continue;
                }
                delayed_pending.remove(&name);
                // A delayed job whose rule requires success is dropped once the
                // pipeline has failed.
                if pipeline_failed
                    && let Some(planned) = plan.nodes.get(&name)
                    && planned.instance.rule.when.requires_success()
                {
                    continue;
                }
                ready.push_back(name);
            }
            SchedulerEvent::Command(cmd) => match cmd {
                UiCommand::StartManual { name } => {
                    if manual_waiting.remove(&name) {
                        ready.push_back(name);
                    }
                }
                UiCommand::CancelJob { name } => {
                    exec.cancel_running_job(&name);
                }
                UiCommand::AbortPipeline => {
                    abort_requested = true;
                    pipeline_failed = true;
                    halt_kind = HaltKind::Aborted;
                    if halt_error.is_none() {
                        halt_error = Some(anyhow!("pipeline aborted by user"));
                    }
                    // Only interruptible jobs are cancelled; others run to completion.
                    for name in interruptible_running_jobs(plan.as_ref(), &running) {
                        exec.cancel_running_job(&name);
                    }
                    ready.clear();
                    waiting_on_failure.clear();
                    delayed_pending.clear();
                    manual_waiting.clear();
                }
                // Restarts are handled after the main run; see handle_restart_commands.
                UiCommand::RestartJob { .. } => {}
            },
            SchedulerEvent::Job(event) => {
                running.remove(&event.name);
                let Some(planned) = plan.nodes.get(&event.name) else {
                    // A completion for an unknown job indicates a corrupted plan.
                    let message = format!(
                        "completed job '{}' was not found in execution plan",
                        event.name
                    );
                    if !pipeline_failed {
                        pipeline_failed = true;
                        halt_kind = HaltKind::JobFailure;
                        if halt_error.is_none() {
                            halt_error = Some(anyhow!(message.clone()));
                        }
                    }
                    summaries.push(JobSummary {
                        name: event.name.clone(),
                        stage_name: event.stage_name.clone(),
                        duration: event.duration,
                        status: JobStatus::Failed(message),
                        log_path: event.log_path.clone(),
                        log_hash: event.log_hash.clone(),
                        allow_failure: false,
                        environment: None,
                    });
                    completed += 1;
                    continue;
                };
                match event.result {
                    Ok(_) => {
                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Success);
                        release_resource_lock(
                            planned,
                            &mut ready,
                            &resource_groups,
                            &mut resource_waiting,
                        );
                        release_dependents(
                            &plan,
                            &event.name,
                            &mut remaining,
                            abort_requested,
                            pipeline_failed,
                            &mut ReadyQueues {
                                ready: &mut ready,
                                waiting_on_failure: &mut waiting_on_failure,
                                delayed_pending: &mut delayed_pending,
                            },
                            &enqueue_ready,
                        );
                        summaries.push(JobSummary {
                            name: event.name.clone(),
                            stage_name: event.stage_name.clone(),
                            duration: event.duration,
                            status: JobStatus::Success,
                            log_path: event.log_path.clone(),
                            log_hash: event.log_hash.clone(),
                            allow_failure: planned.instance.rule.allow_failure,
                            environment: exec.expanded_environment(&planned.instance.job),
                        });
                        completed += 1;
                    }
                    Err(err) => {
                        // User-cancelled jobs become Skipped, not Failed, and do
                        // not release dependents.
                        if event.cancelled {
                            exec.record_completed_job(&event.name, ArtifactSourceOutcome::Skipped);
                            release_resource_lock(
                                planned,
                                &mut ready,
                                &resource_groups,
                                &mut resource_waiting,
                            );
                            summaries.push(JobSummary {
                                name: event.name.clone(),
                                stage_name: event.stage_name.clone(),
                                duration: event.duration,
                                status: JobStatus::Skipped("aborted by user".to_string()),
                                log_path: event.log_path.clone(),
                                log_hash: event.log_hash.clone(),
                                allow_failure: true,
                                environment: exec.expanded_environment(&planned.instance.job),
                            });
                            completed += 1;
                            continue;
                        }
                        let err_msg = err.to_string();
                        // Retry if the budget and conditions allow: requeue without
                        // recording a summary so the attempt is transparent.
                        let attempts_so_far = attempts.get(&event.name).copied().unwrap_or(1);
                        let retries_used = attempts_so_far.saturating_sub(1);
                        if retries_used < planned.instance.retry.max
                            && retry_allowed(
                                &planned.instance.retry.when,
                                &planned.instance.retry.exit_codes,
                                event.failure_kind,
                                event.exit_code,
                            )
                        {
                            release_resource_lock(
                                planned,
                                &mut ready,
                                &resource_groups,
                                &mut resource_waiting,
                            );
                            ready.push_back(event.name.clone());
                            continue;
                        }
                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Failed);
                        release_resource_lock(
                            planned,
                            &mut ready,
                            &resource_groups,
                            &mut resource_waiting,
                        );
                        if !planned.instance.rule.allow_failure && !pipeline_failed {
                            pipeline_failed = true;
                            halt_kind = HaltKind::JobFailure;
                            if halt_error.is_none() {
                                halt_error =
                                    Some(anyhow!("job '{}' failed: {}", event.name, err_msg));
                            }
                            // First failure releases all parked on_failure jobs.
                            while let Some(name) = waiting_on_failure.pop_front() {
                                ready.push_back(name);
                            }
                        }
                        release_dependents(
                            &plan,
                            &event.name,
                            &mut remaining,
                            abort_requested,
                            pipeline_failed,
                            &mut ReadyQueues {
                                ready: &mut ready,
                                waiting_on_failure: &mut waiting_on_failure,
                                delayed_pending: &mut delayed_pending,
                            },
                            &enqueue_ready,
                        );
                        summaries.push(JobSummary {
                            name: event.name.clone(),
                            stage_name: event.stage_name.clone(),
                            duration: event.duration,
                            status: JobStatus::Failed(err_msg),
                            log_path: event.log_path.clone(),
                            log_hash: event.log_hash.clone(),
                            allow_failure: planned.instance.rule.allow_failure,
                            environment: exec.expanded_environment(&planned.instance.job),
                        });
                        completed += 1;
                    }
                }
            }
        }
    }

    // Map the halt reason to a human-readable skip message for unrun jobs.
    let skip_reason = match halt_kind {
        HaltKind::JobFailure => Some("not run (pipeline stopped after failure)".to_string()),
        HaltKind::Deadlock => Some("not run (dependency cycle detected)".to_string()),
        HaltKind::ChannelClosed => {
            Some("not run (executor channel closed unexpectedly)".to_string())
        }
        HaltKind::Aborted => Some("not run (pipeline aborted by user)".to_string()),
        HaltKind::None => None,
    };

    // Record a Skipped summary for every planned job that never produced one.
    let mut recorded: HashSet<String> = summaries.iter().map(|entry| entry.name.clone()).collect();
    for job_name in &plan.ordered {
        if recorded.contains(job_name) {
            continue;
        }
        let Some(planned) = plan.nodes.get(job_name) else {
            continue;
        };
        let reason = if let Some(reason) = skip_reason.clone() {
            Some(reason)
        } else if planned.instance.rule.when == RuleWhen::OnFailure {
            Some("skipped (rules: on_failure and pipeline succeeded)".to_string())
        } else {
            None
        };

        if let Some(reason) = reason {
            if let Some(ui_ref) = ui.as_deref() {
                ui_ref.job_finished(job_name, UiJobStatus::Skipped, 0.0, Some(reason.clone()));
            }
            summaries.push(JobSummary {
                name: job_name.clone(),
                stage_name: planned.instance.stage_name.clone(),
                duration: 0.0,
                status: JobStatus::Skipped(reason.clone()),
                log_path: Some(planned.log_path.clone()),
                log_hash: planned.log_hash.clone(),
                allow_failure: planned.instance.rule.allow_failure,
                environment: exec.expanded_environment(&planned.instance.job),
            });
            recorded.insert(job_name.clone());
        }
    }

    let result = halt_error.map_or(Ok(()), Err);
    (summaries, result)
}
656
657fn retry_allowed(
658 conditions: &[String],
659 exit_codes: &[i32],
660 failure_kind: Option<JobFailureKind>,
661 exit_code: Option<i32>,
662) -> bool {
663 if conditions.is_empty() && exit_codes.is_empty() {
664 return true;
665 }
666 let when_matches = failure_kind.is_some_and(|kind| {
667 conditions
668 .iter()
669 .any(|condition| retry_condition_matches(condition, kind))
670 });
671 let exit_code_matches = exit_code.is_some_and(|code| exit_codes.contains(&code));
672 when_matches || exit_code_matches
673}
674
675fn retry_condition_matches(condition: &str, failure_kind: JobFailureKind) -> bool {
676 match condition {
677 "always" => true,
678 "unknown_failure" => failure_kind == JobFailureKind::UnknownFailure,
679 "script_failure" => failure_kind == JobFailureKind::ScriptFailure,
680 "api_failure" => failure_kind == JobFailureKind::ApiFailure,
681 "job_execution_timeout" => failure_kind == JobFailureKind::JobExecutionTimeout,
682 "runner_system_failure" => failure_kind == JobFailureKind::RunnerSystemFailure,
683 "runner_unsupported" => failure_kind == JobFailureKind::RunnerUnsupported,
684 "stale_schedule" => failure_kind == JobFailureKind::StaleSchedule,
685 "archived_failure" => failure_kind == JobFailureKind::ArchivedFailure,
686 "unmet_prerequisites" => failure_kind == JobFailureKind::UnmetPrerequisites,
687 "scheduler_failure" => failure_kind == JobFailureKind::SchedulerFailure,
688 "data_integrity_failure" => failure_kind == JobFailureKind::DataIntegrityFailure,
689 "stuck_or_timeout_failure" => {
690 matches!(
691 failure_kind,
692 JobFailureKind::StuckOrTimeoutFailure | JobFailureKind::JobExecutionTimeout
693 )
694 }
695 _ => false,
696 }
697}
698
699pub(crate) async fn handle_restart_commands(
700 exec: &ExecutorCore,
701 plan: Arc<ExecutionPlan>,
702 ui: Option<Arc<UiBridge>>,
703 commands: &mut mpsc::UnboundedReceiver<UiCommand>,
704 summaries: &mut Vec<JobSummary>,
705) -> Result<()> {
706 while let Some(command) = commands.recv().await {
707 match command {
708 UiCommand::RestartJob { name } => {
709 let Some(planned) = plan.nodes.get(&name).cloned() else {
710 continue;
711 };
712
713 if let Some(ui_ref) = ui.as_deref() {
714 ui_ref.job_restarted(&name);
715 }
716
717 let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
718 Ok(info) => info,
719 Err(err) => {
720 summaries.push(JobSummary {
721 name: planned.instance.job.name.clone(),
722 stage_name: planned.instance.stage_name.clone(),
723 duration: 0.0,
724 status: JobStatus::Failed(err.to_string()),
725 log_path: Some(planned.log_path.clone()),
726 log_hash: planned.log_hash.clone(),
727 allow_failure: false,
728 environment: exec.expanded_environment(&planned.instance.job),
729 });
730 return Err(err);
731 }
732 };
733 let restart_exec = exec.clone();
734 let ui_clone = ui.clone();
735 let run_info_clone = run_info.clone();
736 let job_plan = plan.clone();
737 let event = task::spawn_blocking(move || {
738 job_runner::run_planned_job(
739 &restart_exec,
740 job_plan,
741 planned,
742 run_info_clone,
743 ui_clone,
744 )
745 })
746 .await
747 .context("job restart task failed")?;
748 update_summaries_from_event(exec, plan.as_ref(), event, summaries);
749 }
750 UiCommand::StartManual { .. } => {}
751 UiCommand::CancelJob { .. } => {}
752 UiCommand::AbortPipeline => break,
753 }
754 }
755 Ok(())
756}
757
758fn update_summaries_from_event(
759 exec: &ExecutorCore,
760 plan: &ExecutionPlan,
761 event: JobEvent,
762 summaries: &mut Vec<JobSummary>,
763) {
764 let JobEvent {
765 name,
766 stage_name,
767 duration,
768 log_path,
769 log_hash,
770 result,
771 failure_kind: _,
772 exit_code: _,
773 cancelled,
774 } = event;
775
776 let allow_failure = plan
777 .nodes
778 .get(&name)
779 .map(|planned| planned.instance.rule.allow_failure)
780 .unwrap_or(false);
781 let environment = plan
782 .nodes
783 .get(&name)
784 .and_then(|planned| exec.expanded_environment(&planned.instance.job));
785
786 let status = match result {
787 Ok(_) => JobStatus::Success,
788 Err(err) => {
789 if cancelled {
790 JobStatus::Skipped("aborted by user".to_string())
791 } else {
792 JobStatus::Failed(err.to_string())
793 }
794 }
795 };
796 let outcome = match &status {
797 JobStatus::Success => ArtifactSourceOutcome::Success,
798 JobStatus::Failed(_) => ArtifactSourceOutcome::Failed,
799 JobStatus::Skipped(_) => ArtifactSourceOutcome::Skipped,
800 };
801 exec.record_completed_job(&name, outcome);
802
803 summaries.retain(|entry| entry.name != name);
804 summaries.push(JobSummary {
805 name,
806 stage_name,
807 duration,
808 status,
809 log_path,
810 log_hash,
811 allow_failure,
812 environment,
813 });
814}
815
/// Bundles mutable borrows of the scheduler's three enqueue targets so they
/// can be passed to `release_dependents` as a single argument.
struct ReadyQueues<'a> {
    // Jobs runnable right now.
    ready: &'a mut VecDeque<String>,
    // `when: on_failure` jobs parked until the pipeline fails.
    waiting_on_failure: &'a mut VecDeque<String>,
    // Jobs whose `start_in` delay timer has not yet fired.
    delayed_pending: &'a mut HashSet<String>,
}
821
822fn release_dependents<F>(
823 plan: &ExecutionPlan,
824 name: &str,
825 remaining: &mut HashMap<String, usize>,
826 abort_requested: bool,
827 pipeline_failed: bool,
828 queues: &mut ReadyQueues<'_>,
829 enqueue_ready: &F,
830) where
831 F: Fn(&str, bool, &mut VecDeque<String>, &mut VecDeque<String>, &mut HashSet<String>),
832{
833 if let Some(children) = plan.dependents.get(name) {
834 for child in children {
835 if let Some(count) = remaining.get_mut(child)
836 && *count > 0
837 {
838 *count -= 1;
839 if *count == 0 && !abort_requested {
840 enqueue_ready(
841 child,
842 pipeline_failed,
843 queues.ready,
844 queues.waiting_on_failure,
845 queues.delayed_pending,
846 );
847 }
848 }
849 }
850 }
851}
852
853fn release_resource_lock(
854 planned: &ExecutableJob,
855 ready: &mut VecDeque<String>,
856 resource_groups: &ResourceGroupManager,
857 resource_waiting: &mut HashMap<String, VecDeque<String>>,
858) {
859 if let Some(group) = &planned.instance.resource_group {
860 let _ = resource_groups.release(group);
861 if let Some(queue) = resource_waiting.get_mut(group)
862 && let Some(next) = queue.pop_front()
863 {
864 ready.push_back(next);
865 }
866 }
867}
868
#[cfg(test)]
mod tests {
    use super::{interruptible_running_jobs, release_resource_lock, retry_allowed};
    use crate::compiler::JobInstance;
    use crate::execution_plan::{ExecutableJob, ExecutionPlan};
    use crate::model::{ArtifactSpec, JobSpec, RetryPolicySpec};
    use crate::pipeline::{JobFailureKind, ResourceGroupManager, RuleEvaluation, RuleWhen};
    use std::collections::{HashMap, HashSet, VecDeque};
    use std::path::PathBuf;
    use tempfile::tempdir;

    // Releasing a held group lock should requeue the first waiter and leave
    // the lock re-acquirable.
    #[test]
    fn release_resource_lock_requeues_next_waiting_job() {
        let planned = ExecutableJob {
            instance: JobInstance {
                job: job("build"),
                stage_name: "build".into(),
                dependencies: Vec::new(),
                rule: RuleEvaluation {
                    included: true,
                    when: RuleWhen::OnSuccess,
                    ..Default::default()
                },
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible: false,
                resource_group: Some("builder".into()),
            },
            log_path: PathBuf::from("/tmp/build.log"),
            log_hash: "hash".into(),
        };
        let mut ready = VecDeque::new();
        let temp = tempdir().expect("tempdir");
        let manager = ResourceGroupManager::new(temp.path().join("locks"));
        // Hold the lock so release_resource_lock has something to free.
        manager
            .try_acquire("builder", "build")
            .expect("lock acquires");
        let mut resource_waiting = HashMap::from([(
            "builder".to_string(),
            VecDeque::from(["package".to_string()]),
        )]);

        release_resource_lock(&planned, &mut ready, &manager, &mut resource_waiting);

        // The waiter is moved to the ready queue and the lock is free again.
        assert_eq!(ready, VecDeque::from(["package".to_string()]));
        assert!(
            manager
                .try_acquire("builder", "package")
                .expect("lock re-acquires")
        );
        assert!(resource_waiting["builder"].is_empty());
    }

    // No conditions and no exit codes means an unconstrained retry policy.
    #[test]
    fn retry_allowed_defaults_to_true_without_conditions() {
        assert!(retry_allowed(
            &[],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    #[test]
    fn retry_allowed_matches_script_failure_condition() {
        assert!(retry_allowed(
            &["script_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
        // A non-matching condition must not permit a retry.
        assert!(!retry_allowed(
            &["runner_system_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    // The umbrella condition covers the dedicated timeout failure kind too.
    #[test]
    fn retry_allowed_treats_job_timeout_as_stuck_or_timeout_failure() {
        assert!(retry_allowed(
            &["stuck_or_timeout_failure".into()],
            &[],
            Some(JobFailureKind::JobExecutionTimeout),
            None
        ));
    }

    #[test]
    fn retry_allowed_matches_api_failure_condition() {
        assert!(retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::ApiFailure),
            None
        ));
        assert!(!retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::UnknownFailure),
            None
        ));
    }

    #[test]
    fn retry_allowed_matches_unmet_prerequisites_condition() {
        assert!(retry_allowed(
            &["unmet_prerequisites".into()],
            &[],
            Some(JobFailureKind::UnmetPrerequisites),
            None
        ));
    }

    // Exit-code matching is exact: 137 retries, 1 does not.
    #[test]
    fn retry_allowed_matches_exit_code_condition() {
        assert!(retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137)
        ));
        assert!(!retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    // Conditions and exit codes combine with OR semantics.
    #[test]
    fn retry_allowed_matches_when_or_exit_code() {
        assert!(retry_allowed(
            &["runner_system_failure".into()],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137)
        ));
    }

    #[test]
    fn interruptible_running_jobs_selects_only_interruptible_nodes() {
        let plan = ExecutionPlan {
            ordered: vec!["build".into(), "deploy".into()],
            nodes: HashMap::from([
                ("build".into(), executable_job("build", true, "build", 0)),
                (
                    "deploy".into(),
                    executable_job("deploy", false, "deploy", 1),
                ),
            ]),
            dependents: HashMap::new(),
            order_index: HashMap::from([("build".into(), 0), ("deploy".into(), 1)]),
            variants: HashMap::new(),
        };
        let running = HashSet::from(["build".to_string(), "deploy".to_string()]);

        assert_eq!(interruptible_running_jobs(&plan, &running), vec!["build"]);
    }

    // Result ordering must follow order_index, not HashSet iteration order.
    #[test]
    fn interruptible_running_jobs_respects_plan_order() {
        let plan = ExecutionPlan {
            ordered: vec!["test".into(), "build".into()],
            nodes: HashMap::from([
                ("build".into(), executable_job("build", true, "build", 1)),
                ("test".into(), executable_job("test", true, "test", 0)),
            ]),
            dependents: HashMap::new(),
            order_index: HashMap::from([("test".into(), 0), ("build".into(), 1)]),
            variants: HashMap::new(),
        };
        let running = HashSet::from(["build".to_string(), "test".to_string()]);

        assert_eq!(
            interruptible_running_jobs(&plan, &running),
            vec!["test", "build"]
        );
    }

    // Builds a minimal ExecutableJob fixture with the given interruptibility,
    // stage, and a unique log path/hash derived from `order`.
    fn executable_job(name: &str, interruptible: bool, stage: &str, order: usize) -> ExecutableJob {
        let mut job = job(name);
        job.interruptible = interruptible;
        ExecutableJob {
            instance: JobInstance {
                job,
                stage_name: stage.into(),
                dependencies: Vec::new(),
                rule: RuleEvaluation::default(),
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible,
                resource_group: None,
            },
            log_path: PathBuf::from(format!("/tmp/{name}-{order}.log")),
            log_hash: format!("hash-{name}-{order}"),
        }
    }

    // Default JobSpec fixture: single trivial command, everything else empty
    // or inherited.
    fn job(name: &str) -> JobSpec {
        JobSpec {
            name: name.into(),
            stage: "build".into(),
            commands: vec!["true".into()],
            needs: Vec::new(),
            explicit_needs: false,
            dependencies: Vec::new(),
            before_script: None,
            after_script: None,
            inherit_default_before_script: true,
            inherit_default_after_script: true,
            inherit_default_image: true,
            inherit_default_cache: true,
            inherit_default_services: true,
            inherit_default_timeout: true,
            inherit_default_retry: true,
            inherit_default_interruptible: true,
            when: None,
            rules: Vec::new(),
            only: Vec::new(),
            except: Vec::new(),
            artifacts: ArtifactSpec::default(),
            cache: Vec::new(),
            image: None,
            variables: HashMap::new(),
            services: Vec::new(),
            timeout: None,
            retry: RetryPolicySpec::default(),
            interruptible: false,
            resource_group: None,
            parallel: None,
            tags: Vec::new(),
            environment: None,
        }
    }
}