Skip to main content

haz_exec/run_graph/
scheduler.rs

1//! Workspace-wide scheduler loop body and its public-surface
2//! types.
3//!
4//! Three public items live here:
5//!
6//! - [`RunGraphOutcome`]: the per-task outcome map, per-task
7//!   error map, and run-level diagnostic vector returned by a
8//!   completed [`run_graph`] invocation.
9//! - [`RuntimeInvariantViolation`]: the typed diagnostic shape
10//!   for `EXEC-019` runtime cycles and `EXEC-020` runtime output
11//!   overlaps.
12//! - [`RunGraphError`]: reserved for future scheduler-level
13//!   error variants; empty in the current revision (`EXEC-010`
14//!   keeps single-task failures out of the top-level return).
15//!
16//! Plus the [`run_graph`] async function: the scheduler loop
17//! itself. The loop composes the helpers from
18//! [`crate::run_graph::state`], [`crate::run_graph::overlap`],
19//! [`crate::run_graph::cycle`], [`crate::run_graph::cascade`],
20//! and [`crate::run_graph::steps`] into the admission /
21//! completion state machine described on [`run_graph`]'s
22//! rustdoc.
23//!
24//! Re-exported by [`crate::run_graph`] so the existing
25//! external paths (`haz_exec::run_graph::run_graph`, etc.)
26//! continue to resolve unchanged.
27
28use std::collections::{BTreeMap, BTreeSet, HashMap};
29use std::num::NonZeroUsize;
30
31use futures::StreamExt;
32use futures::stream::FuturesUnordered;
33use haz_domain::mutex::Mutex;
34use haz_domain::name::{ProjectName, TagName};
35use haz_domain::path::CanonicalPath;
36use haz_domain::task_id::TaskId;
37use haz_vfs::WritableFilesystem;
38use snafu::Snafu;
39
40use crate::hold_set::HoldSet;
41use crate::process::ProcessSpawner;
42use crate::run_graph::cascade::{
43    drain_ready_to_cancelled, emit_cascade_cancellations, emit_cascade_skips,
44};
45use crate::run_graph::cycle::{
46    check_and_record_runtime_cycle_for_completion, skip_ready_cycle_members,
47};
48use crate::run_graph::overlap::check_and_record_output_overlap;
49use crate::run_graph::state::{
50    InFlightCounts, ReadyState, StreamHashAccumulator, precompute_task_tags, resolve_global_cap,
51};
52use crate::run_graph::steps::{
53    InFlightCompletion, InFlightFuture, LookupStepOutcome, run_lookup_step, run_spawn_step,
54};
55use crate::run_task::{
56    CancelledRecord, CompletedRecord, RunContext, RunObserver, RunOutcome, RunState, RunTaskError,
57    SkipCause,
58};
59
60/// Outcome of a single [`run_graph`] invocation.
61///
62/// `outcomes` carries one entry per task that reached a terminal
63/// state through the lookup-then-spawn pipeline; each entry is a
64/// [`RunOutcome::Completed`] wrapping the underlying
65/// [`CompletedRecord`]. Tasks for which the pipeline returned
66/// `Err` are absent from `outcomes` and present in `task_errors`.
67/// Tasks the cascade skipped per `EXEC-010` are absent from both
68/// maps in this revision; a follow-up commit lands the formal
69/// [`RunOutcome::Skipped`] entries.
70///
71/// `invariant_violations` carries run-level diagnostics for
72/// `EXEC-019` (runtime cycle) and `EXEC-020` (output overlap).
73/// The Vec is empty on a clean run; a non-empty Vec means the
74/// scheduler detected a workspace-level invariant violation
75/// during the run. Partial per-task outcomes are preserved
76/// alongside; [`crate::exit_code::exit_code_for`] consults both
77/// maps and the violation Vec to classify the run for
78/// `EXEC-021`.
79///
80/// [`CompletedRecord`]: crate::run_task::CompletedRecord
81#[derive(Debug)]
82pub struct RunGraphOutcome {
83    /// Per-task outcome in canonical `(ProjectName, TaskName)`
84    /// order.
85    pub outcomes: BTreeMap<TaskId, RunOutcome>,
86    /// Per-task error captured when the lookup-then-spawn
87    /// pipeline returned `Err` (failed cache lookup, spawn error,
88    /// stream-read error, store error, etc.). The cascade treats
89    /// an `Err` task the same way as an `Ok(Failed)` task: hard
90    /// descendants are marked skip; unrelated subgraphs continue.
91    pub task_errors: BTreeMap<TaskId, RunTaskError>,
92    /// Run-level diagnostics for runtime DAG invariants
93    /// (`EXEC-019` cycle, `EXEC-020` output overlap). Order is
94    /// detection order: the scheduler appends one entry per
95    /// detected violation as it discovers them.
96    pub invariant_violations: Vec<RuntimeInvariantViolation>,
97}
98
99/// A workspace-level invariant the scheduler detected at runtime.
100///
101/// Each variant carries enough information to produce a
102/// diagnostic shape-equivalent to the static `DAG-014` /
103/// `DAG-016` error it is the runtime analogue of: a cycle's node
104/// set and the offending edge; an overlap's two task identities
105/// and the shared path.
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub enum RuntimeInvariantViolation {
108    /// `EXEC-019` runtime cycle: a length-≥2 cycle in the union
109    /// of hard, soft, and producer-matching edges, discovered
110    /// when a newly-materialised output added a runtime
111    /// producer-matching edge that closed the cycle.
112    RuntimeCycle {
113        /// Every task identified as a member of the cycle.
114        nodes: BTreeSet<TaskId>,
115        /// The newly-added producer-matching edge whose
116        /// addition closed the cycle (predecessor, successor).
117        offending_edge: (TaskId, TaskId),
118    },
119    /// `EXEC-020` runtime output overlap: two tasks have both
120    /// materialised the same workspace-absolute path.
121    OutputOverlap {
122        /// The task that claimed the path first (chronologically
123        /// in the run's completion order).
124        first_task: TaskId,
125        /// The task whose materialisation discovered the
126        /// pre-existing claim.
127        second_task: TaskId,
128        /// The path both tasks materialised.
129        shared_path: CanonicalPath,
130    },
131}
132
133/// Top-level error raised by [`run_graph`].
134///
135/// This revision defines no variants: a single task's failure
136/// does not abort the run (per `EXEC-010`), and every failure
137/// path the scheduler can currently produce is surfaced as a
138/// per-task `RunTaskError` captured in
139/// [`RunGraphOutcome::task_errors`] or as a typed
140/// [`RuntimeInvariantViolation`] in
141/// [`RunGraphOutcome::invariant_violations`]. The enum exists
142/// so a future scheduler-level diagnostic (e.g. a mutex
143/// acquisition error not attributable to a single task) can
144/// grow without changing [`run_graph`]'s signature.
145#[derive(Debug, Snafu)]
146#[snafu(visibility(pub(crate)))]
147pub enum RunGraphError {}
148
149/// Run every task in the validated graph subject to the
150/// workspace's concurrency caps and the mutex compatibility
151/// rules of `EXEC-006` condition 3 + `EXEC-007`.
152///
153/// The loop:
154///
155/// 1. Resolves the workspace's `concurrency.default` to a
156///    concrete cap and pre-computes the per-task tag set.
157/// 2. On each admission round, walks `state.ready` in canonical
158///    order and admits every task whose caps (`EXEC-004`,
159///    `EXEC-005`) permit. Admission consults caps ONLY; the
160///    mutex check is deferred per `EXEC-007` step 1 ("no mutex
161///    hold is taken during cache lookup"). Each admitted task
162///    fires [`RunObserver::on_task_started`] once (tracked via
163///    `started`) and pushes a lookup-step future.
164/// 3. The lookup-step future runs [`cache_lookup_phase`]. On
165///    hit, it drives [`restore_from_hit`] inline (no mutex
166///    interaction, per `MUTEX-007`) and reports the outcome. On
167///    miss, it stops short and reports the data the scheduler
168///    needs for the post-lookup mutex check.
169/// 4. On a miss completion the scheduler evaluates the live
170///    mutex hold set. When compatible, it acquires the
171///    mutex per `MUTEX-006` and pushes a spawn-step future
172///    ([`run_fresh`]). When incompatible (per `MUTEX-005` /
173///    `EXEC-007` step 3 incompatible branch), the slot is
174///    released, the task returns to `ready`, and the next
175///    admission round re-evaluates.
176/// 5. On a spawn-step completion the scheduler releases the
177///    held mutex, releases the slot, fires
178///    [`RunObserver::on_task_finished`], records the outcome,
179///    and either promotes hard successors to ready (succeeded)
180///    or cascade-skips hard descendants (failed, per `EXEC-010`).
181/// 6. Terminates when no futures are in flight; tasks marked
182///    skip never enter `ready`, so the loop's emptiness
183///    condition is well-defined for any graph including
184///    diamonds, fan-ins, and disconnected subgraphs.
185///
186/// # Errors
187///
188/// This revision never returns `Err`: a single task's failure
189/// does not abort the run (per `EXEC-010`), and the only paths
190/// that could yield a top-level error in the current scope are
191/// covered by per-task `RunTaskError`s captured in
192/// [`RunGraphOutcome::task_errors`]. Future revisions will grow
193/// [`RunGraphError`] with scheduler-level diagnostics (runtime
194/// cycle detection, runtime output overlap).
195///
196/// # Panics
197///
198/// Panics if `ctx.graph` references a task whose project or
199/// task name is absent from `ctx.workspace`. A validated graph
200/// is built from the workspace's effective task set; a node
201/// referring to a non-existent task would be a builder bug.
202///
203/// [`cache_lookup_phase`]: crate::run_task::cache_lookup_phase
204/// [`restore_from_hit`]: crate::run_task::restore_from_hit
205/// [`run_fresh`]: crate::run_task::run_fresh
206pub async fn run_graph<F, S, O>(
207    ctx: &RunContext<'_, F, S, O>,
208    created_at_unix: u64,
209) -> Result<RunGraphOutcome, RunGraphError>
210where
211    F: WritableFilesystem,
212    S: ProcessSpawner,
213    O: RunObserver,
214{
215    // EXEC-019 step 3 plumbing: the scheduler watches an
216    // internal child of `ctx.cancel`. User cancellation
217    // (`EXEC-012`) propagates parent-to-child automatically; the
218    // scheduler trips the child directly on runtime cycle
219    // detection. Tripping the child does NOT cancel the user's
220    // parent token: an observer holding a clone of `ctx.cancel`
221    // sees no cancellation when the run aborts for a cycle.
222    let internal_cancel = ctx.cancel.child_token();
223    let internal_ctx = RunContext {
224        fs: ctx.fs,
225        cache: ctx.cache,
226        spawner: ctx.spawner,
227        observer: ctx.observer,
228        workspace: ctx.workspace,
229        graph: ctx.graph,
230        host_env: ctx.host_env,
231        algo: ctx.algo,
232        cancel: &internal_cancel,
233    };
234    let mut sched = SchedulerState::new(&internal_ctx);
235
236    loop {
237        if !sched.cancelled && sched.ctx.cancel.is_cancelled() {
238            sched.cancelled = true;
239        }
240
241        if sched.cancelled {
242            sched.drain_cancelled();
243        } else {
244            sched.admit_ready();
245        }
246
247        if sched.in_flight.is_empty() {
248            break;
249        }
250
251        let Some(completion) = sched.next_completion().await else {
252            continue;
253        };
254
255        match completion {
256            InFlightCompletion::Lookup { task, result } => {
257                sched.handle_lookup(task, result, created_at_unix);
258            }
259            InFlightCompletion::Spawn { task, result } => {
260                sched.handle_spawn(task, result);
261            }
262        }
263    }
264
265    Ok(sched.into_outcome())
266}
267
268/// Aggregate of the per-run scheduler-local state.
269///
270/// Owns every piece of mutable bookkeeping the [`run_graph`] loop
271/// reads or writes: the ready set, the in-flight counts, the
272/// stream-hash accumulator, the mutex hold set, the per-task
273/// hold metadata, the outcome / error / violation maps, the
274/// EXEC-020 output-claim tracker, the EXEC-019 augmented edge
275/// set, the sticky cancel flag, and the `FuturesUnordered` of
276/// in-flight task futures.
277///
278/// Lives inside `scheduler.rs` (no `pub(super)`); the only
279/// caller is [`run_graph`].
280struct SchedulerState<'a, F, S, O>
281where
282    F: WritableFilesystem,
283    S: ProcessSpawner,
284    O: RunObserver,
285{
286    ctx: &'a RunContext<'a, F, S, O>,
287    /// Resolved `concurrency.default` (`EXEC-004`).
288    global_cap: NonZeroUsize,
289    /// Pre-computed per-task tag sets feeding the per-tag
290    /// admission accounting (`EXEC-005`).
291    task_tags: BTreeMap<TaskId, BTreeSet<TagName>>,
292    /// Ready set, hard-edge index, cascade-skip closure
293    /// (`EXEC-001`, `EXEC-010`, `EXEC-011`).
294    ready_state: ReadyState,
295    /// Global + per-tag in-flight counters (`EXEC-006`
296    /// conditions 1, 2).
297    counts: InFlightCounts,
298    /// Captured `(stdout_hash, stderr_hash)` per terminated task
299    /// (`CACHE-007`, `DAG-017`).
300    accum: StreamHashAccumulator,
301    /// Live mutex holds (`EXEC-006` condition 3, `MUTEX-001..007`).
302    hold_set: HoldSet,
303    /// Tasks that have already fired
304    /// [`RunObserver::on_task_started`]. A task that re-enters
305    /// the lifecycle after a `MUTEX-005` yield MUST NOT refire
306    /// the event (S3).
307    started: BTreeSet<TaskId>,
308    /// Per-task hold metadata captured at acquire time so the
309    /// spawn-step completion handler releases the right hold
310    /// without re-deriving it from the workspace.
311    spawn_step_holds: BTreeMap<TaskId, (ProjectName, Option<Mutex>)>,
312    /// Per-task terminal outcomes.
313    outcomes: BTreeMap<TaskId, RunOutcome>,
314    /// Per-task errors captured when the lookup-then-spawn
315    /// pipeline returned `Err`.
316    task_errors: BTreeMap<TaskId, RunTaskError>,
317    /// Run-level diagnostics for `EXEC-019` runtime cycle and
318    /// `EXEC-020` runtime output overlap.
319    invariant_violations: Vec<RuntimeInvariantViolation>,
320    /// `EXEC-020` output-claim tracker: maps each materialised
321    /// workspace-absolute path to the first task that claimed
322    /// it. Lookup-only iteration semantics; `HashMap` because
323    /// `CanonicalPath` implements `Hash + Eq`.
324    output_claims: HashMap<CanonicalPath, TaskId>,
325    /// `EXEC-019` augmented edge set: the static graph's edges
326    /// plus every producer-matching edge discovered at runtime.
327    augmented_edges: BTreeSet<(TaskId, TaskId)>,
328    /// `FuturesUnordered` of in-flight lookup-step and spawn-
329    /// step futures, all borrowing from `ctx`.
330    in_flight: FuturesUnordered<InFlightFuture<'a>>,
331    /// Sticky cancellation flag. Tripped by user cancel
332    /// (`EXEC-012`), EXEC-020 overlap detection, or EXEC-019
333    /// cycle detection. Once set, admission stops and the ready
334    /// set drains into `RunCancelled` entries each iteration.
335    cancelled: bool,
336}
337
338impl<'a, F, S, O> SchedulerState<'a, F, S, O>
339where
340    F: WritableFilesystem,
341    S: ProcessSpawner,
342    O: RunObserver,
343{
344    fn new(ctx: &'a RunContext<'a, F, S, O>) -> Self {
345        let global_cap = resolve_global_cap(&ctx.workspace.settings.concurrency);
346        let task_tags = precompute_task_tags(ctx.workspace, ctx.graph);
347        let ready_state = ReadyState::from_graph(ctx.graph);
348        let augmented_edges: BTreeSet<(TaskId, TaskId)> = ctx
349            .graph
350            .edges
351            .iter()
352            .map(|e| (e.from.clone(), e.to.clone()))
353            .collect();
354        Self {
355            ctx,
356            global_cap,
357            task_tags,
358            ready_state,
359            counts: InFlightCounts::default(),
360            accum: StreamHashAccumulator::default(),
361            hold_set: HoldSet::default(),
362            started: BTreeSet::new(),
363            spawn_step_holds: BTreeMap::new(),
364            outcomes: BTreeMap::new(),
365            task_errors: BTreeMap::new(),
366            invariant_violations: Vec::new(),
367            output_claims: HashMap::new(),
368            augmented_edges,
369            in_flight: FuturesUnordered::new(),
370            cancelled: false,
371        }
372    }
373
374    /// Drain `ready_state.ready` into `RunCancelled` outcomes.
375    /// Called every iteration after the cancel flag trips so a
376    /// successor newly promoted by a post-cancel succeeded
377    /// completion still surfaces as cancelled.
378    fn drain_cancelled(&mut self) {
379        drain_ready_to_cancelled(self.ctx.observer, &mut self.ready_state, &mut self.outcomes);
380    }
381
382    /// Admission round (`EXEC-003`, `EXEC-004`, `EXEC-005`).
383    ///
384    /// Walks the ready set in canonical order and admits every
385    /// task whose every cap permits. Mutex compatibility is NOT
386    /// consulted here (`EXEC-007` step 1 / `MUTEX-007`); the
387    /// lookup-step future runs first, and the post-lookup
388    /// branch in [`Self::handle_lookup`] does the mutex check.
389    fn admit_ready(&mut self) {
390        let candidates: Vec<TaskId> = self.ready_state.ready.iter().cloned().collect();
391        for task in candidates {
392            if self.ready_state.skip.contains(&task) {
393                // Cannot occur given admission and completion
394                // do not interleave on a single async task, but
395                // kept defensive so the invariant is documented.
396                self.ready_state.ready.remove(&task);
397                continue;
398            }
399            let tags = self
400                .task_tags
401                .get(&task)
402                .expect("ready task must have a precomputed tag set");
403            if !self.counts.can_admit(
404                tags,
405                &self.ctx.workspace.settings.concurrency,
406                self.global_cap,
407            ) {
408                continue;
409            }
410            self.ready_state.ready.remove(&task);
411            self.counts.admit(tags);
412
413            // EXEC-007 step 1 begins (no mutex hold). Fire
414            // on_task_started exactly once per task even when
415            // subsequent mutex contention causes the task to
416            // yield and re-enter the lifecycle.
417            if self.started.insert(task.clone()) {
418                self.ctx.observer.on_task_started(&task);
419            }
420
421            let preds_snapshot = self.accum.by_task.clone();
422            let task_for_future = task.clone();
423            self.in_flight.push(Box::pin(run_lookup_step(
424                self.ctx,
425                task_for_future,
426                preds_snapshot,
427            )));
428        }
429    }
430
431    /// Await the next completion, racing against the parent
432    /// cancel token. Returns [`None`] if the cancel token fired
433    /// before any completion; the caller should loop to the
434    /// next iteration (which will observe `self.cancelled`).
435    /// Returns [`Some`] with the completion otherwise.
436    ///
437    /// `in_flight` MUST be non-empty when called.
438    async fn next_completion(&mut self) -> Option<InFlightCompletion> {
439        if self.cancelled {
440            return Some(
441                self.in_flight
442                    .next()
443                    .await
444                    .expect("in_flight checked non-empty above"),
445            );
446        }
447        tokio::select! {
448            biased;
449            () = self.ctx.cancel.cancelled() => {
450                self.cancelled = true;
451                None
452            }
453            next = self.in_flight.next() => {
454                Some(next.expect("in_flight checked non-empty above"))
455            }
456        }
457    }
458
459    /// Handle a lookup-step future's completion. Branches on
460    /// (i) cancel-fire mid-flight (`EXEC-013` step 1), (ii)
461    /// lookup error, (iii) cache hit (no mutex per `MUTEX-007`),
462    /// (iv) cache miss compatible (acquire mutex, dispatch
463    /// spawn-step), (v) cache miss incompatible (`MUTEX-005`
464    /// yield: return to ready).
465    fn handle_lookup(
466        &mut self,
467        task: TaskId,
468        result: Result<LookupStepOutcome, RunTaskError>,
469        created_at_unix: u64,
470    ) {
471        let tags = self
472            .task_tags
473            .get(&task)
474            .expect("completed task must have a precomputed tag set")
475            .clone();
476
477        if self.cancelled {
478            // EXEC-013 step 1 in flight: a task that was
479            // admitted before the cancel-fire but whose
480            // lookup-step completed after is reclassified as
481            // cancelled. The result is discarded; no spawn-step
482            // is dispatched. The cascade is emitted via the same
483            // complete_failed mechanism Failed and Cancelled
484            // use, so descendants land as UpstreamCancelled.
485            self.counts.release(&tags);
486            let record = CancelledRecord::RunCancelled { task: task.clone() };
487            self.ctx.observer.on_task_cancelled(&task, &record);
488            let newly = self.ready_state.complete_failed(&task);
489            emit_cascade_cancellations(self.ctx.observer, &mut self.outcomes, &task, newly);
490            self.outcomes.insert(task, RunOutcome::Cancelled(record));
491            return;
492        }
493
494        match result {
495            Err(err) => {
496                self.counts.release(&tags);
497                let newly = self.ready_state.complete_failed(&task);
498                let cause = SkipCause::UpstreamErrored {
499                    upstream: task.clone(),
500                };
501                emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
502                self.task_errors.insert(task, err);
503            }
504            Ok(LookupStepOutcome::Hit(record)) => {
505                self.counts.release(&tags);
506                self.ctx.observer.on_task_finished(&task, &record);
507                self.accum.record(&task, &record);
508                self.record_completion_invariants(task, record);
509            }
510            Ok(LookupStepOutcome::Miss {
511                key,
512                mutex,
513                project_name,
514            }) => {
515                // EXEC-006 condition 3 / EXEC-007 step 3.
516                if self.hold_set.compatible(&project_name, mutex.as_ref()) {
517                    // MUTEX-006 "hold lifetime begins at spawn time".
518                    self.hold_set.acquire(&project_name, mutex.as_ref());
519                    self.spawn_step_holds
520                        .insert(task.clone(), (project_name, mutex));
521                    let task_for_future = task.clone();
522                    self.in_flight.push(Box::pin(run_spawn_step(
523                        self.ctx,
524                        task_for_future,
525                        key,
526                        created_at_unix,
527                    )));
528                } else {
529                    // MUTEX-005 yield: release slot, return task
530                    // to ready; next admission re-evaluates.
531                    self.counts.release(&tags);
532                    self.ready_state.ready.insert(task);
533                }
534            }
535        }
536    }
537
538    /// Handle a spawn-step future's completion. Branches on the
539    /// per-task `RunState`: succeeded (run EXEC-020 / EXEC-019
540    /// invariant checks), failed (cascade-skip hard descendants),
541    /// cancelled (translate to `SignaledInFlight`, cascade
542    /// `UpstreamCancelled`), or pipeline error (cascade-skip with
543    /// `UpstreamErrored`).
544    fn handle_spawn(&mut self, task: TaskId, result: Result<CompletedRecord, RunTaskError>) {
545        let tags = self
546            .task_tags
547            .get(&task)
548            .expect("completed task must have a precomputed tag set")
549            .clone();
550        // MUTEX-006: release at command termination, regardless
551        // of success or failure.
552        if let Some((project_name, mutex)) = self.spawn_step_holds.remove(&task) {
553            self.hold_set.release(&project_name, mutex.as_ref());
554        }
555        self.counts.release(&tags);
556        match result {
557            Ok(record) => match record.state {
558                RunState::Succeeded => {
559                    self.ctx.observer.on_task_finished(&task, &record);
560                    self.accum.record(&task, &record);
561                    self.record_completion_invariants(task, record);
562                }
563                RunState::Failed => {
564                    self.ctx.observer.on_task_finished(&task, &record);
565                    self.accum.record(&task, &record);
566                    let newly = self.ready_state.complete_failed(&task);
567                    let cause = SkipCause::UpstreamFailed {
568                        upstream: task.clone(),
569                    };
570                    emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
571                    self.outcomes.insert(task, RunOutcome::Completed(record));
572                }
573                RunState::Cancelled => {
574                    // The single-task lifecycle saw the run's
575                    // cancellation token fire and signalled the
576                    // child. Translate to the run-graph aggregate
577                    // view: emit SignaledInFlight and cascade hard
578                    // descendants as UpstreamCancelled (EXEC-011
579                    // for cancellation).
580                    let cancelled_record = CancelledRecord::SignaledInFlight {
581                        task: task.clone(),
582                        exit_status: record
583                            .exit_status
584                            .expect("a cancelled fresh run always carries an exit status"),
585                        stdout_hash: record.stdout_hash,
586                        stderr_hash: record.stderr_hash,
587                    };
588                    self.ctx
589                        .observer
590                        .on_task_cancelled(&task, &cancelled_record);
591                    let newly = self.ready_state.complete_failed(&task);
592                    emit_cascade_cancellations(self.ctx.observer, &mut self.outcomes, &task, newly);
593                    self.outcomes
594                        .insert(task, RunOutcome::Cancelled(cancelled_record));
595                }
596            },
597            Err(err) => {
598                let newly = self.ready_state.complete_failed(&task);
599                let cause = SkipCause::UpstreamErrored {
600                    upstream: task.clone(),
601                };
602                emit_cascade_skips(self.ctx.observer, &mut self.outcomes, &cause, newly);
603                self.task_errors.insert(task, err);
604            }
605        }
606    }
607
608    /// Run the `EXEC-020` output-overlap check and the
609    /// `EXEC-019` runtime-cycle check on a just-completed
610    /// (`Succeeded`) `record`, then either drop the completion
611    /// into the outcomes map normally or, on cycle detection,
612    /// trip the internal cancel token and skip ready cycle
613    /// members before recording.
614    fn record_completion_invariants(&mut self, task: TaskId, record: CompletedRecord) {
615        if check_and_record_output_overlap(
616            &mut self.output_claims,
617            &mut self.invariant_violations,
618            &task,
619            &record.materialised_outputs,
620        ) {
621            self.cancelled = true;
622        }
623        if let Some(cycle_nodes) = check_and_record_runtime_cycle_for_completion(
624            &mut self.augmented_edges,
625            &mut self.invariant_violations,
626            self.ctx.workspace,
627            &task,
628            &record.materialised_outputs,
629        ) {
630            self.cancelled = true;
631            self.ctx.cancel.cancel();
632            self.ready_state.complete_succeeded(&task);
633            self.outcomes.insert(task, RunOutcome::Completed(record));
634            skip_ready_cycle_members(
635                self.ctx.observer,
636                &mut self.ready_state,
637                &mut self.outcomes,
638                &cycle_nodes,
639            );
640            return;
641        }
642        self.ready_state.complete_succeeded(&task);
643        self.outcomes.insert(task, RunOutcome::Completed(record));
644    }
645
646    fn into_outcome(self) -> RunGraphOutcome {
647        RunGraphOutcome {
648            outcomes: self.outcomes,
649            task_errors: self.task_errors,
650            invariant_violations: self.invariant_violations,
651        }
652    }
653}
654
655#[cfg(test)]
656mod tests {
657    use std::collections::BTreeSet;
658
659    use haz_domain::settings::WorkspaceSettings;
660    use tokio_util::sync::CancellationToken;
661
662    use crate::mock_impl::{MockBehaviour, MockProcessSpawner, MockSpec};
663    use crate::process::Signal;
664    use crate::run_graph::scheduler::run_graph;
665    use crate::run_graph::test_fixtures::*;
666    use crate::run_task::{CancelledRecord, RunSource, RunState, SkipCause};
667
668    #[tokio::test]
669    async fn exec_001_empty_graph_terminates_with_empty_outcomes() {
670        let ws = make_workspace(vec![], WorkspaceSettings::default());
671        let g = make_graph(vec![], vec![]);
672        let fixture = Fixture::new(ws, g);
673        let spawner = MockProcessSpawner::new();
674        let observer = Recorder::default();
675        let ctx = make_ctx(&fixture, &spawner, &observer);
676
677        let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
678
679        assert!(result.outcomes.is_empty());
680        assert!(result.task_errors.is_empty());
681        assert!(observer.events().is_empty());
682        assert!(spawner.spawns().is_empty());
683    }
684
685    #[tokio::test]
686    async fn single_task_succeeds_writes_outcome() {
687        let task = make_task("build");
688        let p = make_project("p", BTreeSet::new(), vec![task]);
689        let ws = make_workspace(vec![p], WorkspaceSettings::default());
690        let g = make_graph(vec![tid("p", "build")], vec![]);
691        let fixture = Fixture::new(ws, g);
692        let spawner = MockProcessSpawner::new();
693        push_n_default_specs(&spawner, 1);
694        let observer = Recorder::default();
695        let ctx = make_ctx(&fixture, &spawner, &observer);
696
697        let result = run_graph(&ctx, 1).await.unwrap();
698
699        assert_eq!(result.outcomes.len(), 1);
700        let record = completed_for(&result.outcomes, &tid("p", "build"));
701        assert_eq!(record.state, RunState::Succeeded);
702        assert_eq!(record.source, RunSource::FreshRun);
703        assert!(result.task_errors.is_empty());
704    }
705
706    #[tokio::test]
707    async fn exec_002_linear_chain_runs_in_topological_order() {
708        let p = make_project(
709            "p",
710            BTreeSet::new(),
711            vec![make_task("a"), make_task("b"), make_task("c")],
712        );
713        let ws = make_workspace(vec![p], WorkspaceSettings::default());
714        let g = make_graph(
715            vec![tid("p", "a"), tid("p", "b"), tid("p", "c")],
716            vec![
717                h_edge(tid("p", "a"), tid("p", "b")),
718                h_edge(tid("p", "b"), tid("p", "c")),
719            ],
720        );
721        let fixture = Fixture::new(ws, g);
722        let spawner = MockProcessSpawner::new();
723        push_n_default_specs(&spawner, 3);
724        let observer = Recorder::default();
725        let ctx = make_ctx(&fixture, &spawner, &observer);
726
727        let result = run_graph(&ctx, 1).await.unwrap();
728        assert_eq!(result.outcomes.len(), 3);
729        assert_eq!(
730            observer.started_order(),
731            vec![tid("p", "a"), tid("p", "b"), tid("p", "c")],
732        );
733    }
734
735    #[tokio::test]
736    async fn diamond_dag_runs_branches_and_joins_correctly() {
737        let p = make_project(
738            "p",
739            BTreeSet::new(),
740            vec![
741                make_task("bot"),
742                make_task("l"),
743                make_task("r"),
744                make_task("top"),
745            ],
746        );
747        let ws = make_workspace(vec![p], WorkspaceSettings::default());
748        let g = make_graph(
749            vec![
750                tid("p", "bot"),
751                tid("p", "l"),
752                tid("p", "r"),
753                tid("p", "top"),
754            ],
755            vec![
756                h_edge(tid("p", "top"), tid("p", "l")),
757                h_edge(tid("p", "top"), tid("p", "r")),
758                h_edge(tid("p", "l"), tid("p", "bot")),
759                h_edge(tid("p", "r"), tid("p", "bot")),
760            ],
761        );
762        let fixture = Fixture::new(ws, g);
763        let spawner = MockProcessSpawner::new();
764        push_n_default_specs(&spawner, 4);
765        let observer = Recorder::default();
766        let ctx = make_ctx(&fixture, &spawner, &observer);
767
768        let result = run_graph(&ctx, 1).await.unwrap();
769        assert_eq!(result.outcomes.len(), 4);
770        let started = observer.started_order();
771        // `top` is the only initial root, so it starts first.
772        assert_eq!(started.first(), Some(&tid("p", "top")));
773        // `bot` runs only after both `l` and `r` succeed; it must
774        // therefore start last in any valid scheduling order.
775        assert_eq!(started.last(), Some(&tid("p", "bot")));
776    }
777
778    #[tokio::test]
779    async fn exec_004_global_cap_one_serialises_independent_tasks() {
780        let p = make_project("p", BTreeSet::new(), vec![make_task("a"), make_task("b")]);
781        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
782        let g = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
783        let fixture = Fixture::new(ws, g);
784        let spawner = MockProcessSpawner::new();
785        push_n_default_specs(&spawner, 2);
786        let observer = Recorder::default();
787        let ctx = make_ctx(&fixture, &spawner, &observer);
788
789        run_graph(&ctx, 1).await.unwrap();
790
791        // With cap 1, Started(b) must come AFTER Finished(a).
792        let events = observer.events();
793        let started_b = events
794            .iter()
795            .position(|e| matches!(e, Event::Started(t) if *t == tid("p", "b")))
796            .expect("b started");
797        let finished_a = events
798            .iter()
799            .position(|e| matches!(e, Event::Finished(t, _, _) if *t == tid("p", "a")))
800            .expect("a finished");
801        assert!(
802            started_b > finished_a,
803            "b started ({started_b}) must follow a finished ({finished_a}): {events:?}",
804        );
805    }
806
807    #[tokio::test]
808    async fn exec_004_global_cap_two_admits_three_independent_in_bursts() {
809        let p = make_project(
810            "p",
811            BTreeSet::new(),
812            vec![make_task("a"), make_task("b"), make_task("c")],
813        );
814        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(2)));
815        let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
816        let fixture = Fixture::new(ws, g);
817        let spawner = MockProcessSpawner::new();
818        push_n_default_specs(&spawner, 3);
819        let observer = Recorder::default();
820        let ctx = make_ctx(&fixture, &spawner, &observer);
821
822        run_graph(&ctx, 1).await.unwrap();
823
824        // With cap 2 and 3 tasks: c starts only after one of {a, b}
825        // has finished.
826        let events = observer.events();
827        let started_c = events
828            .iter()
829            .position(|e| matches!(e, Event::Started(t) if *t == tid("p", "c")))
830            .expect("c started");
831        let any_finish_before_c = events[..started_c].iter().any(
832            |e| matches!(e, Event::Finished(t, _, _) if *t == tid("p", "a") || *t == tid("p", "b")),
833        );
834        assert!(
835            any_finish_before_c,
836            "c starting at {started_c} must follow at least one finish: {events:?}",
837        );
838    }
839
840    #[tokio::test]
841    async fn exec_005_per_tag_cap_serialises_tagged_tasks_across_projects() {
842        let task_a = make_task("compute");
843        let task_b = make_task("compute");
844        let pa = make_project("pa", BTreeSet::from([tag("db")]), vec![task_a]);
845        let pb = make_project("pb", BTreeSet::from([tag("db")]), vec![task_b]);
846        let ws = make_workspace(
847            vec![pa, pb],
848            workspace_settings_with_tag_cap(fixed_cap(10), "db", 1),
849        );
850        let g = make_graph(vec![tid("pa", "compute"), tid("pb", "compute")], vec![]);
851        let fixture = Fixture::new(ws, g);
852        let spawner = MockProcessSpawner::new();
853        push_n_default_specs(&spawner, 2);
854        let observer = Recorder::default();
855        let ctx = make_ctx(&fixture, &spawner, &observer);
856
857        run_graph(&ctx, 1).await.unwrap();
858
859        let events = observer.events();
860        let started_pb = events
861            .iter()
862            .position(|e| matches!(e, Event::Started(t) if *t == tid("pb", "compute")))
863            .expect("pb:compute started");
864        let finished_pa = events
865            .iter()
866            .position(|e| matches!(e, Event::Finished(t, _, _) if *t == tid("pa", "compute")))
867            .expect("pa:compute finished");
868        assert!(
869            started_pb > finished_pa,
870            "pb:compute ({started_pb}) must follow pa:compute finish ({finished_pa}): {events:?}",
871        );
872    }
873
874    #[tokio::test]
875    async fn exec_003_canonical_order_under_partial_slot_availability() {
876        // Three independent tasks under cap=1; canonical
877        // `(ProjectName, TaskName)` order is (p,a) < (p,b) < (p,c).
878        // The node insertion order below is intentionally reverse
879        // declaration order to prove the scheduler ignores it.
880        let p = make_project(
881            "p",
882            BTreeSet::new(),
883            vec![make_task("c"), make_task("a"), make_task("b")],
884        );
885        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
886        let g = make_graph(vec![tid("p", "c"), tid("p", "b"), tid("p", "a")], vec![]);
887        let fixture = Fixture::new(ws, g);
888        let spawner = MockProcessSpawner::new();
889        push_n_default_specs(&spawner, 3);
890        let observer = Recorder::default();
891        let ctx = make_ctx(&fixture, &spawner, &observer);
892
893        run_graph(&ctx, 1).await.unwrap();
894
895        assert_eq!(
896            observer.started_order(),
897            vec![tid("p", "a"), tid("p", "b"), tid("p", "c")],
898        );
899    }
900
901    #[tokio::test]
902    async fn exec_010_task_failure_does_not_halt_unrelated_subgraph() {
903        // Two independent subgraphs: a -> a_child and b -> b_child.
904        // `a` fails; `a_child` must be skipped; `b` and `b_child`
905        // must both complete.
906        let p = make_project(
907            "p",
908            BTreeSet::new(),
909            vec![
910                make_task("a"),
911                make_task("a_child"),
912                make_task("b"),
913                make_task("b_child"),
914            ],
915        );
916        let ws = make_workspace(vec![p], WorkspaceSettings::default());
917        let g = make_graph(
918            vec![
919                tid("p", "a"),
920                tid("p", "a_child"),
921                tid("p", "b"),
922                tid("p", "b_child"),
923            ],
924            vec![
925                h_edge(tid("p", "a"), tid("p", "a_child")),
926                h_edge(tid("p", "b"), tid("p", "b_child")),
927            ],
928        );
929        let fixture = Fixture::new(ws, g);
930        let spawner = MockProcessSpawner::new();
931        // Canonical admission order: a (fail), b (succeed),
932        // then b_child (succeed). a_child never spawns.
933        push_spec_with_exit(&spawner, 1);
934        push_n_default_specs(&spawner, 2);
935        let observer = Recorder::default();
936        let ctx = make_ctx(&fixture, &spawner, &observer);
937
938        let result = run_graph(&ctx, 1).await.unwrap();
939
940        assert_eq!(
941            completed_for(&result.outcomes, &tid("p", "a")).state,
942            RunState::Failed,
943            "a should be Failed",
944        );
945        // a_child is cascade-skipped, surfaced as
946        // RunOutcome::Skipped with `a` as the root cause
947        // (EXEC-011 + S3).
948        assert_eq!(
949            skipped_for(&result.outcomes, &tid("p", "a_child")).cause,
950            SkipCause::UpstreamFailed {
951                upstream: tid("p", "a"),
952            },
953            "a_child cascade-skipped with root cause `a`",
954        );
955        assert_eq!(
956            completed_for(&result.outcomes, &tid("p", "b")).state,
957            RunState::Succeeded,
958            "sibling b should succeed",
959        );
960        assert_eq!(
961            completed_for(&result.outcomes, &tid("p", "b_child")).state,
962            RunState::Succeeded,
963            "sibling b_child should succeed",
964        );
965    }
966
967    #[tokio::test]
968    async fn exec_011_task_failure_cascades_to_hard_descendants() {
969        // Chain root -> mid -> leaf. `root` fails; `mid` and
970        // `leaf` are both cascade-skipped and surface as
971        // `RunOutcome::Skipped` with `root` (NOT `mid`) as
972        // the cause on `leaf` per S3's root-cause attribution.
973        let p = make_project(
974            "p",
975            BTreeSet::new(),
976            vec![make_task("root"), make_task("mid"), make_task("leaf")],
977        );
978        let ws = make_workspace(vec![p], WorkspaceSettings::default());
979        let g = make_graph(
980            vec![tid("p", "root"), tid("p", "mid"), tid("p", "leaf")],
981            vec![
982                h_edge(tid("p", "root"), tid("p", "mid")),
983                h_edge(tid("p", "mid"), tid("p", "leaf")),
984            ],
985        );
986        let fixture = Fixture::new(ws, g);
987        let spawner = MockProcessSpawner::new();
988        // Only one spawn happens (root's). Push one failing spec.
989        push_spec_with_exit(&spawner, 2);
990        let observer = Recorder::default();
991        let ctx = make_ctx(&fixture, &spawner, &observer);
992
993        let result = run_graph(&ctx, 1).await.unwrap();
994
995        assert_eq!(result.outcomes.len(), 3);
996        assert_eq!(
997            completed_for(&result.outcomes, &tid("p", "root")).state,
998            RunState::Failed,
999        );
1000        let cause = SkipCause::UpstreamFailed {
1001            upstream: tid("p", "root"),
1002        };
1003        assert_eq!(
1004            skipped_for(&result.outcomes, &tid("p", "mid")).cause,
1005            cause,
1006            "mid records root cause = root",
1007        );
1008        assert_eq!(
1009            skipped_for(&result.outcomes, &tid("p", "leaf")).cause,
1010            cause,
1011            "leaf records root cause = root (NOT mid)",
1012        );
1013        assert_eq!(spawner.spawns().len(), 1);
1014    }
1015
1016    #[tokio::test]
1017    async fn exec_010_soft_edge_predecessor_failure_does_not_cascade() {
1018        // a -(soft)-> b. Soft edges do not establish a hard
1019        // predecessor; b has hard in-degree 0 and is ready from the
1020        // start. When a fails, the cascade walks only hard edges,
1021        // so b is not skipped.
1022        let p = make_project("p", BTreeSet::new(), vec![make_task("a"), make_task("b")]);
1023        let ws = make_workspace(vec![p], WorkspaceSettings::default());
1024        let g = make_graph(
1025            vec![tid("p", "a"), tid("p", "b")],
1026            vec![s_edge(tid("p", "a"), tid("p", "b"))],
1027        );
1028        let fixture = Fixture::new(ws, g);
1029        let spawner = MockProcessSpawner::new();
1030        // Canonical admission order is a, b. a fails; b succeeds.
1031        push_spec_with_exit(&spawner, 1);
1032        push_n_default_specs(&spawner, 1);
1033        let observer = Recorder::default();
1034        let ctx = make_ctx(&fixture, &spawner, &observer);
1035
1036        let result = run_graph(&ctx, 1).await.unwrap();
1037        assert_eq!(
1038            completed_for(&result.outcomes, &tid("p", "a")).state,
1039            RunState::Failed,
1040        );
1041        assert_eq!(
1042            completed_for(&result.outcomes, &tid("p", "b")).state,
1043            RunState::Succeeded,
1044            "soft-edge successor must not be cascade-skipped",
1045        );
1046    }
1047
1048    #[tokio::test]
1049    async fn exec_011_observer_emits_no_started_or_finished_for_skipped_tasks() {
1050        // Chain root -> mid -> leaf with root failing. The
1051        // observer event stream must contain exactly one
1052        // Started (root), one Finished (root, Failed,
1053        // FreshRun), and one Skipped per cascade-descendant
1054        // (mid, leaf). Skipped tasks NEVER fire Started or
1055        // Finished (EXEC-011 + S4).
1056        let p = make_project(
1057            "p",
1058            BTreeSet::new(),
1059            vec![make_task("root"), make_task("mid"), make_task("leaf")],
1060        );
1061        let ws = make_workspace(vec![p], WorkspaceSettings::default());
1062        let g = make_graph(
1063            vec![tid("p", "root"), tid("p", "mid"), tid("p", "leaf")],
1064            vec![
1065                h_edge(tid("p", "root"), tid("p", "mid")),
1066                h_edge(tid("p", "mid"), tid("p", "leaf")),
1067            ],
1068        );
1069        let fixture = Fixture::new(ws, g);
1070        let spawner = MockProcessSpawner::new();
1071        push_spec_with_exit(&spawner, 3);
1072        let observer = Recorder::default();
1073        let ctx = make_ctx(&fixture, &spawner, &observer);
1074
1075        let _ = run_graph(&ctx, 1).await.unwrap();
1076
1077        let events = observer.events();
1078        let cause = SkipCause::UpstreamFailed {
1079            upstream: tid("p", "root"),
1080        };
1081        // Skipped events fire in canonical (ProjectName,
1082        // TaskName) order because the cascade closure is a
1083        // BTreeSet; with task names "mid" and "leaf", lex
1084        // ordering puts "leaf" first regardless of the
1085        // graph's chain shape.
1086        assert_eq!(
1087            events,
1088            vec![
1089                Event::Started(tid("p", "root")),
1090                Event::Finished(tid("p", "root"), RunState::Failed, RunSource::FreshRun),
1091                Event::Skipped(tid("p", "leaf"), cause.clone()),
1092                Event::Skipped(tid("p", "mid"), cause),
1093            ],
1094            "expected exactly one Started + Finished for root \
1095             and one Skipped per descendant in canonical order",
1096        );
1097    }
1098
1099    #[tokio::test]
1100    async fn exec_011_diamond_cascade_records_each_descendant_once() {
1101        // Diamond: top -> left, top -> right, left -> bot,
1102        // right -> bot. `top` fails and the cascade reaches
1103        // `bot` via two paths. The outcomes map MUST contain
1104        // exactly one Skipped entry for `bot` (not two), and
1105        // the observer MUST fire on_task_skipped exactly once
1106        // for `bot`. This exercises the diamond-uniqueness
1107        // invariant of complete_failed's returned set
1108        // end-to-end (the unit test
1109        // `complete_failed_diamond_attributes_each_descendant_once`
1110        // covers it at the state level).
1111        let p = make_project(
1112            "p",
1113            BTreeSet::new(),
1114            vec![
1115                make_task("bot"),
1116                make_task("left"),
1117                make_task("right"),
1118                make_task("top"),
1119            ],
1120        );
1121        let ws = make_workspace(vec![p], WorkspaceSettings::default());
1122        let g = make_graph(
1123            vec![
1124                tid("p", "bot"),
1125                tid("p", "left"),
1126                tid("p", "right"),
1127                tid("p", "top"),
1128            ],
1129            vec![
1130                h_edge(tid("p", "top"), tid("p", "left")),
1131                h_edge(tid("p", "top"), tid("p", "right")),
1132                h_edge(tid("p", "left"), tid("p", "bot")),
1133                h_edge(tid("p", "right"), tid("p", "bot")),
1134            ],
1135        );
1136        let fixture = Fixture::new(ws, g);
1137        let spawner = MockProcessSpawner::new();
1138        push_spec_with_exit(&spawner, 4);
1139        let observer = Recorder::default();
1140        let ctx = make_ctx(&fixture, &spawner, &observer);
1141
1142        let result = run_graph(&ctx, 1).await.unwrap();
1143
1144        // 4 outcomes total: top Completed(Failed), left/right/bot Skipped.
1145        assert_eq!(result.outcomes.len(), 4);
1146        assert_eq!(
1147            completed_for(&result.outcomes, &tid("p", "top")).state,
1148            RunState::Failed,
1149        );
1150        let cause = SkipCause::UpstreamFailed {
1151            upstream: tid("p", "top"),
1152        };
1153        for descendant in [tid("p", "left"), tid("p", "right"), tid("p", "bot")] {
1154            assert_eq!(
1155                skipped_for(&result.outcomes, &descendant).cause,
1156                cause,
1157                "{descendant:?} should be Skipped with root cause top",
1158            );
1159        }
1160
1161        // The observer fires on_task_skipped exactly once per
1162        // descendant. `bot` is reachable along two cascade
1163        // paths but must be reported once.
1164        let skipped_count_bot = observer
1165            .events()
1166            .iter()
1167            .filter(|e| matches!(e, Event::Skipped(t, _) if *t == tid("p", "bot")))
1168            .count();
1169        assert_eq!(
1170            skipped_count_bot, 1,
1171            "bot must fire on_task_skipped exactly once across both cascade paths",
1172        );
1173    }
1174
1175    // ====================================================================
1176    // EXEC-012..015: cancellation
1177    // ====================================================================
1178
1179    /// Build a [`MockSpec`] that ignores SIGTERM and exits
1180    /// only on SIGKILL with `kill_exit_code`.
1181    fn exit_on_kill_only_spec(kill_exit_code: i32) -> MockSpec {
1182        MockSpec {
1183            behaviour: MockBehaviour::OnKillOnly,
1184            exit_code: kill_exit_code,
1185            ..MockSpec::default()
1186        }
1187    }
1188
1189    #[tokio::test]
1190    async fn exec_013_cancel_before_admission_marks_all_ready_as_run_cancelled() {
1191        // Cap=1, three independent tasks. The token fires
1192        // BEFORE run_graph starts. EXEC-013 step 1: every
1193        // still-ready task that has not been started enters
1194        // the cancelled state; no spawn ever happens.
1195        let p = make_project(
1196            "p",
1197            BTreeSet::new(),
1198            vec![make_task("a"), make_task("b"), make_task("c")],
1199        );
1200        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1201        let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
1202        let fixture = Fixture::new(ws, g);
1203        let spawner = MockProcessSpawner::new();
1204        let observer = Recorder::default();
1205        let ctx = make_ctx(&fixture, &spawner, &observer);
1206
1207        fixture.cancel.cancel();
1208        let result = run_graph(&ctx, 1).await.unwrap();
1209
1210        assert_eq!(result.outcomes.len(), 3);
1211        for name in ["a", "b", "c"] {
1212            match cancelled_for(&result.outcomes, &tid("p", name)) {
1213                CancelledRecord::RunCancelled { task } => {
1214                    assert_eq!(task, &tid("p", name));
1215                }
1216                other => panic!("expected RunCancelled for {name}, got {other:?}"),
1217            }
1218        }
1219        assert!(
1220            spawner.spawns().is_empty(),
1221            "no task should have been spawned: {:?}",
1222            spawner.spawns(),
1223        );
1224        // Observer fires on_task_cancelled (Event::Cancelled)
1225        // for each task; on_task_started fires for none.
1226        let events = observer.events();
1227        assert!(
1228            events
1229                .iter()
1230                .all(|e| matches!(e, Event::Cancelled(_, CancelledRecord::RunCancelled { .. }))),
1231            "expected only Cancelled events, got {events:?}",
1232        );
1233        assert_eq!(events.len(), 3);
1234    }
1235
1236    #[tokio::test]
1237    async fn exec_013_cancel_mid_flight_signals_in_flight_task() {
1238        // One task whose mock child responds to SIGTERM.
1239        // The trigger fires cancel while the spawn-step is
1240        // blocked on the mock's wait. Per-future grace dance
1241        // sends SIGTERM; mock exits; outcome is
1242        // SignaledInFlight; only Signal::Terminate was
1243        // delivered.
1244        let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1245        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1246        let g = make_graph(vec![tid("p", "solo")], vec![]);
1247        let fixture = Fixture::new(ws, g);
1248        let spawner = MockProcessSpawner::new();
1249        spawner.push_spec(exit_on_terminate_spec(0));
1250        let observer = Recorder::default();
1251        let ctx = make_ctx(&fixture, &spawner, &observer);
1252
1253        let trigger_cancel = fixture.cancel.clone();
1254        let trigger = async move {
1255            // Yield long enough for the scheduler to admit
1256            // the task and reach the spawn-step wait. A
1257            // few milliseconds is more than enough for the
1258            // MemFilesystem-backed cache lookup to land.
1259            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1260            trigger_cancel.cancel();
1261        };
1262
1263        let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1264        let result = result.unwrap();
1265
1266        match cancelled_for(&result.outcomes, &tid("p", "solo")) {
1267            CancelledRecord::SignaledInFlight { task, .. } => {
1268                assert_eq!(task, &tid("p", "solo"));
1269            }
1270            other => panic!("expected SignaledInFlight, got {other:?}"),
1271        }
1272        assert_eq!(spawner.spawns().len(), 1);
1273        // Mock recorded exactly Signal::Terminate: the
1274        // polite-exit-on-SIGTERM branch wins before grace
1275        // expires, so SIGKILL never fires.
1276        assert_eq!(
1277            spawner.signals_for(0).unwrap(),
1278            vec![Signal::Terminate],
1279            "expected exactly one Terminate, got {:?}",
1280            spawner.signals_for(0),
1281        );
1282    }
1283
1284    #[tokio::test]
1285    async fn exec_014_cancel_mid_flight_escalates_to_kill_after_grace() {
1286        // Stubborn child: ignores SIGTERM, only SIGKILL
1287        // exits the wait. A short grace (50 ms) so the
1288        // test does not sit on the default 5 s grace.
1289        // The per-future grace dance sends SIGTERM, sleeps
1290        // grace, then sends SIGKILL; the mock then unblocks.
1291        let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1292        let ws = make_workspace(vec![p], workspace_settings_with_grace(fixed_cap(1), 0.05));
1293        let g = make_graph(vec![tid("p", "solo")], vec![]);
1294        let fixture = Fixture::new(ws, g);
1295        let spawner = MockProcessSpawner::new();
1296        spawner.push_spec(exit_on_kill_only_spec(137));
1297        let observer = Recorder::default();
1298        let ctx = make_ctx(&fixture, &spawner, &observer);
1299
1300        let trigger_cancel = fixture.cancel.clone();
1301        let trigger = async move {
1302            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1303            trigger_cancel.cancel();
1304        };
1305
1306        let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1307        let result = result.unwrap();
1308
1309        match cancelled_for(&result.outcomes, &tid("p", "solo")) {
1310            CancelledRecord::SignaledInFlight { .. } => {}
1311            other => panic!("expected SignaledInFlight, got {other:?}"),
1312        }
1313        // Mock recorded SIGTERM then SIGKILL: the polite
1314        // signal failed to exit the wait, the grace timer
1315        // fired, the executor escalated.
1316        assert_eq!(
1317            spawner.signals_for(0).unwrap(),
1318            vec![Signal::Terminate, Signal::Kill],
1319        );
1320    }
1321
1322    #[tokio::test]
1323    async fn exec_014_cancel_grace_zero_sends_kill_immediately() {
1324        // With cancel_grace = 0, the per-future grace dance
1325        // collapses: SIGTERM and SIGKILL fire one after the
1326        // other with no virtual time between them. The
1327        // stubborn OnKillOnly child exits on SIGKILL.
1328        let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1329        let ws = make_workspace(vec![p], workspace_settings_with_grace(fixed_cap(1), 0.0));
1330        let g = make_graph(vec![tid("p", "solo")], vec![]);
1331        let fixture = Fixture::new(ws, g);
1332        let spawner = MockProcessSpawner::new();
1333        spawner.push_spec(exit_on_kill_only_spec(137));
1334        let observer = Recorder::default();
1335        let ctx = make_ctx(&fixture, &spawner, &observer);
1336
1337        let trigger_cancel = fixture.cancel.clone();
1338        let trigger = async move {
1339            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1340            trigger_cancel.cancel();
1341        };
1342
1343        let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1344        let result = result.unwrap();
1345
1346        match cancelled_for(&result.outcomes, &tid("p", "solo")) {
1347            CancelledRecord::SignaledInFlight { .. } => {}
1348            other => panic!("expected SignaledInFlight, got {other:?}"),
1349        }
1350        assert_eq!(
1351            spawner.signals_for(0).unwrap(),
1352            vec![Signal::Terminate, Signal::Kill],
1353        );
1354    }
1355
1356    #[tokio::test]
1357    async fn exec_011_cancelled_task_cascades_descendants_as_upstream_cancelled() {
1358        // Chain root -> mid -> leaf. The cancel fires while
1359        // root is in-flight; root's per-future grace dance
1360        // signals the child, root ends as SignaledInFlight,
1361        // and the scheduler cascades hard descendants as
1362        // UpstreamCancelled per the cancellation arm of
1363        // EXEC-011.
1364        let p = make_project(
1365            "p",
1366            BTreeSet::new(),
1367            vec![make_task("root"), make_task("mid"), make_task("leaf")],
1368        );
1369        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1370        let g = make_graph(
1371            vec![tid("p", "root"), tid("p", "mid"), tid("p", "leaf")],
1372            vec![
1373                h_edge(tid("p", "root"), tid("p", "mid")),
1374                h_edge(tid("p", "mid"), tid("p", "leaf")),
1375            ],
1376        );
1377        let fixture = Fixture::new(ws, g);
1378        let spawner = MockProcessSpawner::new();
1379        spawner.push_spec(exit_on_terminate_spec(0));
1380        let observer = Recorder::default();
1381        let ctx = make_ctx(&fixture, &spawner, &observer);
1382
1383        let trigger_cancel = fixture.cancel.clone();
1384        let trigger = async move {
1385            tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1386            trigger_cancel.cancel();
1387        };
1388
1389        let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1390        let result = result.unwrap();
1391
1392        assert_eq!(result.outcomes.len(), 3);
1393        match cancelled_for(&result.outcomes, &tid("p", "root")) {
1394            CancelledRecord::SignaledInFlight { task, .. } => {
1395                assert_eq!(task, &tid("p", "root"));
1396            }
1397            other => panic!("expected SignaledInFlight for root, got {other:?}"),
1398        }
1399        for name in ["mid", "leaf"] {
1400            match cancelled_for(&result.outcomes, &tid("p", name)) {
1401                CancelledRecord::UpstreamCancelled { task, upstream } => {
1402                    assert_eq!(task, &tid("p", name));
1403                    assert_eq!(
1404                        upstream,
1405                        &tid("p", "root"),
1406                        "cascade attributes the root cancelled task to {name}",
1407                    );
1408                }
1409                other => {
1410                    panic!("expected UpstreamCancelled for {name}, got {other:?}")
1411                }
1412            }
1413        }
1414    }
1415
1416    #[tokio::test]
1417    async fn exec_015_cancelled_run_does_not_produce_cache_entry() {
1418        // Run 1 cancels mid-flight; EXEC-015 forbids the
1419        // cache store. Run 2 over the same fixture (fresh
1420        // never-cancelled token, fresh observer/spawner)
1421        // MUST be a cache miss (FreshRun), confirming the
1422        // cancelled run left nothing behind.
1423        let p = make_project("p", BTreeSet::new(), vec![make_task("solo")]);
1424        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
1425        let g = make_graph(vec![tid("p", "solo")], vec![]);
1426        let fixture = Fixture::new(ws, g);
1427
1428        // Run 1: cancel mid-flight.
1429        {
1430            let spawner1 = MockProcessSpawner::new();
1431            spawner1.push_spec(exit_on_terminate_spec(0));
1432            let observer1 = Recorder::default();
1433            let ctx1 = make_ctx(&fixture, &spawner1, &observer1);
1434            let trigger_cancel = fixture.cancel.clone();
1435            let trigger = async move {
1436                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
1437                trigger_cancel.cancel();
1438            };
1439            let (run1, ()) = tokio::join!(run_graph(&ctx1, 1), trigger);
1440            let run1 = run1.unwrap();
1441            match cancelled_for(&run1.outcomes, &tid("p", "solo")) {
1442                CancelledRecord::SignaledInFlight { .. } => {}
1443                other => panic!("run 1 expected SignaledInFlight, got {other:?}"),
1444            }
1445        }
1446
1447        // Run 2: fresh token (the fixture's was cancelled),
1448        // ExitImmediately mock, no cancel-trigger.
1449        let fresh_cancel = CancellationToken::new();
1450        let spawner2 = MockProcessSpawner::new();
1451        push_n_default_specs(&spawner2, 1);
1452        let observer2 = Recorder::default();
1453        let ctx2 = make_ctx_with_cancel(&fixture, &spawner2, &observer2, &fresh_cancel);
1454        let run2 = run_graph(&ctx2, 2).await.unwrap();
1455
1456        let rec2 = completed_for(&run2.outcomes, &tid("p", "solo"));
1457        assert_eq!(
1458            rec2.source,
1459            RunSource::FreshRun,
1460            "run 2 must be a fresh run; a cache hit would mean run 1 stored an entry",
1461        );
1462        assert_eq!(rec2.state, RunState::Succeeded);
1463        // Exactly one fresh spawn occurred in run 2 (the
1464        // mock recorded it); run 1 also had one, but on a
1465        // separate spawner.
1466        assert_eq!(spawner2.spawns().len(), 1);
1467    }
1468
1469    #[tokio::test]
1470    async fn exec_010_cancel_one_subgraph_does_not_halt_another() {
1471        // Two independent tasks (no edges between them).
1472        // Cap=2 so both are admitted. Task `fast` uses
1473        // ExitImmediately and completes before the trigger
1474        // fires; task `slow` uses OnTerminate and is
1475        // blocked when the trigger fires. The cancel
1476        // therefore catches only `slow` in flight; `fast`'s
1477        // Completed outcome must NOT be reclassified by the
1478        // cancellation flow.
1479        //
1480        // Distinct commands so cache keys do not collide
1481        // (CACHE-001 content addressing).
1482        let task_fast = make_task_with("fast", &["echo", "fast"], None);
1483        let task_slow = make_task_with("slow", &["echo", "slow"], None);
1484        let p = make_project("p", BTreeSet::new(), vec![task_fast, task_slow]);
1485        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(2)));
1486        let g = make_graph(vec![tid("p", "fast"), tid("p", "slow")], vec![]);
1487        let fixture = Fixture::new(ws, g);
1488        let spawner = MockProcessSpawner::new();
1489        // BTreeSet admission order is canonical lex: "fast"
1490        // then "slow". The mock spawner pops specs in FIFO
1491        // order, so push `fast`'s spec first.
1492        spawner.push_spec(MockSpec::default());
1493        spawner.push_spec(exit_on_terminate_spec(0));
1494        let observer = Recorder::default();
1495        let ctx = make_ctx(&fixture, &spawner, &observer);
1496
1497        let trigger_cancel = fixture.cancel.clone();
1498        let trigger = async move {
1499            // Give the scheduler time to admit both tasks,
1500            // complete fast's lookup+spawn, and have slow
1501            // sitting in its spawn-step wait.
1502            tokio::time::sleep(std::time::Duration::from_millis(30)).await;
1503            trigger_cancel.cancel();
1504        };
1505
1506        let (result, ()) = tokio::join!(run_graph(&ctx, 1), trigger);
1507        let result = result.unwrap();
1508
1509        assert_eq!(result.outcomes.len(), 2);
1510        let fast_rec = completed_for(&result.outcomes, &tid("p", "fast"));
1511        assert_eq!(fast_rec.state, RunState::Succeeded);
1512        match cancelled_for(&result.outcomes, &tid("p", "slow")) {
1513            CancelledRecord::SignaledInFlight { .. } => {}
1514            other => panic!("expected SignaledInFlight for slow, got {other:?}"),
1515        }
1516    }
1517}