1use super::{core::ExecutorCore, job_runner};
2use crate::execution_plan::{ExecutableJob, ExecutionPlan};
3use crate::model::ArtifactSourceOutcome;
4use crate::pipeline::{
5 self, HaltKind, JobEvent, JobFailureKind, JobStatus, JobSummary, ResourceGroupManager, RuleWhen,
6};
7use crate::runtime;
8use crate::ui::{UiBridge, UiCommand, UiJobStatus};
9use anyhow::{Context, Result, anyhow};
10use std::collections::{HashMap, HashSet, VecDeque};
11use std::sync::Arc;
12use tokio::{
13 sync::{Semaphore, mpsc},
14 task, time as tokio_time,
15};
16
17fn interruptible_running_jobs(plan: &ExecutionPlan, running: &HashSet<String>) -> Vec<String> {
18 let mut names = running
19 .iter()
20 .filter(|name| {
21 plan.nodes
22 .get(*name)
23 .map(|planned| planned.instance.interruptible)
24 .unwrap_or(false)
25 })
26 .cloned()
27 .collect::<Vec<_>>();
28 names.sort_by_key(|name| plan.order_index.get(name).copied().unwrap_or(usize::MAX));
29 names
30}
31
/// Drives an [`ExecutionPlan`] to completion: schedules jobs as their
/// dependencies finish, honors rule semantics (`on_success`, `on_failure`,
/// `delayed`, `manual`, `always`, `never`), enforces resource groups and the
/// configured parallelism limit, and reacts to UI commands (manual start,
/// cancel, analyze, abort).
///
/// Returns the per-job summaries collected so far plus `Ok(())` on a clean
/// run, or the first halting error (job failure, deadlock, abort, or an
/// unexpectedly closed worker channel).
pub(crate) async fn execute_plan(
    exec: &ExecutorCore,
    plan: Arc<ExecutionPlan>,
    ui: Option<Arc<UiBridge>>,
    mut commands: Option<&mut mpsc::UnboundedReceiver<UiCommand>>,
) -> (Vec<JobSummary>, Result<()>) {
    let total = plan.ordered.len();
    if total == 0 {
        return (Vec::new(), Ok(()));
    }

    // Scheduler state: `remaining` counts unfinished dependencies per job;
    // a job becomes schedulable when its count reaches zero.
    let mut remaining: HashMap<String, usize> = plan
        .nodes
        .iter()
        .map(|(name, job)| (name.clone(), job.instance.dependencies.len()))
        .collect();
    let mut ready: VecDeque<String> = VecDeque::new();
    // Jobs with `when: on_failure` parked until the pipeline actually fails.
    let mut waiting_on_failure: VecDeque<String> = VecDeque::new();
    // Jobs with `when: delayed` whose start timer is still running.
    let mut delayed_pending: HashSet<String> = HashSet::new();
    // Jobs re-queued on a timer because their resource group was busy.
    let mut resource_retry_pending: HashSet<String> = HashSet::new();
    // Manual jobs waiting for an explicit StartManual command.
    let mut manual_waiting: HashSet<String> = HashSet::new();
    let mut running = HashSet::new();
    let mut abort_requested = false;
    let mut completed = 0usize;
    let mut pipeline_failed = false;
    let mut halt_kind = HaltKind::None;
    // Only the FIRST halting error is kept; later ones are discarded.
    let mut halt_error: Option<anyhow::Error> = None;
    let mut summaries: Vec<JobSummary> = Vec::new();
    let mut attempts: HashMap<String, u32> = HashMap::new();
    // NOTE(review): nothing in this function inserts into `resource_waiting`;
    // blocked jobs are re-queued via the `resource_retry_pending` timer
    // instead. Confirm the map is intentionally vestigial.
    let mut resource_waiting: HashMap<String, VecDeque<String>> = HashMap::new();
    let resource_groups = ResourceGroupManager::new(runtime::resource_group_root());
    let mut manual_input_available = commands.is_some();

    // `max(1)` guards against a misconfigured parallelism of zero.
    let semaphore = Arc::new(Semaphore::new(exec.config.max_parallel_jobs.max(1)));
    let exec = Arc::new(exec.clone());
    // Workers report completion on `tx`; delay/retry timers fire on `delay_tx`.
    let (tx, mut rx) = mpsc::unbounded_channel::<JobEvent>();
    let (delay_tx, mut delay_rx) = mpsc::unbounded_channel::<String>();

    // Routes a dependency-free job into the queue matching its `when:` rule.
    let enqueue_ready = |job_name: &str,
                         pipeline_failed_flag: bool,
                         ready_queue: &mut VecDeque<String>,
                         wait_failure_queue: &mut VecDeque<String>,
                         delayed_set: &mut HashSet<String>| {
        let Some(planned) = plan.nodes.get(job_name) else {
            return;
        };
        match planned.instance.rule.when {
            RuleWhen::OnFailure => {
                // Runnable immediately once the pipeline has failed; parked otherwise.
                if pipeline_failed_flag {
                    ready_queue.push_back(job_name.to_string());
                } else {
                    wait_failure_queue.push_back(job_name.to_string());
                }
            }
            RuleWhen::Delayed => {
                if pipeline_failed_flag {
                    return;
                }
                if let Some(delay) = planned.instance.rule.start_in {
                    // Spawn the timer only on first insertion to avoid duplicates.
                    if delayed_set.insert(job_name.to_string()) {
                        let tx_clone = delay_tx.clone();
                        let name = job_name.to_string();
                        task::spawn(async move {
                            tokio_time::sleep(delay).await;
                            let _ = tx_clone.send(name);
                        });
                    }
                } else {
                    // Delayed rule without a duration starts right away.
                    ready_queue.push_back(job_name.to_string());
                }
            }
            RuleWhen::Manual | RuleWhen::OnSuccess => {
                if pipeline_failed_flag && planned.instance.rule.when.requires_success() {
                    return;
                }
                ready_queue.push_back(job_name.to_string());
            }
            RuleWhen::Always => {
                ready_queue.push_back(job_name.to_string());
            }
            RuleWhen::Never => {}
        }
    };

    // Seed the ready queue with every job that has no dependencies.
    for name in &plan.ordered {
        if remaining.get(name).copied().unwrap_or(0) == 0 && !abort_requested {
            enqueue_ready(
                name,
                pipeline_failed,
                &mut ready,
                &mut waiting_on_failure,
                &mut delayed_pending,
            );
        }
    }

    while completed < total {
        // Phase 1: dispatch everything currently ready.
        while let Some(name) = ready.pop_front() {
            if abort_requested {
                break;
            }
            let planned = match plan.nodes.get(&name).cloned() {
                Some(job) => job,
                None => continue,
            };
            if pipeline_failed && planned.instance.rule.when.requires_success() {
                continue;
            }

            // Manual jobs without auto-run either wait for a StartManual
            // command or, with no command channel, are skipped outright.
            if matches!(planned.instance.rule.when, RuleWhen::Manual)
                && !planned.instance.rule.manual_auto_run
            {
                if manual_input_available {
                    if manual_waiting.insert(name.clone())
                        && let Some(ui_ref) = ui.as_deref()
                    {
                        ui_ref.job_manual_pending(&name);
                    }
                } else {
                    let reason = planned
                        .instance
                        .rule
                        .manual_reason
                        .clone()
                        .unwrap_or_else(|| "manual job not run".to_string());
                    if let Some(ui_ref) = ui.as_deref() {
                        ui_ref.job_finished(
                            &planned.instance.job.name,
                            UiJobStatus::Skipped,
                            0.0,
                            Some(reason.clone()),
                        );
                    }
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Skipped(reason.clone()),
                        log_path: None,
                        log_hash: planned.log_hash.clone(),
                        allow_failure: planned.instance.rule.allow_failure,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    completed += 1;
                    // A skipped manual job still unblocks its dependents.
                    release_dependents(
                        &plan,
                        &name,
                        &mut remaining,
                        abort_requested,
                        pipeline_failed,
                        &mut ReadyQueues {
                            ready: &mut ready,
                            waiting_on_failure: &mut waiting_on_failure,
                            delayed_pending: &mut delayed_pending,
                        },
                        &enqueue_ready,
                    );
                }
                continue;
            }

            // Resource groups: a job may run only while holding its group lock.
            if let Some(group) = &planned.instance.resource_group {
                let acquired =
                    match resource_groups.try_acquire(group, &planned.instance.job.name) {
                        Ok(acquired) => acquired,
                        Err(err) => {
                            halt_kind = HaltKind::JobFailure;
                            pipeline_failed = true;
                            if halt_error.is_none() {
                                halt_error = Some(err.context(format!(
                                    "failed to acquire resource group '{}'",
                                    group
                                )));
                            }
                            break;
                        }
                    };
                if !acquired {
                    if running.is_empty() && ready.is_empty() {
                        // Nothing else can make progress, so the lock must be
                        // held by an external process: poll until it frees.
                        // NOTE(review): this blocks the scheduler loop (no UI
                        // commands or delay events are serviced while polling).
                        loop {
                            tokio_time::sleep(std::time::Duration::from_millis(500)).await;
                            match resource_groups.try_acquire(group, &planned.instance.job.name) {
                                Ok(true) => break,
                                Ok(false) => continue,
                                Err(err) => {
                                    halt_kind = HaltKind::JobFailure;
                                    pipeline_failed = true;
                                    if halt_error.is_none() {
                                        halt_error = Some(err.context(format!(
                                            "failed to acquire resource group '{}'",
                                            group
                                        )));
                                    }
                                    break;
                                }
                            }
                        }
                        if pipeline_failed {
                            break;
                        }
                    } else {
                        // Other work is in flight: park this job and retry on a timer.
                        if resource_retry_pending.insert(name.clone()) {
                            let tx_clone = delay_tx.clone();
                            let retry_name = name.clone();
                            task::spawn(async move {
                                tokio_time::sleep(std::time::Duration::from_millis(500)).await;
                                let _ = tx_clone.send(retry_name);
                            });
                        }
                        continue;
                    }
                }
            }

            let entry = attempts.entry(name.clone()).or_insert(0);
            *entry += 1;

            // Failing to even open the job log is fatal for the whole run.
            let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
                Ok(info) => info,
                Err(err) => {
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Failed(err.to_string()),
                        log_path: Some(planned.log_path.clone()),
                        log_hash: planned.log_hash.clone(),
                        allow_failure: false,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    return (summaries, Err(err));
                }
            };
            running.insert(name.clone());
            pipeline::spawn_job(
                exec.clone(),
                plan.clone(),
                planned,
                run_info,
                semaphore.clone(),
                tx.clone(),
                ui.clone(),
            );
        }

        if completed >= total {
            break;
        }

        // Stall check: pipeline failed and nothing left that could still run.
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && pipeline_failed
            && waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            break;
        }

        // Stall check: healthy pipeline but no runnable work — if jobs still
        // have unmet dependencies this is a dependency cycle (deadlock).
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && !pipeline_failed
            && waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            let remaining_jobs: Vec<_> = remaining
                .iter()
                .filter_map(|(name, &count)| if count > 0 { Some(name.clone()) } else { None })
                .collect();
            if !remaining_jobs.is_empty() {
                halt_kind = HaltKind::Deadlock;
                halt_error = Some(anyhow!(
                    "no runnable jobs, potential dependency cycle involving: {:?}",
                    remaining_jobs
                ));
            }
            break;
        }

        // Stall check: only on_failure jobs remain and the pipeline
        // succeeded, so they will never run.
        if running.is_empty()
            && ready.is_empty()
            && delayed_pending.is_empty()
            && !pipeline_failed
            && !waiting_on_failure.is_empty()
            && manual_waiting.is_empty()
        {
            break;
        }

        enum SchedulerEvent {
            Job(JobEvent),
            Delay(String),
            Command(UiCommand),
        }

        // Phase 2: wait for the next event from workers, timers, or the UI.
        let next_event = tokio::select! {
            Some(event) = rx.recv() => Some(SchedulerEvent::Job(event)),
            Some(name) = delay_rx.recv() => Some(SchedulerEvent::Delay(name)),
            cmd = async {
                if let Some(rx) = commands.as_mut() {
                    (*rx).recv().await
                } else {
                    // No command channel: never resolves, so other arms win.
                    std::future::pending().await
                }
            } => {
                match cmd {
                    Some(command) => Some(SchedulerEvent::Command(command)),
                    None => {
                        // UI hung up: manual jobs can no longer be started.
                        manual_input_available = false;
                        commands = None;
                        None
                    }
                }
            }
            else => None,
        };

        // If manual input was lost, skip every job still waiting on it.
        if !manual_input_available && !manual_waiting.is_empty() {
            let pending: Vec<String> = manual_waiting.drain().collect();
            for name in pending {
                if let Some(planned) = plan.nodes.get(&name) {
                    let reason = planned
                        .instance
                        .rule
                        .manual_reason
                        .clone()
                        .unwrap_or_else(|| "manual job not run".to_string());
                    if let Some(ui_ref) = ui.as_deref() {
                        ui_ref.job_finished(
                            &planned.instance.job.name,
                            UiJobStatus::Skipped,
                            0.0,
                            Some(reason.clone()),
                        );
                    }
                    summaries.push(JobSummary {
                        name: planned.instance.job.name.clone(),
                        stage_name: planned.instance.stage_name.clone(),
                        duration: 0.0,
                        status: JobStatus::Skipped(reason),
                        log_path: None,
                        log_hash: planned.log_hash.clone(),
                        allow_failure: planned.instance.rule.allow_failure,
                        environment: exec.expanded_environment(&planned.instance.job),
                    });
                    completed += 1;
                    release_dependents(
                        &plan,
                        &name,
                        &mut remaining,
                        abort_requested,
                        pipeline_failed,
                        &mut ReadyQueues {
                            ready: &mut ready,
                            waiting_on_failure: &mut waiting_on_failure,
                            delayed_pending: &mut delayed_pending,
                        },
                        &enqueue_ready,
                    );
                }
            }
        }

        let Some(event) = next_event else {
            // All channels closed: if nothing is pending either, the run
            // cannot finish — report the channel closure as the halt cause.
            if running.is_empty()
                && ready.is_empty()
                && delayed_pending.is_empty()
                && resource_retry_pending.is_empty()
            {
                halt_kind = HaltKind::ChannelClosed;
                halt_error = Some(anyhow!(
                    "job worker channel closed unexpectedly while {} jobs remained",
                    total - completed
                ));
                break;
            }
            continue;
        };

        match event {
            SchedulerEvent::Delay(name) => {
                if abort_requested {
                    continue;
                }
                // Resource-group retry timers and delayed-start timers share
                // one channel; disambiguate via the pending sets.
                if resource_retry_pending.remove(&name) {
                    ready.push_back(name);
                    continue;
                }
                delayed_pending.remove(&name);
                if pipeline_failed
                    && let Some(planned) = plan.nodes.get(&name)
                    && planned.instance.rule.when.requires_success()
                {
                    continue;
                }
                ready.push_back(name);
            }
            SchedulerEvent::Command(cmd) => match cmd {
                UiCommand::StartManual { name } => {
                    if manual_waiting.remove(&name) {
                        ready.push_back(name);
                    }
                }
                UiCommand::CancelJob { name } => {
                    exec.cancel_running_job(&name);
                }
                UiCommand::AnalyzeJob { name, source_name } => {
                    spawn_analysis(exec.clone(), plan.clone(), ui.clone(), name, source_name);
                }
                UiCommand::PreviewAiPrompt { name, source_name } => {
                    spawn_prompt_preview(exec.clone(), plan.clone(), ui.clone(), name, source_name);
                }
                UiCommand::AbortPipeline => {
                    abort_requested = true;
                    pipeline_failed = true;
                    halt_kind = HaltKind::Aborted;
                    if halt_error.is_none() {
                        halt_error = Some(anyhow!("pipeline aborted by user"));
                    }
                    // Only interruptible jobs get cancelled; others finish.
                    for name in interruptible_running_jobs(plan.as_ref(), &running) {
                        exec.cancel_running_job(&name);
                    }
                    ready.clear();
                    waiting_on_failure.clear();
                    delayed_pending.clear();
                    manual_waiting.clear();
                }
                // Restarts are only handled after the run, in handle_restart_commands.
                UiCommand::RestartJob { .. } => {}
            },
            SchedulerEvent::Job(event) => {
                running.remove(&event.name);
                // A completion for an unknown job indicates plan corruption.
                let Some(planned) = plan.nodes.get(&event.name) else {
                    let message = format!(
                        "completed job '{}' was not found in execution plan",
                        event.name
                    );
                    if !pipeline_failed {
                        pipeline_failed = true;
                        halt_kind = HaltKind::JobFailure;
                        if halt_error.is_none() {
                            halt_error = Some(anyhow!(message.clone()));
                        }
                    }
                    summaries.push(JobSummary {
                        name: event.name.clone(),
                        stage_name: event.stage_name.clone(),
                        duration: event.duration,
                        status: JobStatus::Failed(message),
                        log_path: event.log_path.clone(),
                        log_hash: event.log_hash.clone(),
                        allow_failure: false,
                        environment: None,
                    });
                    completed += 1;
                    continue;
                };
                match event.result {
                    Ok(_) => {
                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Success);
                        release_resource_lock(
                            planned,
                            &mut ready,
                            &resource_groups,
                            &mut resource_waiting,
                        );
                        release_dependents(
                            &plan,
                            &event.name,
                            &mut remaining,
                            abort_requested,
                            pipeline_failed,
                            &mut ReadyQueues {
                                ready: &mut ready,
                                waiting_on_failure: &mut waiting_on_failure,
                                delayed_pending: &mut delayed_pending,
                            },
                            &enqueue_ready,
                        );
                        summaries.push(JobSummary {
                            name: event.name.clone(),
                            stage_name: event.stage_name.clone(),
                            duration: event.duration,
                            status: JobStatus::Success,
                            log_path: event.log_path.clone(),
                            log_hash: event.log_hash.clone(),
                            allow_failure: planned.instance.rule.allow_failure,
                            environment: exec.expanded_environment(&planned.instance.job),
                        });
                        completed += 1;
                    }
                    Err(err) => {
                        // User cancellation records a skip, not a failure.
                        if event.cancelled {
                            exec.record_completed_job(&event.name, ArtifactSourceOutcome::Skipped);
                            release_resource_lock(
                                planned,
                                &mut ready,
                                &resource_groups,
                                &mut resource_waiting,
                            );
                            summaries.push(JobSummary {
                                name: event.name.clone(),
                                stage_name: event.stage_name.clone(),
                                duration: event.duration,
                                status: JobStatus::Skipped("aborted by user".to_string()),
                                log_path: event.log_path.clone(),
                                log_hash: event.log_hash.clone(),
                                allow_failure: true,
                                environment: exec.expanded_environment(&planned.instance.job),
                            });
                            completed += 1;
                            continue;
                        }
                        let err_msg = err.to_string();
                        // attempts was incremented before dispatch, so
                        // retries used = attempts - 1.
                        let attempts_so_far = attempts.get(&event.name).copied().unwrap_or(1);
                        let retries_used = attempts_so_far.saturating_sub(1);
                        if retries_used < planned.instance.retry.max
                            && retry_allowed(
                                &planned.instance.retry.when,
                                &planned.instance.retry.exit_codes,
                                event.failure_kind,
                                event.exit_code,
                            )
                        {
                            // Retry: release the lock and re-queue; no summary
                            // is recorded for the failed attempt.
                            release_resource_lock(
                                planned,
                                &mut ready,
                                &resource_groups,
                                &mut resource_waiting,
                            );
                            ready.push_back(event.name.clone());
                            continue;
                        }
                        exec.record_completed_job(&event.name, ArtifactSourceOutcome::Failed);
                        release_resource_lock(
                            planned,
                            &mut ready,
                            &resource_groups,
                            &mut resource_waiting,
                        );
                        if !planned.instance.rule.allow_failure && !pipeline_failed {
                            pipeline_failed = true;
                            halt_kind = HaltKind::JobFailure;
                            if halt_error.is_none() {
                                halt_error =
                                    Some(anyhow!("job '{}' failed: {}", event.name, err_msg));
                            }
                            // First hard failure wakes all on_failure jobs.
                            while let Some(name) = waiting_on_failure.pop_front() {
                                ready.push_back(name);
                            }
                        }
                        release_dependents(
                            &plan,
                            &event.name,
                            &mut remaining,
                            abort_requested,
                            pipeline_failed,
                            &mut ReadyQueues {
                                ready: &mut ready,
                                waiting_on_failure: &mut waiting_on_failure,
                                delayed_pending: &mut delayed_pending,
                            },
                            &enqueue_ready,
                        );
                        summaries.push(JobSummary {
                            name: event.name.clone(),
                            stage_name: event.stage_name.clone(),
                            duration: event.duration,
                            status: JobStatus::Failed(err_msg),
                            log_path: event.log_path.clone(),
                            log_hash: event.log_hash.clone(),
                            allow_failure: planned.instance.rule.allow_failure,
                            environment: exec.expanded_environment(&planned.instance.job),
                        });
                        completed += 1;
                    }
                }
            }
        }
    }

    // Epilogue: record every planned job that never produced a summary.
    let skip_reason = match halt_kind {
        HaltKind::JobFailure => Some("not run (pipeline stopped after failure)".to_string()),
        HaltKind::Deadlock => Some("not run (dependency cycle detected)".to_string()),
        HaltKind::ChannelClosed => {
            Some("not run (executor channel closed unexpectedly)".to_string())
        }
        HaltKind::Aborted => Some("not run (pipeline aborted by user)".to_string()),
        HaltKind::None => None,
    };

    let mut recorded: HashSet<String> = summaries.iter().map(|entry| entry.name.clone()).collect();
    for job_name in &plan.ordered {
        if recorded.contains(job_name) {
            continue;
        }
        let Some(planned) = plan.nodes.get(job_name) else {
            continue;
        };
        // On a clean run, only unreached on_failure jobs get a skip record.
        let reason = if let Some(reason) = skip_reason.clone() {
            Some(reason)
        } else if planned.instance.rule.when == RuleWhen::OnFailure {
            Some("skipped (rules: on_failure and pipeline succeeded)".to_string())
        } else {
            None
        };

        if let Some(reason) = reason {
            if let Some(ui_ref) = ui.as_deref() {
                ui_ref.job_finished(job_name, UiJobStatus::Skipped, 0.0, Some(reason.clone()));
            }
            summaries.push(JobSummary {
                name: job_name.clone(),
                stage_name: planned.instance.stage_name.clone(),
                duration: 0.0,
                status: JobStatus::Skipped(reason.clone()),
                log_path: Some(planned.log_path.clone()),
                log_hash: planned.log_hash.clone(),
                allow_failure: planned.instance.rule.allow_failure,
                environment: exec.expanded_environment(&planned.instance.job),
            });
            recorded.insert(job_name.clone());
        }
    }

    let result = halt_error.map_or(Ok(()), Err);
    (summaries, result)
}
662
663fn retry_allowed(
664 conditions: &[String],
665 exit_codes: &[i32],
666 failure_kind: Option<JobFailureKind>,
667 exit_code: Option<i32>,
668) -> bool {
669 if conditions.is_empty() && exit_codes.is_empty() {
670 return true;
671 }
672 let when_matches = failure_kind.is_some_and(|kind| {
673 conditions
674 .iter()
675 .any(|condition| retry_condition_matches(condition, kind))
676 });
677 let exit_code_matches = exit_code.is_some_and(|code| exit_codes.contains(&code));
678 when_matches || exit_code_matches
679}
680
681fn retry_condition_matches(condition: &str, failure_kind: JobFailureKind) -> bool {
682 match condition {
683 "always" => true,
684 "unknown_failure" => failure_kind == JobFailureKind::UnknownFailure,
685 "script_failure" => failure_kind == JobFailureKind::ScriptFailure,
686 "api_failure" => failure_kind == JobFailureKind::ApiFailure,
687 "job_execution_timeout" => failure_kind == JobFailureKind::JobExecutionTimeout,
688 "runner_system_failure" => failure_kind == JobFailureKind::RunnerSystemFailure,
689 "runner_unsupported" => failure_kind == JobFailureKind::RunnerUnsupported,
690 "stale_schedule" => failure_kind == JobFailureKind::StaleSchedule,
691 "archived_failure" => failure_kind == JobFailureKind::ArchivedFailure,
692 "unmet_prerequisites" => failure_kind == JobFailureKind::UnmetPrerequisites,
693 "scheduler_failure" => failure_kind == JobFailureKind::SchedulerFailure,
694 "data_integrity_failure" => failure_kind == JobFailureKind::DataIntegrityFailure,
695 "stuck_or_timeout_failure" => {
696 matches!(
697 failure_kind,
698 JobFailureKind::StuckOrTimeoutFailure | JobFailureKind::JobExecutionTimeout
699 )
700 }
701 _ => false,
702 }
703}
704
/// Post-run command loop: after the main pipeline finished, keeps servicing
/// UI commands so the user can restart individual jobs or request analysis.
///
/// Restarted jobs run synchronously (one at a time) on a blocking worker
/// thread; their outcome replaces the job's previous entry in `summaries`.
/// Returns `Err` only when starting a restarted job's log fails or the
/// blocking task itself fails; exits cleanly when the channel closes or the
/// user aborts.
pub(crate) async fn handle_restart_commands(
    exec: &ExecutorCore,
    plan: Arc<ExecutionPlan>,
    ui: Option<Arc<UiBridge>>,
    commands: &mut mpsc::UnboundedReceiver<UiCommand>,
    summaries: &mut Vec<JobSummary>,
) -> Result<()> {
    while let Some(command) = commands.recv().await {
        match command {
            UiCommand::RestartJob { name } => {
                // Unknown job names are silently ignored.
                let Some(planned) = plan.nodes.get(&name).cloned() else {
                    continue;
                };

                if let Some(ui_ref) = ui.as_deref() {
                    ui_ref.job_restarted(&name);
                }

                let run_info = match exec.log_job_start(&planned, ui.as_deref()) {
                    Ok(info) => info,
                    Err(err) => {
                        summaries.push(JobSummary {
                            name: planned.instance.job.name.clone(),
                            stage_name: planned.instance.stage_name.clone(),
                            duration: 0.0,
                            status: JobStatus::Failed(err.to_string()),
                            log_path: Some(planned.log_path.clone()),
                            log_hash: planned.log_hash.clone(),
                            allow_failure: false,
                            environment: exec.expanded_environment(&planned.instance.job),
                        });
                        return Err(err);
                    }
                };
                // run_planned_job is blocking, so run it off the async runtime
                // and await completion before accepting the next command.
                let restart_exec = exec.clone();
                let ui_clone = ui.clone();
                let run_info_clone = run_info.clone();
                let job_plan = plan.clone();
                let event = task::spawn_blocking(move || {
                    job_runner::run_planned_job(
                        &restart_exec,
                        job_plan,
                        planned,
                        run_info_clone,
                        ui_clone,
                    )
                })
                .await
                .context("job restart task failed")?;
                update_summaries_from_event(exec, plan.as_ref(), event, summaries);
            }
            UiCommand::AnalyzeJob { name, source_name } => {
                spawn_analysis(
                    Arc::new(exec.clone()),
                    plan.clone(),
                    ui.clone(),
                    name,
                    source_name,
                );
            }
            UiCommand::PreviewAiPrompt { name, source_name } => {
                spawn_prompt_preview(
                    Arc::new(exec.clone()),
                    plan.clone(),
                    ui.clone(),
                    name,
                    source_name,
                );
            }
            // Scheduling-phase commands are meaningless after the run.
            UiCommand::StartManual { .. } => {}
            UiCommand::CancelJob { .. } => {}
            UiCommand::AbortPipeline => break,
        }
    }
    Ok(())
}
781
782fn spawn_analysis(
783 exec: Arc<ExecutorCore>,
784 plan: Arc<ExecutionPlan>,
785 ui: Option<Arc<UiBridge>>,
786 name: String,
787 source_name: String,
788) {
789 tokio::task::spawn_blocking(move || {
790 exec.analyze_job_with_default_provider(&plan, &name, &source_name, ui.as_deref());
791 });
792}
793
794fn spawn_prompt_preview(
795 exec: Arc<ExecutorCore>,
796 plan: Arc<ExecutionPlan>,
797 ui: Option<Arc<UiBridge>>,
798 name: String,
799 source_name: String,
800) {
801 tokio::task::spawn_blocking(move || {
802 if let Some(ui) = ui.as_deref()
803 && let Ok(prompt) = exec.render_ai_prompt(&plan, &name, &source_name)
804 {
805 ui.ai_prompt_ready(&name, prompt);
806 }
807 });
808}
809
810fn update_summaries_from_event(
811 exec: &ExecutorCore,
812 plan: &ExecutionPlan,
813 event: JobEvent,
814 summaries: &mut Vec<JobSummary>,
815) {
816 let JobEvent {
817 name,
818 stage_name,
819 duration,
820 log_path,
821 log_hash,
822 result,
823 failure_kind: _,
824 exit_code: _,
825 cancelled,
826 } = event;
827
828 let allow_failure = plan
829 .nodes
830 .get(&name)
831 .map(|planned| planned.instance.rule.allow_failure)
832 .unwrap_or(false);
833 let environment = plan
834 .nodes
835 .get(&name)
836 .and_then(|planned| exec.expanded_environment(&planned.instance.job));
837
838 let status = match result {
839 Ok(_) => JobStatus::Success,
840 Err(err) => {
841 if cancelled {
842 JobStatus::Skipped("aborted by user".to_string())
843 } else {
844 JobStatus::Failed(err.to_string())
845 }
846 }
847 };
848 let outcome = match &status {
849 JobStatus::Success => ArtifactSourceOutcome::Success,
850 JobStatus::Failed(_) => ArtifactSourceOutcome::Failed,
851 JobStatus::Skipped(_) => ArtifactSourceOutcome::Skipped,
852 };
853 exec.record_completed_job(&name, outcome);
854
855 summaries.retain(|entry| entry.name != name);
856 summaries.push(JobSummary {
857 name,
858 stage_name,
859 duration,
860 status,
861 log_path,
862 log_hash,
863 allow_failure,
864 environment,
865 });
866}
867
/// Bundles the three scheduler queues that `enqueue_ready` writes into, so
/// `release_dependents` can borrow them together through one parameter.
struct ReadyQueues<'a> {
    // Jobs that can be dispatched immediately.
    ready: &'a mut VecDeque<String>,
    // `on_failure` jobs parked until the pipeline fails.
    waiting_on_failure: &'a mut VecDeque<String>,
    // `delayed` jobs whose start timer is still pending.
    delayed_pending: &'a mut HashSet<String>,
}
873
874fn release_dependents<F>(
875 plan: &ExecutionPlan,
876 name: &str,
877 remaining: &mut HashMap<String, usize>,
878 abort_requested: bool,
879 pipeline_failed: bool,
880 queues: &mut ReadyQueues<'_>,
881 enqueue_ready: &F,
882) where
883 F: Fn(&str, bool, &mut VecDeque<String>, &mut VecDeque<String>, &mut HashSet<String>),
884{
885 if let Some(children) = plan.dependents.get(name) {
886 for child in children {
887 if let Some(count) = remaining.get_mut(child)
888 && *count > 0
889 {
890 *count -= 1;
891 if *count == 0 && !abort_requested {
892 enqueue_ready(
893 child,
894 pipeline_failed,
895 queues.ready,
896 queues.waiting_on_failure,
897 queues.delayed_pending,
898 );
899 }
900 }
901 }
902 }
903}
904
905fn release_resource_lock(
906 planned: &ExecutableJob,
907 ready: &mut VecDeque<String>,
908 resource_groups: &ResourceGroupManager,
909 resource_waiting: &mut HashMap<String, VecDeque<String>>,
910) {
911 if let Some(group) = &planned.instance.resource_group {
912 let _ = resource_groups.release(group);
913 if let Some(queue) = resource_waiting.get_mut(group)
914 && let Some(next) = queue.pop_front()
915 {
916 ready.push_back(next);
917 }
918 }
919}
920
#[cfg(test)]
mod tests {
    use super::{interruptible_running_jobs, release_resource_lock, retry_allowed};
    use crate::compiler::JobInstance;
    use crate::execution_plan::{ExecutableJob, ExecutionPlan};
    use crate::model::{ArtifactSpec, JobSpec, RetryPolicySpec};
    use crate::pipeline::{JobFailureKind, ResourceGroupManager, RuleEvaluation, RuleWhen};
    use std::collections::{HashMap, HashSet, VecDeque};
    use std::path::PathBuf;
    use tempfile::tempdir;

