use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::num::NonZeroUsize;
use futures::StreamExt;
use futures::stream::FuturesUnordered;
use haz_domain::mutex::Mutex;
use haz_domain::name::{ProjectName, TagName};
use haz_domain::path::CanonicalPath;
use haz_domain::task_id::TaskId;
use haz_vfs::WritableFilesystem;
use snafu::Snafu;
use crate::hold_set::HoldSet;
use crate::process::ProcessSpawner;
use crate::run_graph::cascade::{
drain_ready_to_cancelled, emit_cascade_cancellations, emit_cascade_skips,
};
use crate::run_graph::cycle::{
check_and_record_runtime_cycle_for_completion, skip_ready_cycle_members,
};
use crate::run_graph::overlap::check_and_record_output_overlap;
use crate::run_graph::state::{
InFlightCounts, ReadyState, StreamHashAccumulator, precompute_task_tags, resolve_global_cap,
};
use crate::run_graph::steps::{
InFlightCompletion, InFlightFuture, LookupStepOutcome, run_lookup_step, run_spawn_step,
};
use crate::run_task::{
CancelledRecord, CompletedRecord, RunContext, RunObserver, RunOutcome, RunState, RunTaskError,
SkipCause,
};
/// Everything `run_graph` collected while driving a graph to completion.
#[derive(Debug)]
pub struct RunGraphOutcome {
/// Per-task outcome (completed, cancelled, or recorded by the cascade helpers).
pub outcomes: BTreeMap<TaskId, RunOutcome>,
/// Errors returned by a task's lookup or spawn step. The scheduler records
/// such a task here and does not itself insert an `outcomes` entry for it.
pub task_errors: BTreeMap<TaskId, RunTaskError>,
/// Violations appended by the output-overlap and runtime-cycle checks that
/// run after each successful completion.
pub invariant_violations: Vec<RuntimeInvariantViolation>,
}
/// A graph invariant that was found broken while the run was in progress.
///
/// Values are produced by the checks in `run_graph::cycle` and
/// `run_graph::overlap`; the field notes below are inferred from names.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RuntimeInvariantViolation {
/// NOTE(review): presumably a dependency cycle that only became visible at
/// run time; confirm against `check_and_record_runtime_cycle_for_completion`.
RuntimeCycle {
/// Tasks taking part in the cycle (assumed; verify).
nodes: BTreeSet<TaskId>,
/// The edge whose addition closed the cycle (assumed; verify).
offending_edge: (TaskId, TaskId),
},
/// NOTE(review): presumably two tasks materialised the same output path;
/// confirm against `check_and_record_output_overlap`.
OutputOverlap {
first_task: TaskId,
second_task: TaskId,
shared_path: CanonicalPath,
},
}
/// Error type of `run_graph`.
///
/// The enum currently has no variants, so a value of this type cannot be
/// constructed; per-task failures are reported through `RunGraphOutcome`.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum RunGraphError {}
/// Drives every task of `ctx.graph` until nothing is ready or in flight, then
/// returns the collected outcomes, task errors and invariant violations.
///
/// The scheduler works on a copy of `ctx` whose cancellation token is a child
/// of the caller's token, so the scheduler can cancel its own run (see
/// `record_completion_invariants`) while still observing the caller's cancel.
///
/// `created_at_unix` is forwarded unchanged to every spawn step.
///
/// The function always ends in `Ok(..)`; `RunGraphError` has no variants.
pub async fn run_graph<F, S, O>(
    ctx: &RunContext<'_, F, S, O>,
    created_at_unix: u64,
) -> Result<RunGraphOutcome, RunGraphError>
where
    F: WritableFilesystem,
    S: ProcessSpawner,
    O: RunObserver,
{
    let scheduler_token = ctx.cancel.child_token();
    let scheduler_ctx = RunContext {
        cancel: &scheduler_token,
        fs: ctx.fs,
        cache: ctx.cache,
        spawner: ctx.spawner,
        observer: ctx.observer,
        workspace: ctx.workspace,
        graph: ctx.graph,
        host_env: ctx.host_env,
        algo: ctx.algo,
    };
    let mut sched = SchedulerState::new(&scheduler_ctx);

    loop {
        // Latch cancellation: once set, the flag is never cleared.
        sched.cancelled = sched.cancelled || sched.ctx.cancel.is_cancelled();

        if sched.cancelled {
            sched.drain_cancelled();
        } else {
            sched.admit_ready();
        }

        // Nothing running and nothing was admitted: the run is over.
        if sched.in_flight.is_empty() {
            break;
        }

        match sched.next_completion().await {
            // Cancellation fired while waiting; loop around to drain.
            None => {}
            Some(InFlightCompletion::Lookup { task, result }) => {
                sched.handle_lookup(task, result, created_at_unix);
            }
            Some(InFlightCompletion::Spawn { task, result }) => {
                sched.handle_spawn(task, result);
            }
        }
    }

    Ok(sched.into_outcome())
}
/// Mutable bookkeeping for one `run_graph` invocation.
struct SchedulerState<'a, F, S, O>
where
F: WritableFilesystem,
S: ProcessSpawner,
O: RunObserver,
{
/// Run context handed to every lookup/spawn step (carries the scheduler's
/// own child cancellation token).
ctx: &'a RunContext<'a, F, S, O>,
/// Global concurrency cap resolved from the workspace settings.
global_cap: NonZeroUsize,
/// Precomputed tag set per task, used for cap accounting on admit/release.
task_tags: BTreeMap<TaskId, BTreeSet<TagName>>,
/// Ready/skip sets and completion propagation over the graph.
ready_state: ReadyState,
/// In-flight counters consulted by `can_admit` and updated by `admit`/`release`.
counts: InFlightCounts,
/// Accumulates data from finished records; its `by_task` map is snapshotted
/// into each lookup step.
accum: StreamHashAccumulator,
/// Holds currently taken by spawn steps, keyed by project name and optional mutex.
hold_set: HoldSet,
/// Tasks for which `on_task_started` already fired, so a re-admitted task
/// is not announced twice.
started: BTreeSet<TaskId>,
/// The hold each in-flight spawn step acquired; released in `handle_spawn`.
spawn_step_holds: BTreeMap<TaskId, (ProjectName, Option<Mutex>)>,
/// Outcomes collected so far; becomes `RunGraphOutcome::outcomes`.
outcomes: BTreeMap<TaskId, RunOutcome>,
/// Step errors collected so far; becomes `RunGraphOutcome::task_errors`.
task_errors: BTreeMap<TaskId, RunTaskError>,
/// Violations collected so far; becomes `RunGraphOutcome::invariant_violations`.
invariant_violations: Vec<RuntimeInvariantViolation>,
/// State of the output-overlap check (path -> task).
/// NOTE(review): presumably the first task that claimed each path; confirm.
output_claims: HashMap<CanonicalPath, TaskId>,
/// Seeded with the static graph edges and handed mutably to the runtime
/// cycle check. NOTE(review): presumably extended with discovered edges.
augmented_edges: BTreeSet<(TaskId, TaskId)>,
/// Pending lookup and spawn step futures.
in_flight: FuturesUnordered<InFlightFuture<'a>>,
/// Latched once the run is cancelled (externally or by an invariant violation).
cancelled: bool,
}
impl<'a, F, S, O> SchedulerState<'a, F, S, O>
where
F: WritableFilesystem,
S: ProcessSpawner,
O: RunObserver,
{
/// Builds the initial scheduler state for `ctx`.
///
/// The cap, tag table and ready state are derived from the workspace and
/// graph; `augmented_edges` starts as the `(from, to)` pairs of the static
/// graph edges; every other collection starts empty and `cancelled` is false.
fn new(ctx: &'a RunContext<'a, F, S, O>) -> Self {
    let concurrency = &ctx.workspace.settings.concurrency;

    let mut augmented_edges: BTreeSet<(TaskId, TaskId)> = BTreeSet::new();
    for edge in ctx.graph.edges.iter() {
        augmented_edges.insert((edge.from.clone(), edge.to.clone()));
    }

    Self {
        ctx,
        global_cap: resolve_global_cap(concurrency),
        task_tags: precompute_task_tags(ctx.workspace, ctx.graph),
        ready_state: ReadyState::from_graph(ctx.graph),
        counts: InFlightCounts::default(),
        accum: StreamHashAccumulator::default(),
        hold_set: HoldSet::default(),
        started: BTreeSet::new(),
        spawn_step_holds: BTreeMap::new(),
        outcomes: BTreeMap::new(),
        task_errors: BTreeMap::new(),
        invariant_violations: Vec::new(),
        output_claims: HashMap::new(),
        augmented_edges,
        in_flight: FuturesUnordered::new(),
        cancelled: false,
    }
}
/// Hands the observer, the ready state and the outcome map to
/// `drain_ready_to_cancelled`.
///
/// NOTE(review): judging by its name and the cancel-before-admission test,
/// the helper records every still-ready task as cancelled; confirm in `cascade`.
fn drain_cancelled(&mut self) {
    let observer = self.ctx.observer;
    let ready_state = &mut self.ready_state;
    let outcomes = &mut self.outcomes;
    drain_ready_to_cancelled(observer, ready_state, outcomes);
}
/// Starts a lookup step for every ready task that currently fits under the
/// concurrency caps.
///
/// Ready tasks that are also in the skip set are removed from the ready set
/// without being started. A task that does not fit stays ready and is retried
/// on a later pass. `on_task_started` fires only the first time a task is
/// admitted.
fn admit_ready(&mut self) {
    // Iterate over a snapshot because the ready set is mutated in the loop.
    let snapshot: Vec<TaskId> = self.ready_state.ready.iter().cloned().collect();

    for task in snapshot {
        if self.ready_state.skip.contains(&task) {
            self.ready_state.ready.remove(&task);
            continue;
        }

        let tags = self
            .task_tags
            .get(&task)
            .expect("ready task must have a precomputed tag set");
        let concurrency = &self.ctx.workspace.settings.concurrency;
        let has_slot = self.counts.can_admit(tags, concurrency, self.global_cap);

        if has_slot {
            self.ready_state.ready.remove(&task);
            self.counts.admit(tags);

            let first_admission = self.started.insert(task.clone());
            if first_admission {
                self.ctx.observer.on_task_started(&task);
            }

            let lookup = run_lookup_step(self.ctx, task, self.accum.by_task.clone());
            self.in_flight.push(Box::pin(lookup));
        }
    }
}
/// Waits for the next in-flight step to finish.
///
/// While the run is not cancelled this races the cancellation token against
/// the in-flight set, checking the token first (`biased`). If the token wins,
/// `cancelled` is latched and `None` is returned. Once cancelled, it simply
/// waits for the next in-flight completion.
///
/// # Panics
///
/// Panics if `in_flight` is empty; the caller checks this beforehand.
async fn next_completion(&mut self) -> Option<InFlightCompletion> {
    if !self.cancelled {
        tokio::select! {
            biased;
            () = self.ctx.cancel.cancelled() => {
                self.cancelled = true;
                None
            }
            finished = self.in_flight.next() => {
                Some(finished.expect("in_flight checked non-empty above"))
            }
        }
    } else {
        let finished = self.in_flight.next().await;
        Some(finished.expect("in_flight checked non-empty above"))
    }
}
/// Processes the result of a finished lookup step for `task`.
///
/// * Run already cancelled: the result is discarded, the task is recorded as
///   `RunCancelled` and cancellations cascade to the tasks returned by
///   `complete_failed`.
/// * `Err`: the error goes to `task_errors` and skips cascade with
///   `UpstreamErrored`.
/// * `Hit`: the record is reported as finished and goes through the
///   completion invariant checks.
/// * `Miss`: if the hold set is compatible, the hold is acquired and a spawn
///   step is started (the concurrency slot stays occupied); otherwise the slot
///   is released and the task goes back to the ready set.
///
/// `created_at_unix` is only forwarded to the spawn step.
fn handle_lookup(
    &mut self,
    task: TaskId,
    result: Result<LookupStepOutcome, RunTaskError>,
    created_at_unix: u64,
) {
    let tags = self
        .task_tags
        .get(&task)
        .expect("completed task must have a precomputed tag set")
        .clone();

    if self.cancelled {
        self.counts.release(&tags);
        let record = CancelledRecord::RunCancelled { task: task.clone() };
        self.ctx.observer.on_task_cancelled(&task, &record);
        let newly = self.ready_state.complete_failed(&task);
        emit_cascade_cancellations(self.ctx.observer, &mut self.outcomes, &task, newly);
        self.outcomes.insert(task, RunOutcome::Cancelled(record));
        return;
    }

    let outcome = match result {
        Ok(outcome) => outcome,
        Err(err) => {
            self.counts.release(&tags);
            let newly = self.ready_state.complete_failed(&task);
            let cause = SkipCause::UpstreamErrored {
                upstream: task.clone(),
            };
            emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
            self.task_errors.insert(task, err);
            return;
        }
    };

    match outcome {
        LookupStepOutcome::Hit(record) => {
            self.counts.release(&tags);
            self.ctx.observer.on_task_finished(&task, &record);
            self.accum.record(&task, &record);
            self.record_completion_invariants(task, record);
        }
        LookupStepOutcome::Miss {
            key,
            mutex,
            project_name,
        } => {
            if !self.hold_set.compatible(&project_name, mutex.as_ref()) {
                // Cannot run yet: free the slot and retry on a later pass.
                self.counts.release(&tags);
                self.ready_state.ready.insert(task);
                return;
            }
            self.hold_set.acquire(&project_name, mutex.as_ref());
            self.spawn_step_holds
                .insert(task.clone(), (project_name, mutex));
            let spawn = run_spawn_step(self.ctx, task, key, created_at_unix);
            self.in_flight.push(Box::pin(spawn));
        }
    }
}
/// Processes the result of a finished spawn step for `task`.
///
/// The hold recorded for the task (if any) and its concurrency slot are
/// released first, regardless of the result. Then:
///
/// * `Err`: the error goes to `task_errors`; skips cascade with `UpstreamErrored`.
/// * `Succeeded`: reported as finished, then checked by the completion invariants.
/// * `Failed`: reported as finished and recorded as completed; skips cascade
///   with `UpstreamFailed`.
/// * `Cancelled`: recorded as `SignaledInFlight`; cancellations cascade.
///
/// # Panics
///
/// Panics if a record in state `Cancelled` carries no exit status.
fn handle_spawn(&mut self, task: TaskId, result: Result<CompletedRecord, RunTaskError>) {
    let tags = self
        .task_tags
        .get(&task)
        .expect("completed task must have a precomputed tag set")
        .clone();

    if let Some((project_name, mutex)) = self.spawn_step_holds.remove(&task) {
        self.hold_set.release(&project_name, mutex.as_ref());
    }
    self.counts.release(&tags);

    let record = match result {
        Ok(record) => record,
        Err(err) => {
            let newly = self.ready_state.complete_failed(&task);
            let cause = SkipCause::UpstreamErrored {
                upstream: task.clone(),
            };
            emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
            self.task_errors.insert(task, err);
            return;
        }
    };

    match record.state {
        RunState::Succeeded => {
            self.ctx.observer.on_task_finished(&task, &record);
            self.accum.record(&task, &record);
            self.record_completion_invariants(task, record);
        }
        RunState::Failed => {
            self.ctx.observer.on_task_finished(&task, &record);
            self.accum.record(&task, &record);
            let newly = self.ready_state.complete_failed(&task);
            let cause = SkipCause::UpstreamFailed {
                upstream: task.clone(),
            };
            emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
            self.outcomes.insert(task, RunOutcome::Completed(record));
        }
        RunState::Cancelled => {
            let cancelled_record = CancelledRecord::SignaledInFlight {
                task: task.clone(),
                exit_status: record
                    .exit_status
                    .expect("a cancelled fresh run always carries an exit status"),
                stdout_hash: record.stdout_hash,
                stderr_hash: record.stderr_hash,
            };
            self.ctx
                .observer
                .on_task_cancelled(&task, &cancelled_record);
            let newly = self.ready_state.complete_failed(&task);
            emit_cascade_cancellations(self.ctx.observer, &mut self.outcomes, &task, newly);
            self.outcomes
                .insert(task, RunOutcome::Cancelled(cancelled_record));
        }
    }
}
/// Runs the post-completion invariant checks for a successfully completed
/// `task`, then records its outcome.
///
/// Both checks receive the task's `materialised_outputs` and append to
/// `invariant_violations` themselves. If either one reports a violation the
/// run is aborted: the `cancelled` latch is set AND the scheduler's
/// cancellation token is cancelled, so steps holding this context can observe
/// the abort. Cancelling the token on both paths keeps the overlap and cycle
/// cases consistent; setting only the latch would leave in-flight steps
/// unaware of the abort.
///
/// The task is always marked succeeded and stored as `Completed`. When a
/// cycle was found, the cycle's nodes are afterwards passed to
/// `skip_ready_cycle_members`.
fn record_completion_invariants(&mut self, task: TaskId, record: CompletedRecord) {
    let overlap_found = check_and_record_output_overlap(
        &mut self.output_claims,
        &mut self.invariant_violations,
        &task,
        &record.materialised_outputs,
    );
    let cycle_nodes = check_and_record_runtime_cycle_for_completion(
        &mut self.augmented_edges,
        &mut self.invariant_violations,
        self.ctx.workspace,
        &task,
        &record.materialised_outputs,
    );

    if overlap_found || cycle_nodes.is_some() {
        self.cancelled = true;
        // This is the scheduler's own child token, not the caller's.
        self.ctx.cancel.cancel();
    }

    self.ready_state.complete_succeeded(&task);
    self.outcomes.insert(task, RunOutcome::Completed(record));

    if let Some(cycle_nodes) = cycle_nodes {
        skip_ready_cycle_members(
            self.ctx.observer,
            &mut self.ready_state,
            &mut self.outcomes,
            &cycle_nodes,
        );
    }
}
fn into_outcome(self) -> RunGraphOutcome {
RunGraphOutcome {
outcomes: self.outcomes,
task_errors: self.task_errors,
invariant_violations: self.invariant_violations,
}
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use haz_domain::settings::WorkspaceSettings;
use tokio_util::sync::CancellationToken;
use crate::mock_impl::{MockBehaviour, MockProcessSpawner, MockSpec};
use crate::process::Signal;
use crate::run_graph::scheduler::run_graph;
use crate::run_graph::test_fixtures::*;
use crate::run_task::{CancelledRecord, RunSource, RunState, SkipCause};
/// An empty graph terminates with no outcomes, no errors, no observer events
/// and no spawned processes.
#[tokio::test]
async fn exec_001_empty_graph_terminates_with_empty_outcomes() {
    let fixture = Fixture::new(
        make_workspace(vec![], WorkspaceSettings::default()),
        make_graph(vec![], vec![]),
    );
    let spawner = MockProcessSpawner::new();
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1_700_000_000).await.unwrap();

    assert!(run.outcomes.is_empty());
    assert!(run.task_errors.is_empty());
    assert!(observer.events().is_empty());
    assert!(spawner.spawns().is_empty());
}
/// A single task with a default mock spec ends up as a succeeded fresh run
/// and produces no task errors.
#[tokio::test]
async fn single_task_succeeds_writes_outcome() {
    let build = tid("p", "build");
    let project = make_project("p", BTreeSet::new(), vec![make_task("build")]);
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let graph = make_graph(vec![build.clone()], vec![]);
    let fixture = Fixture::new(workspace, graph);
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 1);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(run.outcomes.len(), 1);
    let record = completed_for(&run.outcomes, &build);
    assert_eq!(record.state, RunState::Succeeded);
    assert_eq!(record.source, RunSource::FreshRun);
    assert!(run.task_errors.is_empty());
}
/// A chain a -> b -> c yields three outcomes and the tasks start in chain order.
#[tokio::test]
async fn exec_002_linear_chain_runs_in_topological_order() {
    let (a, b, c) = (tid("p", "a"), tid("p", "b"), tid("p", "c"));
    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![make_task("a"), make_task("b"), make_task("c")],
    );
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let nodes = vec![a.clone(), b.clone(), c.clone()];
    let edges = vec![h_edge(a.clone(), b.clone()), h_edge(b.clone(), c.clone())];
    let fixture = Fixture::new(workspace, make_graph(nodes, edges));
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 3);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(run.outcomes.len(), 3);
    assert_eq!(observer.started_order(), vec![a, b, c]);
}
/// In a diamond (top -> l, top -> r, l -> bot, r -> bot) all four tasks get an
/// outcome, `top` starts first and `bot` starts last.
#[tokio::test]
async fn diamond_dag_runs_branches_and_joins_correctly() {
    let top = tid("p", "top");
    let left = tid("p", "l");
    let right = tid("p", "r");
    let bot = tid("p", "bot");

    let tasks = vec![
        make_task("bot"),
        make_task("l"),
        make_task("r"),
        make_task("top"),
    ];
    let project = make_project("p", BTreeSet::new(), tasks);
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let nodes = vec![bot.clone(), left.clone(), right.clone(), top.clone()];
    let edges = vec![
        h_edge(top.clone(), left.clone()),
        h_edge(top.clone(), right.clone()),
        h_edge(left.clone(), bot.clone()),
        h_edge(right.clone(), bot.clone()),
    ];
    let fixture = Fixture::new(workspace, make_graph(nodes, edges));
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 4);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(run.outcomes.len(), 4);
    let order = observer.started_order();
    assert_eq!(order.first(), Some(&top));
    assert_eq!(order.last(), Some(&bot));
}
/// With a global cap of one, independent task `b` may only start after `a`
/// has finished.
#[tokio::test]
async fn exec_004_global_cap_one_serialises_independent_tasks() {
    let (a, b) = (tid("p", "a"), tid("p", "b"));
    let project = make_project("p", BTreeSet::new(), vec![make_task("a"), make_task("b")]);
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(1)));
    let fixture = Fixture::new(workspace, make_graph(vec![a.clone(), b.clone()], vec![]));
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    run_graph(&ctx, 1).await.unwrap();

    let events = observer.events();
    let finished_a = events
        .iter()
        .position(|e| matches!(e, Event::Finished(t, _, _) if *t == a))
        .expect("a finished");
    let started_b = events
        .iter()
        .position(|e| matches!(e, Event::Started(t) if *t == b))
        .expect("b started");
    assert!(
        started_b > finished_a,
        "b started ({started_b}) must follow a finished ({finished_a}): {events:?}",
    );
}
/// With a global cap of two and three independent tasks, `c` can only start
/// after `a` or `b` has finished.
#[tokio::test]
async fn exec_004_global_cap_two_admits_three_independent_in_bursts() {
    let (a, b, c) = (tid("p", "a"), tid("p", "b"), tid("p", "c"));
    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![make_task("a"), make_task("b"), make_task("c")],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let graph = make_graph(vec![a.clone(), b.clone(), c.clone()], vec![]);
    let fixture = Fixture::new(workspace, graph);
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 3);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    run_graph(&ctx, 1).await.unwrap();

    let events = observer.events();
    let started_c = events
        .iter()
        .position(|e| matches!(e, Event::Started(t) if *t == c))
        .expect("c started");
    let any_finish_before_c = events[..started_c]
        .iter()
        .any(|e| matches!(e, Event::Finished(t, _, _) if *t == a || *t == b));
    assert!(
        any_finish_before_c,
        "c starting at {started_c} must follow at least one finish: {events:?}",
    );
}
/// Two projects share the tag `db`, whose cap is one (global cap ten):
/// `pb:compute` may only start after `pa:compute` has finished.
#[tokio::test]
async fn exec_005_per_tag_cap_serialises_tagged_tasks_across_projects() {
    let pa_compute = tid("pa", "compute");
    let pb_compute = tid("pb", "compute");

    let task_a = make_task("compute");
    let task_b = make_task("compute");
    let project_a = make_project("pa", BTreeSet::from([tag("db")]), vec![task_a]);
    let project_b = make_project("pb", BTreeSet::from([tag("db")]), vec![task_b]);
    let settings = workspace_settings_with_tag_cap(fixed_cap(10), "db", 1);
    let workspace = make_workspace(vec![project_a, project_b], settings);
    let graph = make_graph(vec![pa_compute.clone(), pb_compute.clone()], vec![]);
    let fixture = Fixture::new(workspace, graph);
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    run_graph(&ctx, 1).await.unwrap();

    let events = observer.events();
    let finished_pa = events
        .iter()
        .position(|e| matches!(e, Event::Finished(t, _, _) if *t == pa_compute))
        .expect("pa:compute finished");
    let started_pb = events
        .iter()
        .position(|e| matches!(e, Event::Started(t) if *t == pb_compute))
        .expect("pb:compute started");
    assert!(
        started_pb > finished_pa,
        "pb:compute ({started_pb}) must follow pa:compute finish ({finished_pa}): {events:?}",
    );
}
/// With cap one and tasks declared out of order (c, a, b / c, b, a), the
/// independent tasks still start as a, b, c.
#[tokio::test]
async fn exec_003_canonical_order_under_partial_slot_availability() {
    let (a, b, c) = (tid("p", "a"), tid("p", "b"), tid("p", "c"));
    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![make_task("c"), make_task("a"), make_task("b")],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(1)));
    let graph = make_graph(vec![c.clone(), b.clone(), a.clone()], vec![]);
    let fixture = Fixture::new(workspace, graph);
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 3);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    run_graph(&ctx, 1).await.unwrap();

    assert_eq!(observer.started_order(), vec![a, b, c]);
}
/// Two independent chains (a -> a_child, b -> b_child): `a` fails, `a_child`
/// is skipped with `a` as cause, and the `b` chain still succeeds.
#[tokio::test]
async fn exec_010_task_failure_does_not_halt_unrelated_subgraph() {
    let a = tid("p", "a");
    let a_child = tid("p", "a_child");
    let b = tid("p", "b");
    let b_child = tid("p", "b_child");

    let tasks = vec![
        make_task("a"),
        make_task("a_child"),
        make_task("b"),
        make_task("b_child"),
    ];
    let project = make_project("p", BTreeSet::new(), tasks);
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let nodes = vec![a.clone(), a_child.clone(), b.clone(), b_child.clone()];
    let edges = vec![
        h_edge(a.clone(), a_child.clone()),
        h_edge(b.clone(), b_child.clone()),
    ];
    let fixture = Fixture::new(workspace, make_graph(nodes, edges));
    let spawner = MockProcessSpawner::new();
    // Spec order matters: the first spawn gets the failing spec.
    push_spec_with_exit(&spawner, 1);
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(
        completed_for(&run.outcomes, &a).state,
        RunState::Failed,
        "a should be Failed",
    );
    assert_eq!(
        skipped_for(&run.outcomes, &a_child).cause,
        SkipCause::UpstreamFailed {
            upstream: a.clone(),
        },
        "a_child cascade-skipped with root cause `a`",
    );
    assert_eq!(
        completed_for(&run.outcomes, &b).state,
        RunState::Succeeded,
        "sibling b should succeed",
    );
    assert_eq!(
        completed_for(&run.outcomes, &b_child).state,
        RunState::Succeeded,
        "sibling b_child should succeed",
    );
}
/// In the chain root -> mid -> leaf, a failing `root` causes both descendants
/// to be skipped with `root` as the recorded cause, and only one process spawns.
#[tokio::test]
async fn exec_011_task_failure_cascades_to_hard_descendants() {
    let (root, mid, leaf) = (tid("p", "root"), tid("p", "mid"), tid("p", "leaf"));
    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![make_task("root"), make_task("mid"), make_task("leaf")],
    );
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let nodes = vec![root.clone(), mid.clone(), leaf.clone()];
    let edges = vec![
        h_edge(root.clone(), mid.clone()),
        h_edge(mid.clone(), leaf.clone()),
    ];
    let fixture = Fixture::new(workspace, make_graph(nodes, edges));
    let spawner = MockProcessSpawner::new();
    push_spec_with_exit(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(run.outcomes.len(), 3);
    assert_eq!(completed_for(&run.outcomes, &root).state, RunState::Failed);
    let expected_cause = SkipCause::UpstreamFailed {
        upstream: root.clone(),
    };
    assert_eq!(
        skipped_for(&run.outcomes, &mid).cause,
        expected_cause,
        "mid records root cause = root",
    );
    assert_eq!(
        skipped_for(&run.outcomes, &leaf).cause,
        expected_cause,
        "leaf records root cause = root (NOT mid)",
    );
    assert_eq!(spawner.spawns().len(), 1);
}
/// When `a` fails and `b` depends on it only through an `s_edge`, `b` still
/// runs and succeeds.
#[tokio::test]
async fn exec_010_soft_edge_predecessor_failure_does_not_cascade() {
    let (a, b) = (tid("p", "a"), tid("p", "b"));
    let project = make_project("p", BTreeSet::new(), vec![make_task("a"), make_task("b")]);
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let graph = make_graph(
        vec![a.clone(), b.clone()],
        vec![s_edge(a.clone(), b.clone())],
    );
    let fixture = Fixture::new(workspace, graph);
    let spawner = MockProcessSpawner::new();
    // First spawn fails, second uses the default spec.
    push_spec_with_exit(&spawner, 1);
    push_n_default_specs(&spawner, 1);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(completed_for(&run.outcomes, &a).state, RunState::Failed);
    assert_eq!(
        completed_for(&run.outcomes, &b).state,
        RunState::Succeeded,
        "soft-edge successor must not be cascade-skipped",
    );
}
/// For a failing root with two chained descendants, the observer sees exactly
/// Started(root), Finished(root, Failed), then Skipped(leaf) and Skipped(mid).
#[tokio::test]
async fn exec_011_observer_emits_no_started_or_finished_for_skipped_tasks() {
    let (root, mid, leaf) = (tid("p", "root"), tid("p", "mid"), tid("p", "leaf"));
    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![make_task("root"), make_task("mid"), make_task("leaf")],
    );
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let nodes = vec![root.clone(), mid.clone(), leaf.clone()];
    let edges = vec![
        h_edge(root.clone(), mid.clone()),
        h_edge(mid.clone(), leaf.clone()),
    ];
    let fixture = Fixture::new(workspace, make_graph(nodes, edges));
    let spawner = MockProcessSpawner::new();
    push_spec_with_exit(&spawner, 3);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let _ = run_graph(&ctx, 1).await.unwrap();

    let skip_cause = SkipCause::UpstreamFailed {
        upstream: root.clone(),
    };
    let expected = vec![
        Event::Started(root.clone()),
        Event::Finished(root, RunState::Failed, RunSource::FreshRun),
        Event::Skipped(leaf, skip_cause.clone()),
        Event::Skipped(mid, skip_cause),
    ];
    assert_eq!(
        observer.events(),
        expected,
        "expected exactly one Started + Finished for root \
         and one Skipped per descendant in canonical order",
    );
}
/// In a diamond whose `top` fails, left/right/bot are each skipped with `top`
/// as cause, and `bot` receives exactly one skipped notification.
#[tokio::test]
async fn exec_011_diamond_cascade_records_each_descendant_once() {
    let top = tid("p", "top");
    let left = tid("p", "left");
    let right = tid("p", "right");
    let bot = tid("p", "bot");

    let tasks = vec![
        make_task("bot"),
        make_task("left"),
        make_task("right"),
        make_task("top"),
    ];
    let project = make_project("p", BTreeSet::new(), tasks);
    let workspace = make_workspace(vec![project], WorkspaceSettings::default());
    let nodes = vec![bot.clone(), left.clone(), right.clone(), top.clone()];
    let edges = vec![
        h_edge(top.clone(), left.clone()),
        h_edge(top.clone(), right.clone()),
        h_edge(left.clone(), bot.clone()),
        h_edge(right.clone(), bot.clone()),
    ];
    let fixture = Fixture::new(workspace, make_graph(nodes, edges));
    let spawner = MockProcessSpawner::new();
    push_spec_with_exit(&spawner, 4);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(run.outcomes.len(), 4);
    assert_eq!(completed_for(&run.outcomes, &top).state, RunState::Failed);
    let cause = SkipCause::UpstreamFailed {
        upstream: top.clone(),
    };
    for descendant in [left.clone(), right.clone(), bot.clone()] {
        assert_eq!(
            skipped_for(&run.outcomes, &descendant).cause,
            cause,
            "{descendant:?} should be Skipped with root cause top",
        );
    }
    let bot_skip_events = observer
        .events()
        .iter()
        .filter(|e| matches!(e, Event::Skipped(t, _) if *t == bot))
        .count();
    assert_eq!(
        bot_skip_events, 1,
        "bot must fire on_task_skipped exactly once across both cascade paths",
    );
}
/// Default mock spec overridden with `MockBehaviour::OnKillOnly` and the given
/// exit code.
fn exit_on_kill_only_spec(kill_exit_code: i32) -> MockSpec {
    let defaults = MockSpec::default();
    MockSpec {
        exit_code: kill_exit_code,
        behaviour: MockBehaviour::OnKillOnly,
        ..defaults
    }
}
/// Cancelling before the run starts records all three ready tasks as
/// `RunCancelled`, spawns nothing, and emits exactly three Cancelled events.
#[tokio::test]
async fn exec_013_cancel_before_admission_marks_all_ready_as_run_cancelled() {
    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![make_task("a"), make_task("b"), make_task("c")],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(1)));
    let graph = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
    let fixture = Fixture::new(workspace, graph);
    let spawner = MockProcessSpawner::new();
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    // Cancel up front, before the scheduler gets to admit anything.
    fixture.cancel.cancel();
    let run = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(run.outcomes.len(), 3);
    for name in ["a", "b", "c"] {
        let expected_task = tid("p", name);
        match cancelled_for(&run.outcomes, &expected_task) {
            CancelledRecord::RunCancelled { task } => {
                assert_eq!(task, &expected_task);
            }
            other => panic!("expected RunCancelled for {name}, got {other:?}"),
        }
    }
    assert!(
        spawner.spawns().is_empty(),
        "no task should have been spawned: {:?}",
        spawner.spawns(),
    );
    let events = observer.events();
    let only_run_cancelled = events
        .iter()
        .all(|e| matches!(e, Event::Cancelled(_, CancelledRecord::RunCancelled { .. })));
    assert!(
        only_run_cancelled,
        "expected only Cancelled events, got {events:?}",
    );
    assert_eq!(events.len(), 3);
}
/// Cancelling 20 ms into the run of a task that exits on terminate yields a
/// `SignaledInFlight` record, one spawn, and exactly one `Terminate` signal.
#[tokio::test]
async fn exec_013_cancel_mid_flight_signals_in_flight_task() {
    let solo = tid("p", "solo");
    let project = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(1)));
    let fixture = Fixture::new(workspace, make_graph(vec![solo.clone()], vec![]));
    let spawner = MockProcessSpawner::new();
    spawner.push_spec(exit_on_terminate_spec(0));
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let token = fixture.cancel.clone();
    let cancel_later = async move {
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        token.cancel();
    };
    let (run, ()) = tokio::join!(run_graph(&ctx, 1), cancel_later);
    let run = run.unwrap();

    match cancelled_for(&run.outcomes, &solo) {
        CancelledRecord::SignaledInFlight { task, .. } => {
            assert_eq!(task, &solo);
        }
        other => panic!("expected SignaledInFlight, got {other:?}"),
    }
    assert_eq!(spawner.spawns().len(), 1);
    assert_eq!(
        spawner.signals_for(0).unwrap(),
        vec![Signal::Terminate],
        "expected exactly one Terminate, got {:?}",
        spawner.signals_for(0),
    );
}
/// With a 0.05 grace setting and a task that only exits on kill, a mid-flight
/// cancel produces `Terminate` followed by `Kill`.
#[tokio::test]
async fn exec_014_cancel_mid_flight_escalates_to_kill_after_grace() {
    let solo = tid("p", "solo");
    let project = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
    let settings = workspace_settings_with_grace(fixed_cap(1), 0.05);
    let workspace = make_workspace(vec![project], settings);
    let fixture = Fixture::new(workspace, make_graph(vec![solo.clone()], vec![]));
    let spawner = MockProcessSpawner::new();
    spawner.push_spec(exit_on_kill_only_spec(137));
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let token = fixture.cancel.clone();
    let cancel_later = async move {
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        token.cancel();
    };
    let (run, ()) = tokio::join!(run_graph(&ctx, 1), cancel_later);
    let run = run.unwrap();

    match cancelled_for(&run.outcomes, &solo) {
        CancelledRecord::SignaledInFlight { .. } => {}
        other => panic!("expected SignaledInFlight, got {other:?}"),
    }
    let signals = spawner.signals_for(0).unwrap();
    assert_eq!(signals, vec![Signal::Terminate, Signal::Kill]);
}
/// With a grace setting of 0.0 and a task that only exits on kill, a
/// mid-flight cancel still records `Terminate` followed by `Kill`.
#[tokio::test]
async fn exec_014_cancel_grace_zero_sends_kill_immediately() {
    let solo = tid("p", "solo");
    let project = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
    let settings = workspace_settings_with_grace(fixed_cap(1), 0.0);
    let workspace = make_workspace(vec![project], settings);
    let fixture = Fixture::new(workspace, make_graph(vec![solo.clone()], vec![]));
    let spawner = MockProcessSpawner::new();
    spawner.push_spec(exit_on_kill_only_spec(137));
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let token = fixture.cancel.clone();
    let cancel_later = async move {
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        token.cancel();
    };
    let (run, ()) = tokio::join!(run_graph(&ctx, 1), cancel_later);
    let run = run.unwrap();

    match cancelled_for(&run.outcomes, &solo) {
        CancelledRecord::SignaledInFlight { .. } => {}
        other => panic!("expected SignaledInFlight, got {other:?}"),
    }
    let signals = spawner.signals_for(0).unwrap();
    assert_eq!(signals, vec![Signal::Terminate, Signal::Kill]);
}
/// Cancelling while `root` is in flight records `root` as `SignaledInFlight`
/// and both chained descendants as `UpstreamCancelled` with `root` as upstream.
#[tokio::test]
async fn exec_011_cancelled_task_cascades_descendants_as_upstream_cancelled() {
    let (root, mid, leaf) = (tid("p", "root"), tid("p", "mid"), tid("p", "leaf"));
    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![make_task("root"), make_task("mid"), make_task("leaf")],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(1)));
    let nodes = vec![root.clone(), mid.clone(), leaf.clone()];
    let edges = vec![
        h_edge(root.clone(), mid.clone()),
        h_edge(mid.clone(), leaf.clone()),
    ];
    let fixture = Fixture::new(workspace, make_graph(nodes, edges));
    let spawner = MockProcessSpawner::new();
    spawner.push_spec(exit_on_terminate_spec(0));
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let token = fixture.cancel.clone();
    let cancel_later = async move {
        tokio::time::sleep(std::time::Duration::from_millis(20)).await;
        token.cancel();
    };
    let (run, ()) = tokio::join!(run_graph(&ctx, 1), cancel_later);
    let run = run.unwrap();

    assert_eq!(run.outcomes.len(), 3);
    match cancelled_for(&run.outcomes, &root) {
        CancelledRecord::SignaledInFlight { task, .. } => {
            assert_eq!(task, &root);
        }
        other => panic!("expected SignaledInFlight for root, got {other:?}"),
    }
    for name in ["mid", "leaf"] {
        let expected_task = tid("p", name);
        match cancelled_for(&run.outcomes, &expected_task) {
            CancelledRecord::UpstreamCancelled { task, upstream } => {
                assert_eq!(task, &expected_task);
                assert_eq!(
                    upstream,
                    &root,
                    "cascade attributes the root cancelled task to {name}",
                );
            }
            other => {
                panic!("expected UpstreamCancelled for {name}, got {other:?}")
            }
        }
    }
}
/// Run 1 is cancelled mid-flight; run 2 on the same fixture with a fresh token
/// must be a fresh, succeeded run with one spawn (no cache hit).
#[tokio::test]
async fn exec_015_cancelled_run_does_not_produce_cache_entry() {
    let solo = tid("p", "solo");
    let project = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(1)));
    let fixture = Fixture::new(workspace, make_graph(vec![solo.clone()], vec![]));

    // Run 1: cancelled while the task is in flight.
    {
        let first_spawner = MockProcessSpawner::new();
        first_spawner.push_spec(exit_on_terminate_spec(0));
        let first_observer = Recorder::default();
        let first_ctx = make_ctx(&fixture, &first_spawner, &first_observer);
        let token = fixture.cancel.clone();
        let cancel_later = async move {
            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
            token.cancel();
        };
        let (first_run, ()) = tokio::join!(run_graph(&first_ctx, 1), cancel_later);
        let first_run = first_run.unwrap();
        match cancelled_for(&first_run.outcomes, &solo) {
            CancelledRecord::SignaledInFlight { .. } => {}
            other => panic!("run 1 expected SignaledInFlight, got {other:?}"),
        }
    }

    // Run 2: same fixture, brand-new cancellation token.
    let fresh_token = CancellationToken::new();
    let second_spawner = MockProcessSpawner::new();
    push_n_default_specs(&second_spawner, 1);
    let second_observer = Recorder::default();
    let second_ctx =
        make_ctx_with_cancel(&fixture, &second_spawner, &second_observer, &fresh_token);
    let second_run = run_graph(&second_ctx, 2).await.unwrap();

    let record = completed_for(&second_run.outcomes, &solo);
    assert_eq!(
        record.source,
        RunSource::FreshRun,
        "run 2 must be a fresh run; a cache hit would mean run 1 stored an entry",
    );
    assert_eq!(record.state, RunState::Succeeded);
    assert_eq!(second_spawner.spawns().len(), 1);
}
/// With cap two, `fast` (default spec) completes successfully while `slow`
/// (exits on terminate) is recorded as `SignaledInFlight` after a cancel at 30 ms.
#[tokio::test]
async fn exec_010_cancel_one_subgraph_does_not_halt_another() {
    let (fast, slow) = (tid("p", "fast"), tid("p", "slow"));
    let task_fast = make_task_with("fast", &["echo", "fast"], None);
    let task_slow = make_task_with("slow", &["echo", "slow"], None);
    let project = make_project("p", BTreeSet::new(), vec![task_fast, task_slow]);
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let graph = make_graph(vec![fast.clone(), slow.clone()], vec![]);
    let fixture = Fixture::new(workspace, graph);
    let spawner = MockProcessSpawner::new();
    // Spec order matters: first spawn finishes on its own, second waits for a signal.
    spawner.push_spec(MockSpec::default());
    spawner.push_spec(exit_on_terminate_spec(0));
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);

    let token = fixture.cancel.clone();
    let cancel_later = async move {
        tokio::time::sleep(std::time::Duration::from_millis(30)).await;
        token.cancel();
    };
    let (run, ()) = tokio::join!(run_graph(&ctx, 1), cancel_later);
    let run = run.unwrap();

    assert_eq!(run.outcomes.len(), 2);
    let fast_record = completed_for(&run.outcomes, &fast);
    assert_eq!(fast_record.state, RunState::Succeeded);
    match cancelled_for(&run.outcomes, &slow) {
        CancelledRecord::SignaledInFlight { .. } => {}
        other => panic!("expected SignaledInFlight for slow, got {other:?}"),
    }
}
}