1use std::collections::{BTreeMap, BTreeSet, HashMap};
29use std::num::NonZeroUsize;
30
31use futures::StreamExt;
32use futures::stream::FuturesUnordered;
33use haz_domain::mutex::Mutex;
34use haz_domain::name::{ProjectName, TagName};
35use haz_domain::path::CanonicalPath;
36use haz_domain::task_id::TaskId;
37use haz_vfs::WritableFilesystem;
38use snafu::Snafu;
39
40use crate::hold_set::HoldSet;
41use crate::process::ProcessSpawner;
42use crate::run_graph::cascade::{
43 drain_ready_to_cancelled, emit_cascade_cancellations, emit_cascade_skips,
44};
45use crate::run_graph::cycle::{
46 check_and_record_runtime_cycle_for_completion, skip_ready_cycle_members,
47};
48use crate::run_graph::overlap::check_and_record_output_overlap;
49use crate::run_graph::state::{
50 InFlightCounts, ReadyState, StreamHashAccumulator, precompute_task_tags, resolve_global_cap,
51};
52use crate::run_graph::steps::{
53 InFlightCompletion, InFlightFuture, LookupStepOutcome, run_lookup_step, run_spawn_step,
54};
55use crate::run_task::{
56 CancelledRecord, CompletedRecord, RunContext, RunObserver, RunOutcome, RunState, RunTaskError,
57 SkipCause,
58};
59
/// Everything a single `run_graph` call hands back to its caller.
#[derive(Debug)]
pub struct RunGraphOutcome {
    /// Terminal outcome per task, keyed by task id. The scheduler inserts
    /// `Completed` and `Cancelled` entries directly; the cascade helpers it
    /// calls also write into this map.
    pub outcomes: BTreeMap<TaskId, RunOutcome>,
    /// Error for every task whose lookup step or spawn step returned `Err`.
    pub task_errors: BTreeMap<TaskId, RunTaskError>,
    /// Runtime invariant violations recorded while the graph was running
    /// (see [`RuntimeInvariantViolation`]).
    pub invariant_violations: Vec<RuntimeInvariantViolation>,
}
98
/// A violation of a graph invariant that is only detectable while the run is
/// in progress, after a task has produced its materialised outputs.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeInvariantViolation {
    /// A cycle was detected among tasks at runtime.
    RuntimeCycle {
        /// The tasks that make up the detected cycle.
        nodes: BTreeSet<TaskId>,
        /// The edge whose discovery exposed the cycle.
        /// NOTE(review): presumably `(from, to)`, matching how the scheduler
        /// seeds its edge set from `graph.edges` — confirm in the cycle module.
        offending_edge: (TaskId, TaskId),
    },
    /// Two tasks produced the same output path.
    OutputOverlap {
        /// One of the two tasks involved.
        /// NOTE(review): presumably the task that claimed the path first — confirm.
        first_task: TaskId,
        /// The other task involved.
        second_task: TaskId,
        /// The canonical path both tasks produced.
        shared_path: CanonicalPath,
    },
}
132
/// Error type returned by `run_graph`.
///
/// The enum currently has no variants, so a value of this type cannot be
/// constructed; it exists so the function signature can grow failure modes
/// later without changing its return type.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum RunGraphError {}
148
149pub async fn run_graph<F, S, O>(
207 ctx: &RunContext<'_, F, S, O>,
208 created_at_unix: u64,
209) -> Result<RunGraphOutcome, RunGraphError>
210where
211 F: WritableFilesystem,
212 S: ProcessSpawner,
213 O: RunObserver,
214{
215 let internal_cancel = ctx.cancel.child_token();
223 let internal_ctx = RunContext {
224 fs: ctx.fs,
225 cache: ctx.cache,
226 spawner: ctx.spawner,
227 observer: ctx.observer,
228 workspace: ctx.workspace,
229 graph: ctx.graph,
230 host_env: ctx.host_env,
231 algo: ctx.algo,
232 cancel: &internal_cancel,
233 };
234 let mut sched = SchedulerState::new(&internal_ctx);
235
236 loop {
237 if !sched.cancelled && sched.ctx.cancel.is_cancelled() {
238 sched.cancelled = true;
239 }
240
241 if sched.cancelled {
242 sched.drain_cancelled();
243 } else {
244 sched.admit_ready();
245 }
246
247 if sched.in_flight.is_empty() {
248 break;
249 }
250
251 let Some(completion) = sched.next_completion().await else {
252 continue;
253 };
254
255 match completion {
256 InFlightCompletion::Lookup { task, result } => {
257 sched.handle_lookup(task, result, created_at_unix);
258 }
259 InFlightCompletion::Spawn { task, result } => {
260 sched.handle_spawn(task, result);
261 }
262 }
263 }
264
265 Ok(sched.into_outcome())
266}
267
/// Mutable bookkeeping for one `run_graph` invocation.
struct SchedulerState<'a, F, S, O>
where
    F: WritableFilesystem,
    S: ProcessSpawner,
    O: RunObserver,
{
    /// Run context shared with every lookup/spawn step future.
    ctx: &'a RunContext<'a, F, S, O>,
    /// Global concurrency cap, resolved once from the workspace settings.
    global_cap: NonZeroUsize,
    /// Tag set of every task, computed up front; used for admission and
    /// release of concurrency slots.
    task_tags: BTreeMap<TaskId, BTreeSet<TagName>>,
    /// Ready/skip sets and the completion bookkeeping derived from the graph.
    ready_state: ReadyState,
    /// Slot counters for currently admitted tasks.
    counts: InFlightCounts,
    /// Accumulator fed with every finished record; a snapshot of `by_task`
    /// is handed to each lookup step.
    accum: StreamHashAccumulator,
    /// Project/mutex holds currently taken by running spawn steps.
    hold_set: HoldSet,
    /// Tasks for which `on_task_started` has already fired, so a re-admitted
    /// task does not emit the event twice.
    started: BTreeSet<TaskId>,
    /// Hold acquired for each in-flight spawn step, released when the spawn
    /// completes.
    spawn_step_holds: BTreeMap<TaskId, (ProjectName, Option<Mutex>)>,
    /// Terminal outcome per task, returned to the caller at the end.
    outcomes: BTreeMap<TaskId, RunOutcome>,
    /// Errors from lookup/spawn steps that returned `Err`.
    task_errors: BTreeMap<TaskId, RunTaskError>,
    /// Invariant violations recorded so far.
    invariant_violations: Vec<RuntimeInvariantViolation>,
    /// Path-to-task map handed to the output-overlap check.
    /// NOTE(review): presumably the first task that produced each path — confirm
    /// in the overlap module.
    output_claims: HashMap<CanonicalPath, TaskId>,
    /// Edge set seeded from the graph's declared edges and handed mutably to
    /// the runtime-cycle check.
    augmented_edges: BTreeSet<(TaskId, TaskId)>,
    /// Lookup and spawn step futures that have been started and not yet
    /// completed.
    in_flight: FuturesUnordered<InFlightFuture<'a>>,
    /// Latched once cancellation is observed or an invariant violation is
    /// recorded; from then on no new task is admitted.
    cancelled: bool,
}
337
338impl<'a, F, S, O> SchedulerState<'a, F, S, O>
339where
340 F: WritableFilesystem,
341 S: ProcessSpawner,
342 O: RunObserver,
343{
344 fn new(ctx: &'a RunContext<'a, F, S, O>) -> Self {
345 let global_cap = resolve_global_cap(&ctx.workspace.settings.concurrency);
346 let task_tags = precompute_task_tags(ctx.workspace, ctx.graph);
347 let ready_state = ReadyState::from_graph(ctx.graph);
348 let augmented_edges: BTreeSet<(TaskId, TaskId)> = ctx
349 .graph
350 .edges
351 .iter()
352 .map(|e| (e.from.clone(), e.to.clone()))
353 .collect();
354 Self {
355 ctx,
356 global_cap,
357 task_tags,
358 ready_state,
359 counts: InFlightCounts::default(),
360 accum: StreamHashAccumulator::default(),
361 hold_set: HoldSet::default(),
362 started: BTreeSet::new(),
363 spawn_step_holds: BTreeMap::new(),
364 outcomes: BTreeMap::new(),
365 task_errors: BTreeMap::new(),
366 invariant_violations: Vec::new(),
367 output_claims: HashMap::new(),
368 augmented_edges,
369 in_flight: FuturesUnordered::new(),
370 cancelled: false,
371 }
372 }
373
374 fn drain_cancelled(&mut self) {
379 drain_ready_to_cancelled(self.ctx.observer, &mut self.ready_state, &mut self.outcomes);
380 }
381
382 fn admit_ready(&mut self) {
390 let candidates: Vec<TaskId> = self.ready_state.ready.iter().cloned().collect();
391 for task in candidates {
392 if self.ready_state.skip.contains(&task) {
393 self.ready_state.ready.remove(&task);
397 continue;
398 }
399 let tags = self
400 .task_tags
401 .get(&task)
402 .expect("ready task must have a precomputed tag set");
403 if !self.counts.can_admit(
404 tags,
405 &self.ctx.workspace.settings.concurrency,
406 self.global_cap,
407 ) {
408 continue;
409 }
410 self.ready_state.ready.remove(&task);
411 self.counts.admit(tags);
412
413 if self.started.insert(task.clone()) {
418 self.ctx.observer.on_task_started(&task);
419 }
420
421 let preds_snapshot = self.accum.by_task.clone();
422 let task_for_future = task.clone();
423 self.in_flight.push(Box::pin(run_lookup_step(
424 self.ctx,
425 task_for_future,
426 preds_snapshot,
427 )));
428 }
429 }
430
431 async fn next_completion(&mut self) -> Option<InFlightCompletion> {
439 if self.cancelled {
440 return Some(
441 self.in_flight
442 .next()
443 .await
444 .expect("in_flight checked non-empty above"),
445 );
446 }
447 tokio::select! {
448 biased;
449 () = self.ctx.cancel.cancelled() => {
450 self.cancelled = true;
451 None
452 }
453 next = self.in_flight.next() => {
454 Some(next.expect("in_flight checked non-empty above"))
455 }
456 }
457 }
458
459 fn handle_lookup(
466 &mut self,
467 task: TaskId,
468 result: Result<LookupStepOutcome, RunTaskError>,
469 created_at_unix: u64,
470 ) {
471 let tags = self
472 .task_tags
473 .get(&task)
474 .expect("completed task must have a precomputed tag set")
475 .clone();
476
477 if self.cancelled {
478 self.counts.release(&tags);
486 let record = CancelledRecord::RunCancelled { task: task.clone() };
487 self.ctx.observer.on_task_cancelled(&task, &record);
488 let newly = self.ready_state.complete_failed(&task);
489 emit_cascade_cancellations(self.ctx.observer, &mut self.outcomes, &task, newly);
490 self.outcomes.insert(task, RunOutcome::Cancelled(record));
491 return;
492 }
493
494 match result {
495 Err(err) => {
496 self.counts.release(&tags);
497 let newly = self.ready_state.complete_failed(&task);
498 let cause = SkipCause::UpstreamErrored {
499 upstream: task.clone(),
500 };
501 emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
502 self.task_errors.insert(task, err);
503 }
504 Ok(LookupStepOutcome::Hit(record)) => {
505 self.counts.release(&tags);
506 self.ctx.observer.on_task_finished(&task, &record);
507 self.accum.record(&task, &record);
508 self.record_completion_invariants(task, record);
509 }
510 Ok(LookupStepOutcome::Miss {
511 key,
512 mutex,
513 project_name,
514 }) => {
515 if self.hold_set.compatible(&project_name, mutex.as_ref()) {
517 self.hold_set.acquire(&project_name, mutex.as_ref());
519 self.spawn_step_holds
520 .insert(task.clone(), (project_name, mutex));
521 let task_for_future = task.clone();
522 self.in_flight.push(Box::pin(run_spawn_step(
523 self.ctx,
524 task_for_future,
525 key,
526 created_at_unix,
527 )));
528 } else {
529 self.counts.release(&tags);
532 self.ready_state.ready.insert(task);
533 }
534 }
535 }
536 }
537
538 fn handle_spawn(&mut self, task: TaskId, result: Result<CompletedRecord, RunTaskError>) {
545 let tags = self
546 .task_tags
547 .get(&task)
548 .expect("completed task must have a precomputed tag set")
549 .clone();
550 if let Some((project_name, mutex)) = self.spawn_step_holds.remove(&task) {
553 self.hold_set.release(&project_name, mutex.as_ref());
554 }
555 self.counts.release(&tags);
556 match result {
557 Ok(record) => match record.state {
558 RunState::Succeeded => {
559 self.ctx.observer.on_task_finished(&task, &record);
560 self.accum.record(&task, &record);
561 self.record_completion_invariants(task, record);
562 }
563 RunState::Failed => {
564 self.ctx.observer.on_task_finished(&task, &record);
565 self.accum.record(&task, &record);
566 let newly = self.ready_state.complete_failed(&task);
567 let cause = SkipCause::UpstreamFailed {
568 upstream: task.clone(),
569 };
570 emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
571 self.outcomes.insert(task, RunOutcome::Completed(record));
572 }
573 RunState::Cancelled => {
574 let cancelled_record = CancelledRecord::SignaledInFlight {
581 task: task.clone(),
582 exit_status: record
583 .exit_status
584 .expect("a cancelled fresh run always carries an exit status"),
585 stdout_hash: record.stdout_hash,
586 stderr_hash: record.stderr_hash,
587 };
588 self.ctx
589 .observer
590 .on_task_cancelled(&task, &cancelled_record);
591 let newly = self.ready_state.complete_failed(&task);
592 emit_cascade_cancellations(self.ctx.observer, &mut self.outcomes, &task, newly);
593 self.outcomes
594 .insert(task, RunOutcome::Cancelled(cancelled_record));
595 }
596 },
597 Err(err) => {
598 let newly = self.ready_state.complete_failed(&task);
599 let cause = SkipCause::UpstreamErrored {
600 upstream: task.clone(),
601 };
602 emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
603 self.task_errors.insert(task, err);
604 }
605 }
606 }
607
608 fn record_completion_invariants(&mut self, task: TaskId, record: CompletedRecord) {
615 if check_and_record_output_overlap(
616 &mut self.output_claims,
617 &mut self.invariant_violations,
618 &task,
619 &record.materialised_outputs,
620 ) {
621 self.cancelled = true;
622 }
623 if let Some(cycle_nodes) = check_and_record_runtime_cycle_for_completion(
624 &mut self.augmented_edges,
625 &mut self.invariant_violations,
626 self.ctx.workspace,
627 &task,
628 &record.materialised_outputs,
629 ) {
630 self.cancelled = true;
631 self.ctx.cancel.cancel();
632 self.ready_state.complete_succeeded(&task);
633 self.outcomes.insert(task, RunOutcome::Completed(record));
634 skip_ready_cycle_members(
635 self.ctx.observer,
636 &mut self.ready_state,
637 &mut self.outcomes,
638 &cycle_nodes,
639 );
640 return;
641 }
642 self.ready_state.complete_succeeded(&task);
643 self.outcomes.insert(task, RunOutcome::Completed(record));
644 }
645
646 fn into_outcome(self) -> RunGraphOutcome {
647 RunGraphOutcome {
648 outcomes: self.outcomes,
649 task_errors: self.task_errors,
650 invariant_violations: self.invariant_violations,
651 }
652 }
653}
654
655#[cfg(test)]
656mod tests {
657 use std::collections::BTreeSet;
658
659 use haz_domain::settings::WorkspaceSettings;
660 use tokio_util::sync::CancellationToken;
661
662 use crate::mock_impl::{MockBehaviour, MockProcessSpawner, MockSpec};
663 use crate::process::Signal;
664 use crate::run_graph::scheduler::run_graph;
665 use crate::run_graph::test_fixtures::*;
666 use crate::run_task::{CancelledRecord, RunSource, RunState, SkipCause};
667
668 #[tokio::test]
669 async fn exec_001_empty_graph_terminates_with_empty_outcomes() {
670 let ws = make_workspace(vec![], WorkspaceSettings::default());
671 let g = make_graph(vec![], vec![]);
672 let fixture = Fixture::new(ws, g);
673 let spawner = MockProcessSpawner::new();
674 let observer = Recorder::default();
675 let ctx = make_ctx(&fixture, &spawner, &observer);
676
677 let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
678
679 assert!(result.outcomes.is_empty());
680 assert!(result.task_errors.is_empty());
681 assert!(observer.events().is_empty());
682 assert!(spawner.spawns().is_empty());
683 }
684
685 #[tokio::test]
686 async fn single_task_succeeds_writes_outcome() {
687 let task = make_task("build");
688 let p = make_project("p", BTreeSet::new(), vec![task]);
689 let ws = make_workspace(vec![p], WorkspaceSettings::default());
690 let g = make_graph(vec![tid("p", "build")], vec![]);
691 let fixture = Fixture::new(ws, g);
692 let spawner = MockProcessSpawner::new();
693 push_n_default_specs(&spawner, 1);
694 let observer = Recorder::default();
695 let ctx = make_ctx(&fixture, &spawner, &observer);
696
697 let result = run_graph(&ctx, 1).await.unwrap();
698
699 assert_eq!(result.outcomes.len(), 1);
700 let record = completed_for(&result.outcomes, &tid("p", "build"));
701 assert_eq!(record.state, RunState::Succeeded);
702 assert_eq!(record.source, RunSource::FreshRun);
703 assert!(result.task_errors.is_empty());
704 }
705
706 #[tokio::test]
707 async fn exec_002_linear_chain_runs_in_topological_order() {
708 let p = make_project(
709 "p",
710 BTreeSet::new(),
711 vec![make_task("a"), make_task("b"), make_task("c")],
712 );
713 let ws = make_workspace(vec![p], WorkspaceSettings::default());
714 let g = make_graph(
715 vec![tid("p", "a"), tid("p", "b"), tid("p", "c")],
716 vec![
717 h_edge(tid("p", "a"), tid("p", "b")),
718 h_edge(tid("p", "b"), tid("p", "c")),
719 ],
720 );
721 let fixture = Fixture::new(ws, g);
722 let spawner = MockProcessSpawner::new();
723 push_n_default_specs(&spawner, 3);
724 let observer = Recorder::default();
725 let ctx = make_ctx(&fixture, &spawner, &observer);
726
727 let result = run_graph(&ctx, 1).await.unwrap();
728 assert_eq!(result.outcomes.len(), 3);
729 assert_eq!(
730 observer.started_order(),
731 vec![tid("p", "a"), tid("p", "b"), tid("p", "c")],
732 );
733 }
734
735 #[tokio::test]
736 async fn diamond_dag_runs_branches_and_joins_correctly() {
737 let p = make_project(
738 "p",
739 BTreeSet::new(),
740 vec![
741 make_task("bot"),
742 make_task("l"),
743 make_task("r"),
744 make_task("top"),
745 ],
746 );
747 let ws = make_workspace(vec![p], WorkspaceSettings::default());
748 let g = make_graph(
749 vec![
750 tid("p", "bot"),
751 tid("p", "l"),
752 tid("p", "r"),
753 tid("p", "top"),
754 ],
755 vec![
756 h_edge(tid("p", "top"), tid("p", "l")),
757 h_edge(tid("p", "top"), tid("p", "r")),
758 h_edge(tid("p", "l"), tid("p", "bot")),
759 h_edge(tid("p", "r"), tid("p", "bot")),
760 ],
761 );
762 let fixture = Fixture::new(ws, g);
763 let spawner = MockProcessSpawner::new();
764 push_n_default_specs(&spawner, 4);
765 let observer = Recorder::default();
766 let ctx = make_ctx(&fixture, &spawner, &observer);
767
768 let result = run_graph(&ctx, 1).await.unwrap();
769 assert_eq!(result.outcomes.len(), 4);
770 let started = observer.started_order();
771 assert_eq!(started.first(), Some(&tid("p", "top")));
773 assert_eq!(started.last(), Some(&tid("p", "bot")));
776 }
777
778 #[tokio::test]
779 async fn exec_004_global_cap_one_serialises_independent_tasks() {
780 let p = make_project("p", BTreeSet::new(), vec![make_task("a"), make_task("b")]);
781 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
782 let g = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
783 let fixture = Fixture::new(ws, g);
784 let spawner = MockProcessSpawner::new();
785 push_n_default_specs(&spawner, 2);
786 let observer = Recorder::default();
787 let ctx = make_ctx(&fixture, &spawner, &observer);
788
789 run_graph(&ctx, 1).await.unwrap();
790
791 let events = observer.events();
793 let started_b = events
794 .iter()
795 .position(|e| matches!(e, Event::Started(t) if *t == tid("p", "b")))
796 .expect("b started");
797 let finished_a = events
798 .iter()
799 .position(|e| matches!(e, Event::Finished(t, _, _) if *t == tid("p", "a")))
800 .expect("a finished");
801 assert!(
802 started_b > finished_a,
803 "b started ({started_b}) must follow a finished ({finished_a}): {events:?}",
804 );
805 }
806
807 #[tokio::test]
808 async fn exec_004_global_cap_two_admits_three_independent_in_bursts() {
809 let p = make_project(
810 "p",
811 BTreeSet::new(),
812 vec![make_task("a"), make_task("b"), make_task("c")],
813 );
814 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(2)));
815 let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
816 let fixture = Fixture::new(ws, g);
817 let spawner = MockProcessSpawner::new();
818 push_n_default_specs(&spawner, 3);
819 let observer = Recorder::default();
820 let ctx = make_ctx(&fixture, &spawner, &observer);
821
822 run_graph(&ctx, 1).await.unwrap();
823
824 let events = observer.events();
827 let started_c = events
828 .iter()
829 .position(|e| matches!(e, Event::Started(t) if *t == tid("p", "c")))
830 .expect("c started");
831 let any_finish_before_c = events[..started_c].iter().any(
832 |e| matches!(e, Event::Finished(t, _, _) if *t == tid("p", "a") || *t == tid("p", "b")),
833 );
834 assert!(
835 any_finish_before_c,
836 "c starting at {started_c} must follow at least one finish: {events:?}",
837 );
838 }
839
840 #[tokio::test]
841 async fn exec_005_per_tag_cap_serialises_tagged_tasks_across_projects() {
842 let task_a = make_task("compute");
843 let task_b = make_task("compute");
844 let pa = make_project("pa", BTreeSet::from([tag("db")]), vec![task_a]);
845 let pb = make_project("pb", BTreeSet::from([tag("db")]), vec![task_b]);
846 let ws = make_workspace(
847 vec![pa, pb],
848 workspace_settings_with_tag_cap(fixed_cap(10), "db", 1),
849 );
850 let g = make_graph(vec![tid("pa", "compute"), tid("pb", "compute")], vec![]);
851 let fixture = Fixture::new(ws, g);
852 let spawner = MockProcessSpawner::new();
853 push_n_default_specs(&spawner, 2);
854 let observer = Recorder::default();
855 let ctx = make_ctx(&fixture, &spawner, &observer);
856
857 run_graph(&ctx, 1).await.unwrap();
858
859 let events = observer.events();
860 let started_pb = events
861 .iter()
862 .position(|e| matches!(e, Event::Started(t) if *t == tid("pb", "compute")))
863 .expect("pb:compute started");
864 let finished_pa = events
865 .iter()
866 .position(|e| matches!(e, Event::Finished(t, _, _) if *t == tid("pa", "compute")))
867 .expect("pa:compute finished");
868 assert!(
869 started_pb > finished_pa,
870 "pb:compute ({started_pb}) must follow pa:compute finish ({finished_pa}): {events:?}",
871 );
872 }
873
874 #[tokio::test]
875 async fn exec_003_canonical_order_under_partial_slot_availability() {
876 let p = make_project(
881 "p",
882 BTreeSet::new(),
883 vec![make_task("c"), make_task("a"), make_task("b")],
884 );
885 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
886 let g = make_graph(vec![tid("p", "c"), tid("p", "b"), tid("p", "a")], vec![]);
887 let fixture = Fixture::new(ws, g);
888 let spawner = MockProcessSpawner::new();
889 push_n_default_specs(&spawner, 3);
890 let observer = Recorder::default();
891 let ctx = make_ctx(&fixture, &spawner, &observer);
892
893 run_graph(&ctx, 1).await.unwrap();
894
895 assert_eq!(
896 observer.started_order(),
897 vec![tid("p", "a"), tid("p", "b"), tid("p", "c")],
898 );
899 }
900
901 #[tokio::test]
902 async fn exec_010_task_failure_does_not_halt_unrelated_subgraph() {
903 let p = make_project(
907 "p",
908 BTreeSet::new(),
909 vec![
910 make_task("a"),
911 make_task("a_child"),
912 make_task("b"),
913 make_task("b_child"),
914 ],
915 );
916 let ws = make_workspace(vec![p], WorkspaceSettings::default());
917 let g = make_graph(
918 vec![
919 tid("p", "a"),
920 tid("p", "a_child"),
921 tid("p", "b"),
922 tid("p", "b_child"),
923 ],
924 vec![
925 h_edge(tid("p", "a"), tid("p", "a_child")),
926 h_edge(tid("p", "b"), tid("p", "b_child")),
927 ],
928 );
929 let fixture = Fixture::new(ws, g);
930 let spawner = MockProcessSpawner::new();
931 push_spec_with_exit(&spawner, 1);
934 push_n_default_specs(&spawner, 2);
935 let observer = Recorder::default();
936 let ctx = make_ctx(&fixture, &spawner, &observer);
937
938 let result = run_graph(&ctx, 1).await.unwrap();
939
940 assert_eq!(
941 completed_for(&result.outcomes, &tid("p", "a")).state,
942 RunState::Failed,
943 "a should be Failed",
944 );
945 assert_eq!(
949 skipped_for(&result.outcomes, &tid("p", "a_child")).cause,
950 SkipCause::UpstreamFailed {
951 upstream: tid("p", "a"),
952 },
953 "a_child cascade-skipped with root cause `a`",
954 );
955 assert_eq!(
956 completed_for(&result.outcomes, &tid("p", "b")).state,
957 RunState::Succeeded,
958 "sibling b should succeed",
959 );
960 assert_eq!(
961 completed_for(&result.outcomes, &tid("p", "b_child")).state,
962 RunState::Succeeded,
963 "sibling b_child should succeed",
964 );
965 }
966
967 #[tokio::test]
968 async fn exec_011_task_failure_cascades_to_hard_descendants() {
969 let p = make_project(
974 "p",
975 BTreeSet::new(),
976 vec![make_task("root"), make_task("mid"), make_task("leaf")],
977 );
978 let ws = make_workspace(vec![p], WorkspaceSettings::default());
979 let g = make_graph(
980 vec![tid("p", "root"), tid("p", "mid"), tid("p", "leaf")],
981 vec![
982 h_edge(tid("p", "root"), tid("p", "mid")),
983 h_edge(tid("p", "mid"), tid("p", "leaf")),
984 ],
985 );
986 let fixture = Fixture::new(ws, g);
987 let spawner = MockProcessSpawner::new();
988 push_spec_with_exit(&spawner, 2);
990 let observer = Recorder::default();
991 let ctx = make_ctx(&fixture, &spawner, &observer);
992
993 let result = run_graph(&ctx, 1).await.unwrap();
994
995 assert_eq!(result.outcomes.len(), 3);
996 assert_eq!(
997 completed_for(&result.outcomes, &tid("p", "root")).state,
998 RunState::Failed,
999 );
1000 let cause = SkipCause::UpstreamFailed {
1001 upstream: tid("p", "root"),
1002 };
1003 assert_eq!(
1004 skipped_for(&result.outcomes, &tid("p", "mid")).cause,
1005 cause,
1006 "mid records root cause = root",
1007 );
1008 assert_eq!(
1009 skipped_for(&result.outcomes, &tid("p", "leaf")).cause,
1010 cause,
1011 "leaf records root cause = root (NOT mid)",
1012 );
1013 assert_eq!(spawner.spawns().len(), 1);
1014 }
1015
1016 #[tokio::test]
1017 async fn exec_010_soft_edge_predecessor_failure_does_not_cascade() {
1018 let p = make_project("p", BTreeSet::new(), vec![make_task("a"), make_task("b")]);
1023 let ws = make_workspace(vec![p], WorkspaceSettings::default());
1024 let g = make_graph(
1025 vec![tid("p", "a"), tid("p", "b")],
1026 vec![s_edge(tid("p", "a"), tid("p", "b"))],
1027 );
1028 let fixture = Fixture::new(ws, g);
1029 let spawner = MockProcessSpawner::new();
1030 push_spec_with_exit(&spawner, 1);
1032 push_n_default_specs(&spawner, 1);
1033 let observer = Recorder::default();
1034 let ctx = make_ctx(&fixture, &spawner, &observer);
1035
1036 let result = run_graph(&ctx, 1).await.unwrap();
1037 assert_eq!(
1038 completed_for(&result.outcomes, &tid("p", "a")).state,
1039 RunState::Failed,
1040 );
1041 assert_eq!(
1042 completed_for(&result.outcomes, &tid("p", "b")).state,
1043 RunState::Succeeded,
1044 "soft-edge successor must not be cascade-skipped",
1045 );
1046 }
1047
1048 #[tokio::test]
1049 async fn exec_011_observer_emits_no_started_or_finished_for_skipped_tasks() {
1050 let p = make_project(
1057 "p",
1058 BTreeSet::new(),
1059 vec![make_task("root"), make_task("mid"), make_task("leaf")],
1060 );
1061 let ws = make_workspace(vec![p], WorkspaceSettings::default());
1062 let g = make_graph(
1063 vec![tid("p", "root"), tid("p", "mid"), tid("p", "leaf")],
1064 vec![
1065 h_edge(tid("p", "root"), tid("p", "mid")),
1066 h_edge(tid("p", "mid"), tid("p", "leaf")),
1067 ],
1068 );
1069 let fixture = Fixture::new(ws, g);
1070 let spawner = MockProcessSpawner::new();
1071 push_spec_with_exit(&spawner, 3);
1072 let observer = Recorder::default();
1073 let ctx = make_ctx(&fixture, &spawner, &observer);
1074
1075 let _ = run_graph(&ctx, 1).await.unwrap();
1076
1077 let events = observer.events();
1078 let cause = SkipCause::UpstreamFailed {
1079 upstream: tid("p", "root"),
1080 };
1081 assert_eq!(
1087 events,
1088 vec![
1089 Event::Started(tid("p", "root")),
1090 Event::Finished(tid("p", "root"), RunState::Failed, RunSource::FreshRun),
1091 Event::Skipped(tid("p", "leaf"), cause.clone()),
1092 Event::Skipped(tid("p", "mid"), cause),
1093 ],
1094 "expected exactly one Started + Finished for root \
1095 and one Skipped per descendant in canonical order",
1096 );
1097 }
1098
1099 #[tokio::test]
1100 async fn exec_011_diamond_cascade_records_each_descendant_once() {
1101 let p = make_project(
1112 "p",
1113 BTreeSet::new(),
1114 vec![
1115 make_task("bot"),
1116 make_task("left"),
1117 make_task("right"),
1118 make_task("top"),
1119 ],
1120 );
1121 let ws = make_workspace(vec![p], WorkspaceSettings::default());
1122 let g = make_graph(
1123 vec![
1124 tid("p", "bot"),
1125 tid("p", "left"),
1126 tid("p", "right"),
1127 tid("p", "top"),
1128 ],
1129 vec![
1130 h_edge(tid("p", "top"), tid("p", "left")),
1131 h_edge(tid("p", "top"), tid("p", "right")),
1132 h_edge(tid("p", "left"), tid("p", "bot")),
1133 h_edge(tid("p", "right"), tid("p", "bot")),
1134 ],
1135 );
1136 let fixture = Fixture::new(ws, g);
1137 let spawner = MockProcessSpawner::new();
1138 push_spec_with_exit(&spawner, 4);
1139 let observer = Recorder::default();
1140 let ctx = make_ctx(&fixture, &spawner, &observer);
1141
1142 let result = run_graph(&ctx, 1).await.unwrap();
1143
1144 assert_eq!(result.outcomes.len(), 4);
1146 assert_eq!(
1147 completed_for(&result.outcomes, &tid("p", "top")).state,
1148 RunState::Failed,
1149 );
1150 let cause = SkipCause::UpstreamFailed {
1151 upstream: tid("p", "top"),
1152 };
1153 for descendant in [tid("p", "left"), tid("p", "right"), tid("p", "bot")] {
1154 assert_eq!(
1155 skipped_for(&result.outcomes, &descendant).cause,
1156 cause,
1157 "{descendant:?} should be Skipped with root cause top",
1158 );
1159 }
1160
1161 let skipped_count_bot = observer
1165 .events()
1166 .iter()
1167 .filter(|e| matches!(e, Event::Skipped(t, _) if *t == tid("p", "bot")))
1168 .count();
1169 assert_eq!(
1170 skipped_count_bot, 1,
1171 "bot must fire on_task_skipped exactly once across both cascade paths",
1172 );
1173 }
1174
1175 fn exit_on_kill_only_spec(kill_exit_code: i32) -> MockSpec {
1182 MockSpec {
1183 behaviour: MockBehaviour::OnKillOnly,
1184 exit_code: kill_exit_code,
1185 ..MockSpec::default()
1186 }
1187 }
1188
1189 #[tokio::test]
1190 async fn exec_013_cancel_before_admission_marks_all_ready_as_run_cancelled() {
1191 let p = make_project(
1196 "p",
1197 BTreeSet::new(),
1198 vec![make_task("a"), make_task("b"), make_task("c")],
1199 );
1200 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1201 let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
1202 let fixture = Fixture::new(ws, g);
1203 let spawner = MockProcessSpawner::new();
1204 let observer = Recorder::default();
1205 let ctx = make_ctx(&fixture, &spawner, &observer);
1206
1207 fixture.cancel.cancel();
1208 let result = run_graph(&ctx, 1).await.unwrap();
1209
1210 assert_eq!(result.outcomes.len(), 3);
1211 for name in ["a", "b", "c"] {
1212 match cancelled_for(&result.outcomes, &tid("p", name)) {
1213 CancelledRecord::RunCancelled { task } => {
1214 assert_eq!(task, &tid("p", name));
1215 }
1216 other => panic!("expected RunCancelled for {name}, got {other:?}"),
1217 }
1218 }
1219 assert!(
1220 spawner.spawns().is_empty(),
1221 "no task should have been spawned: {:?}",
1222 spawner.spawns(),
1223 );
1224 let events = observer.events();
1227 assert!(
1228 events
1229 .iter()
1230 .all(|e| matches!(e, Event::Cancelled(_, CancelledRecord::RunCancelled { .. }))),
1231 "expected only Cancelled events, got {events:?}",
1232 );
1233 assert_eq!(events.len(), 3);
1234 }
1235
1236 #[tokio::test]
1237 async fn exec_013_cancel_mid_flight_signals_in_flight_task() {
1238 let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1245 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1246 let g = make_graph(vec![tid("p", "solo")], vec![]);
1247 let fixture = Fixture::new(ws, g);
1248 let spawner = MockProcessSpawner::new();
1249 spawner.push_spec(exit_on_terminate_spec(0));
1250 let observer = Recorder::default();
1251 let ctx = make_ctx(&fixture, &spawner, &observer);
1252
1253 let trigger_cancel = fixture.cancel.clone();
1254 let trigger = async move {
1255 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1260 trigger_cancel.cancel();
1261 };
1262
1263 let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1264 let result = result.unwrap();
1265
1266 match cancelled_for(&result.outcomes, &tid("p", "solo")) {
1267 CancelledRecord::SignaledInFlight { task, .. } => {
1268 assert_eq!(task, &tid("p", "solo"));
1269 }
1270 other => panic!("expected SignaledInFlight, got {other:?}"),
1271 }
1272 assert_eq!(spawner.spawns().len(), 1);
1273 assert_eq!(
1277 spawner.signals_for(0).unwrap(),
1278 vec![Signal::Terminate],
1279 "expected exactly one Terminate, got {:?}",
1280 spawner.signals_for(0),
1281 );
1282 }
1283
1284 #[tokio::test]
1285 async fn exec_014_cancel_mid_flight_escalates_to_kill_after_grace() {
1286 let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1292 let ws = make_workspace(vec![p], workspace_settings_with_grace(fixed_cap(1), 0.05));
1293 let g = make_graph(vec![tid("p", "solo")], vec![]);
1294 let fixture = Fixture::new(ws, g);
1295 let spawner = MockProcessSpawner::new();
1296 spawner.push_spec(exit_on_kill_only_spec(137));
1297 let observer = Recorder::default();
1298 let ctx = make_ctx(&fixture, &spawner, &observer);
1299
1300 let trigger_cancel = fixture.cancel.clone();
1301 let trigger = async move {
1302 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1303 trigger_cancel.cancel();
1304 };
1305
1306 let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1307 let result = result.unwrap();
1308
1309 match cancelled_for(&result.outcomes, &tid("p", "solo")) {
1310 CancelledRecord::SignaledInFlight { .. } => {}
1311 other => panic!("expected SignaledInFlight, got {other:?}"),
1312 }
1313 assert_eq!(
1317 spawner.signals_for(0).unwrap(),
1318 vec![Signal::Terminate, Signal::Kill],
1319 );
1320 }
1321
1322 #[tokio::test]
1323 async fn exec_014_cancel_grace_zero_sends_kill_immediately() {
1324 let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1329 let ws = make_workspace(vec![p], workspace_settings_with_grace(fixed_cap(1), 0.0));
1330 let g = make_graph(vec![tid("p", "solo")], vec![]);
1331 let fixture = Fixture::new(ws, g);
1332 let spawner = MockProcessSpawner::new();
1333 spawner.push_spec(exit_on_kill_only_spec(137));
1334 let observer = Recorder::default();
1335 let ctx = make_ctx(&fixture, &spawner, &observer);
1336
1337 let trigger_cancel = fixture.cancel.clone();
1338 let trigger = async move {
1339 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1340 trigger_cancel.cancel();
1341 };
1342
1343 let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1344 let result = result.unwrap();
1345
1346 match cancelled_for(&result.outcomes, &tid("p", "solo")) {
1347 CancelledRecord::SignaledInFlight { .. } => {}
1348 other => panic!("expected SignaledInFlight, got {other:?}"),
1349 }
1350 assert_eq!(
1351 spawner.signals_for(0).unwrap(),
1352 vec![Signal::Terminate, Signal::Kill],
1353 );
1354 }
1355
1356 #[tokio::test]
1357 async fn exec_011_cancelled_task_cascades_descendants_as_upstream_cancelled() {
1358 let p = make_project(
1365 "p",
1366 BTreeSet::new(),
1367 vec![make_task("root"), make_task("mid"), make_task("leaf")],
1368 );
1369 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1370 let g = make_graph(
1371 vec![tid("p", "root"), tid("p", "mid"), tid("p", "leaf")],
1372 vec![
1373 h_edge(tid("p", "root"), tid("p", "mid")),
1374 h_edge(tid("p", "mid"), tid("p", "leaf")),
1375 ],
1376 );
1377 let fixture = Fixture::new(ws, g);
1378 let spawner = MockProcessSpawner::new();
1379 spawner.push_spec(exit_on_terminate_spec(0));
1380 let observer = Recorder::default();
1381 let ctx = make_ctx(&fixture, &spawner, &observer);
1382
1383 let trigger_cancel = fixture.cancel.clone();
1384 let trigger = async move {
1385 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1386 trigger_cancel.cancel();
1387 };
1388
1389 let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1390 let result = result.unwrap();
1391
1392 assert_eq!(result.outcomes.len(), 3);
1393 match cancelled_for(&result.outcomes, &tid("p", "root")) {
1394 CancelledRecord::SignaledInFlight { task, .. } => {
1395 assert_eq!(task, &tid("p", "root"));
1396 }
1397 other => panic!("expected SignaledInFlight for root, got {other:?}"),
1398 }
1399 for name in ["mid", "leaf"] {
1400 match cancelled_for(&result.outcomes, &tid("p", name)) {
1401 CancelledRecord::UpstreamCancelled { task, upstream } => {
1402 assert_eq!(task, &tid("p", name));
1403 assert_eq!(
1404 upstream,
1405 &tid("p", "root"),
1406 "cascade attributes the root cancelled task to {name}",
1407 );
1408 }
1409 other => {
1410 panic!("expected UpstreamCancelled for {name}, got {other:?}")
1411 }
1412 }
1413 }
1414 }
1415
1416 #[tokio::test]
1417 async fn exec_015_cancelled_run_does_not_produce_cache_entry() {
1418 let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1424 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1425 let g = make_graph(vec![tid("p", "solo")], vec![]);
1426 let fixture = Fixture::new(ws, g);
1427
1428 {
1430 let spawner1 = MockProcessSpawner::new();
1431 spawner1.push_spec(exit_on_terminate_spec(0));
1432 let observer1 = Recorder::default();
1433 let ctx1 = make_ctx(&fixture, &spawner1, &observer1);
1434 let trigger_cancel = fixture.cancel.clone();
1435 let trigger = async move {
1436 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1437 trigger_cancel.cancel();
1438 };
1439 let (run1, ()) = tokio::join!(run_graph(&ctx1, 1), trigger);
1440 let run1 = run1.unwrap();
1441 match cancelled_for(&run1.outcomes, &tid("p", "solo")) {
1442 CancelledRecord::SignaledInFlight { .. } => {}
1443 other => panic!("run 1 expected SignaledInFlight, got {other:?}"),
1444 }
1445 }
1446
1447 let fresh_cancel = CancellationToken::new();
1450 let spawner2 = MockProcessSpawner::new();
1451 push_n_default_specs(&spawner2, 1);
1452 let observer2 = Recorder::default();
1453 let ctx2 = make_ctx_with_cancel(&fixture, &spawner2, &observer2, &fresh_cancel);
1454 let run2 = run_graph(&ctx2, 2).await.unwrap();
1455
1456 let rec2 = completed_for(&run2.outcomes, &tid("p", "solo"));
1457 assert_eq!(
1458 rec2.source,
1459 RunSource::FreshRun,
1460 "run 2 must be a fresh run; a cache hit would mean run 1 stored an entry",
1461 );
1462 assert_eq!(rec2.state, RunState::Succeeded);
1463 assert_eq!(spawner2.spawns().len(), 1);
1467 }
1468
1469 #[tokio::test]
1470 async fn exec_010_cancel_one_subgraph_does_not_halt_another() {
1471 let task_fast = make_task_with("fast", &["echo", "fast"], None);
1483 let task_slow = make_task_with("slow", &["echo", "slow"], None);
1484 let p = make_project("p", BTreeSet::new(), vec![task_fast, task_slow]);
1485 let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(2)));
1486 let g = make_graph(vec![tid("p", "fast"), tid("p", "slow")], vec![]);
1487 let fixture = Fixture::new(ws, g);
1488 let spawner = MockProcessSpawner::new();
1489 spawner.push_spec(MockSpec::default());
1493 spawner.push_spec(exit_on_terminate_spec(0));
1494 let observer = Recorder::default();
1495 let ctx = make_ctx(&fixture, &spawner, &observer);
1496
1497 let trigger_cancel = fixture.cancel.clone();
1498 let trigger = async move {
1499 tokio::time::sleep(std::time::Duration::from_millis(30)).await;
1503 trigger_cancel.cancel();
1504 };
1505
1506 let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1507 let result = result.unwrap();
1508
1509 assert_eq!(result.outcomes.len(), 2);
1510 let fast_rec = completed_for(&result.outcomes, &tid("p", "fast"));
1511 assert_eq!(fast_rec.state, RunState::Succeeded);
1512 match cancelled_for(&result.outcomes, &tid("p", "slow")) {
1513 CancelledRecord::SignaledInFlight { .. } => {}
1514 other => panic!("expected SignaledInFlight for slow, got {other:?}"),
1515 }
1516 }
1517}