    // Releasing a held group lock must hand it to the first waiter in the
    // resource_waiting queue and push that waiter onto the ready queue.
    #[test]
    fn release_resource_lock_requeues_next_waiting_job() {
        let planned = ExecutableJob {
            instance: JobInstance {
                job: job("build"),
                stage_name: "build".into(),
                dependencies: Vec::new(),
                rule: RuleEvaluation {
                    included: true,
                    when: RuleWhen::OnSuccess,
                    ..Default::default()
                },
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible: false,
                resource_group: Some("builder".into()),
            },
            log_path: PathBuf::from("/tmp/build.log"),
            log_hash: "hash".into(),
        };
        let mut ready = VecDeque::new();
        let temp = tempdir().expect("tempdir");
        let manager = ResourceGroupManager::new(temp.path().join("locks"));
        manager
            .try_acquire("builder", "build")
            .expect("lock acquires");
        let mut resource_waiting = HashMap::from([(
            "builder".to_string(),
            VecDeque::from(["package".to_string()]),
        )]);

        release_resource_lock(&planned, &mut ready, &manager, &mut resource_waiting);

        assert_eq!(ready, VecDeque::from(["package".to_string()]));
        // The lock must actually be free again after release.
        assert!(
            manager
                .try_acquire("builder", "package")
                .expect("lock re-acquires")
        );
        assert!(resource_waiting["builder"].is_empty());
    }

    // An empty retry policy means "retry on anything".
    #[test]
    fn retry_allowed_defaults_to_true_without_conditions() {
        assert!(retry_allowed(
            &[],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    #[test]
    fn retry_allowed_matches_script_failure_condition() {
        assert!(retry_allowed(
            &["script_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
        assert!(!retry_allowed(
            &["runner_system_failure".into()],
            &[],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    // stuck_or_timeout_failure is a superset that also covers
    // job_execution_timeout.
    #[test]
    fn retry_allowed_treats_job_timeout_as_stuck_or_timeout_failure() {
        assert!(retry_allowed(
            &["stuck_or_timeout_failure".into()],
            &[],
            Some(JobFailureKind::JobExecutionTimeout),
            None
        ));
    }

    #[test]
    fn retry_allowed_matches_api_failure_condition() {
        assert!(retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::ApiFailure),
            None
        ));
        assert!(!retry_allowed(
            &["api_failure".into()],
            &[],
            Some(JobFailureKind::UnknownFailure),
            None
        ));
    }

    #[test]
    fn retry_allowed_matches_unmet_prerequisites_condition() {
        assert!(retry_allowed(
            &["unmet_prerequisites".into()],
            &[],
            Some(JobFailureKind::UnmetPrerequisites),
            None
        ));
    }

    #[test]
    fn retry_allowed_matches_exit_code_condition() {
        assert!(retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137)
        ));
        assert!(!retry_allowed(
            &[],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(1)
        ));
    }

    // Conditions and exit codes are OR-ed: either match allows a retry.
    #[test]
    fn retry_allowed_matches_when_or_exit_code() {
        assert!(retry_allowed(
            &["runner_system_failure".into()],
            &[137],
            Some(JobFailureKind::ScriptFailure),
            Some(137)
        ));
    }

    #[test]
    fn interruptible_running_jobs_selects_only_interruptible_nodes() {
        let plan = ExecutionPlan {
            ordered: vec!["build".into(), "deploy".into()],
            nodes: HashMap::from([
                ("build".into(), executable_job("build", true, "build", 0)),
                (
                    "deploy".into(),
                    executable_job("deploy", false, "deploy", 1),
                ),
            ]),
            dependents: HashMap::new(),
            order_index: HashMap::from([("build".into(), 0), ("deploy".into(), 1)]),
            variants: HashMap::new(),
        };
        let running = HashSet::from(["build".to_string(), "deploy".to_string()]);

        assert_eq!(interruptible_running_jobs(&plan, &running), vec!["build"]);
    }

    // Result order must follow order_index, not HashSet iteration order.
    #[test]
    fn interruptible_running_jobs_respects_plan_order() {
        let plan = ExecutionPlan {
            ordered: vec!["test".into(), "build".into()],
            nodes: HashMap::from([
                ("build".into(), executable_job("build", true, "build", 1)),
                ("test".into(), executable_job("test", true, "test", 0)),
            ]),
            dependents: HashMap::new(),
            order_index: HashMap::from([("test".into(), 0), ("build".into(), 1)]),
            variants: HashMap::new(),
        };
        let running = HashSet::from(["build".to_string(), "test".to_string()]);

        assert_eq!(
            interruptible_running_jobs(&plan, &running),
            vec!["test", "build"]
        );
    }

    // Builds a minimal ExecutableJob fixture with the given interruptible
    // flag; `order` only disambiguates the log path/hash.
    fn executable_job(name: &str, interruptible: bool, stage: &str, order: usize) -> ExecutableJob {
        let mut job = job(name);
        job.interruptible = interruptible;
        ExecutableJob {
            instance: JobInstance {
                job,
                stage_name: stage.into(),
                dependencies: Vec::new(),
                rule: RuleEvaluation::default(),
                timeout: None,
                retry: RetryPolicySpec::default(),
                interruptible,
                resource_group: None,
            },
            log_path: PathBuf::from(format!("/tmp/{name}-{order}.log")),
            log_hash: format!("hash-{name}-{order}"),
        }
    }

    // Minimal JobSpec fixture: one trivially-succeeding command, everything
    // else defaulted.
    fn job(name: &str) -> JobSpec {
        JobSpec {
            name: name.into(),
            stage: "build".into(),
            commands: vec!["true".into()],
            needs: Vec::new(),
            explicit_needs: false,
            dependencies: Vec::new(),
            before_script: None,
            after_script: None,
            inherit_default_before_script: true,
            inherit_default_after_script: true,
            inherit_default_image: true,
            inherit_default_cache: true,
            inherit_default_services: true,
            inherit_default_timeout: true,
            inherit_default_retry: true,
            inherit_default_interruptible: true,
            when: None,
            rules: Vec::new(),
            only: Vec::new(),
            except: Vec::new(),
            artifacts: ArtifactSpec::default(),
            cache: Vec::new(),
            image: None,
            variables: HashMap::new(),
            services: Vec::new(),
            timeout: None,
            retry: RetryPolicySpec::default(),
            interruptible: false,
            resource_group: None,
            parallel: None,
            tags: Vec::new(),
            environment: None,
        }
    }
}