haz-exec 0.1.0

Async task execution engine for haz.
Documentation
//! Runtime `EXEC-020` output-overlap detection.
//!
//! The scheduler calls [`check_and_record_output_overlap`] after
//! every successful task completion (cache hit or fresh run) to
//! detect the runtime case `DAG-016` defers: two tasks whose
//! `outputs` patterns intersect at runtime and that both
//! materialise the same workspace-absolute path. On detection the
//! function appends one typed
//! [`RuntimeInvariantViolation::OutputOverlap`] entry per
//! overlapping path to the run's diagnostic vector and returns
//! `true`; the caller uses the return value to set the run's
//! admission gate.

use std::collections::HashMap;

use haz_domain::path::CanonicalPath;
use haz_domain::task_id::TaskId;

use crate::run_graph::RuntimeInvariantViolation;

/// Record an [`RuntimeInvariantViolation::OutputOverlap`] for every
/// path in `materialised_outputs` that a *different* task already
/// holds in `output_claims` (`EXEC-020`).
///
/// Claim rules, applied path by path:
/// - an unclaimed path is claimed for `task`, so later completions
///   are checked against this completion's outputs;
/// - a path already claimed by `task` itself is left alone and is
///   not a violation;
/// - a path claimed by another task yields a violation with
///   `first_task` set to the existing claimant and `second_task` set
///   to `task`. The claim is NOT transferred, so a third overlapping
///   task is also reported against the original claimant.
///
/// Returns `true` iff this call recorded at least one violation; the
/// caller uses the result to set the run's admission gate.
pub(super) fn check_and_record_output_overlap(
    output_claims: &mut HashMap<CanonicalPath, TaskId>,
    invariant_violations: &mut Vec<RuntimeInvariantViolation>,
    task: &TaskId,
    materialised_outputs: &[CanonicalPath],
) -> bool {
    // Only this loop pushes onto the vector during the call, so any
    // growth in its length means an overlap was recorded.
    let violations_before = invariant_violations.len();

    for shared_path in materialised_outputs {
        if let Some(first_task) = output_claims.get(shared_path) {
            // Already claimed: only a foreign claimant is an overlap.
            if first_task != task {
                invariant_violations.push(RuntimeInvariantViolation::OutputOverlap {
                    first_task: first_task.clone(),
                    second_task: task.clone(),
                    shared_path: shared_path.clone(),
                });
            }
        } else {
            // First materialisation of this path: `task` becomes its claimant.
            output_claims.insert(shared_path.clone(), task.clone());
        }
    }

    invariant_violations.len() > violations_before
}

#[cfg(test)]
mod tests {
    use std::collections::BTreeSet;
    use std::path::Path;

    use haz_domain::path::{CanonicalPath, OutputSpec};
    use haz_domain::task::Task;
    use haz_domain::task_id::TaskId;
    use haz_vfs::WritableFilesystem;
    use tokio_util::sync::CancellationToken;

    use crate::mock_impl::MockProcessSpawner;
    use crate::run_graph::RuntimeInvariantViolation;
    use crate::run_graph::scheduler::run_graph;
    use crate::run_graph::test_fixtures::*;
    use crate::run_task::{RunSource, RunState};

    /// Destructure `violation` as an `OutputOverlap`, returning
    /// `(first_task, second_task, shared_path)`.
    ///
    /// Panics with the offending value if `violation` is any other
    /// variant, so each test fails with a readable message instead of
    /// repeating the same exhaustive `match`.
    fn expect_output_overlap(
        violation: &RuntimeInvariantViolation,
    ) -> (&TaskId, &TaskId, &CanonicalPath) {
        match violation {
            RuntimeInvariantViolation::OutputOverlap {
                first_task,
                second_task,
                shared_path,
            } => (first_task, second_task, shared_path),
            other @ RuntimeInvariantViolation::RuntimeCycle { .. } => {
                panic!("expected OutputOverlap, got {other:?}")
            }
        }
    }

    #[tokio::test]
    async fn exec_020_two_tasks_share_literal_output_records_overlap() {
        // Both `a` and `b` declare the same literal output
        // path under the workspace-rooted project. The cache
        // store path materialises the file from disk; both
        // tasks claim `/out.bin` and the second to complete
        // records an OutputOverlap against the first.
        let task_a = make_task_with_outputs("a", vec![OutputSpec::parse("out.bin").unwrap()]);
        let task_b = make_task_with_outputs("b", vec![OutputSpec::parse("out.bin").unwrap()]);
        let p = make_project("p", BTreeSet::new(), vec![task_a, task_b]);
        // cap=1 admits in canonical lex order: `a` first.
        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
        let g = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
        let fixture = Fixture::new(ws, g);
        // Pre-write the shared output file so both tasks'
        // resolve_output_files calls succeed.
        fixture
            .cache
            .fs()
            .write_file(Path::new("/ws/out.bin"), b"contents")
            .unwrap();
        let spawner = MockProcessSpawner::new();
        push_n_default_specs(&spawner, 2);
        let observer = Recorder::default();
        let ctx = make_ctx(&fixture, &spawner, &observer);

        let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

        // Both tasks completed successfully (overlap is a
        // run-level diagnostic; per-task outcomes are honest).
        let rec_a = completed_for(&result.outcomes, &tid("p", "a"));
        assert_eq!(rec_a.state, RunState::Succeeded);
        let rec_b = completed_for(&result.outcomes, &tid("p", "b"));
        assert_eq!(rec_b.state, RunState::Succeeded);

        // Exactly one OutputOverlap, first_task=a (admitted
        // first under cap=1), second_task=b.
        assert_eq!(result.invariant_violations.len(), 1);
        let (first_task, second_task, shared_path) =
            expect_output_overlap(&result.invariant_violations[0]);
        assert_eq!(first_task, &tid("p", "a"));
        assert_eq!(second_task, &tid("p", "b"));
        assert_eq!(shared_path, &workspace_absolute_canonical("/out.bin"));
    }

    #[tokio::test]
    async fn exec_020_three_tasks_share_literal_output_records_two_overlaps() {
        // Three tasks claim the same path. cap=3 admits all
        // three at once; all complete; each subsequent
        // claimant overlaps against the FIRST claimant (the
        // map's entry stays with the winner). Two
        // OutputOverlap violations are recorded against the
        // same first_task.
        let ts: Vec<Task> = ["a", "b", "c"]
            .into_iter()
            .map(|n| make_task_with_outputs(n, vec![OutputSpec::parse("out.bin").unwrap()]))
            .collect();
        let p = make_project("p", BTreeSet::new(), ts);
        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(3)));
        let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
        let fixture = Fixture::new(ws, g);
        fixture
            .cache
            .fs()
            .write_file(Path::new("/ws/out.bin"), b"contents")
            .unwrap();
        let spawner = MockProcessSpawner::new();
        push_n_default_specs(&spawner, 3);
        let observer = Recorder::default();
        let ctx = make_ctx(&fixture, &spawner, &observer);

        let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

        // All three completed (none cancelled; in-flight
        // tasks were not signalled per R3).
        assert_eq!(result.outcomes.len(), 3);
        for n in ["a", "b", "c"] {
            let rec = completed_for(&result.outcomes, &tid("p", n));
            assert_eq!(rec.state, RunState::Succeeded);
        }

        // Two violations: the SAME first_task across both
        // (the chronological winner), two distinct
        // second_tasks (the other two participants).
        assert_eq!(result.invariant_violations.len(), 2);
        let winner = expect_output_overlap(&result.invariant_violations[0])
            .0
            .clone();
        let mut second_tasks: BTreeSet<TaskId> = BTreeSet::new();
        for v in &result.invariant_violations {
            let (first_task, second_task, shared_path) = expect_output_overlap(v);
            assert_eq!(first_task, &winner);
            assert_ne!(second_task, &winner);
            assert_eq!(shared_path, &workspace_absolute_canonical("/out.bin"));
            second_tasks.insert(second_task.clone());
        }
        let all_three: BTreeSet<TaskId> =
            BTreeSet::from([tid("p", "a"), tid("p", "b"), tid("p", "c")]);
        let losers: BTreeSet<TaskId> = all_three
            .iter()
            .filter(|t| *t != &winner)
            .cloned()
            .collect();
        assert_eq!(second_tasks, losers);
    }

    #[tokio::test]
    async fn exec_020_in_flight_unrelated_task_completes_naturally_after_detection() {
        // Three tasks admitted concurrently under cap=3: `a`
        // and `b` overlap; `c` is unrelated. R3 says
        // in-flight tasks complete naturally on overlap. `c`
        // MUST land as Completed(Succeeded), not Cancelled or
        // Skipped.
        // NOTE(review): cap=3 admits all three together, so
        // whether `c` is still in flight when the overlap is
        // detected depends on completion order; this test pins
        // the final outcomes, not that interleaving.
        let task_a = make_task_with_outputs("a", vec![OutputSpec::parse("shared.bin").unwrap()]);
        let task_b = make_task_with_outputs("b", vec![OutputSpec::parse("shared.bin").unwrap()]);
        let task_c = make_task_with_outputs("c", vec![OutputSpec::parse("other.bin").unwrap()]);
        let p = make_project("p", BTreeSet::new(), vec![task_a, task_b, task_c]);
        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(3)));
        let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
        let fixture = Fixture::new(ws, g);
        fixture
            .cache
            .fs()
            .write_file(Path::new("/ws/shared.bin"), b"shared")
            .unwrap();
        fixture
            .cache
            .fs()
            .write_file(Path::new("/ws/other.bin"), b"other")
            .unwrap();
        let spawner = MockProcessSpawner::new();
        push_n_default_specs(&spawner, 3);
        let observer = Recorder::default();
        let ctx = make_ctx(&fixture, &spawner, &observer);

        let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

        // All three completed (overlap doesn't cancel
        // in-flight tasks; `c` was unrelated anyway).
        assert_eq!(result.outcomes.len(), 3);
        let rec_a = completed_for(&result.outcomes, &tid("p", "a"));
        assert_eq!(rec_a.state, RunState::Succeeded);
        let rec_b = completed_for(&result.outcomes, &tid("p", "b"));
        assert_eq!(rec_b.state, RunState::Succeeded);
        let rec_c = completed_for(&result.outcomes, &tid("p", "c"));
        assert_eq!(rec_c.state, RunState::Succeeded);

        // Exactly one OutputOverlap for the shared path. The
        // {first,second}_task ordering is FuturesUnordered-
        // dependent; assert the set {a, b} regardless of
        // which completed first.
        assert_eq!(result.invariant_violations.len(), 1);
        let (first_task, second_task, shared_path) =
            expect_output_overlap(&result.invariant_violations[0]);
        let pair = BTreeSet::from([first_task.clone(), second_task.clone()]);
        assert_eq!(pair, BTreeSet::from([tid("p", "a"), tid("p", "b")]));
        assert_eq!(shared_path, &workspace_absolute_canonical("/shared.bin"));
    }

    #[tokio::test]
    async fn exec_020_materialised_outputs_populated_on_fresh_run() {
        let task =
            make_task_with_outputs("build", vec![OutputSpec::parse("artifact.bin").unwrap()]);
        let p = make_project("p", BTreeSet::new(), vec![task]);
        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
        let g = make_graph(vec![tid("p", "build")], vec![]);
        let fixture = Fixture::new(ws, g);
        fixture
            .cache
            .fs()
            .write_file(Path::new("/ws/artifact.bin"), b"bytes")
            .unwrap();
        let spawner = MockProcessSpawner::new();
        push_n_default_specs(&spawner, 1);
        let observer = Recorder::default();
        let ctx = make_ctx(&fixture, &spawner, &observer);

        let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

        let rec = completed_for(&result.outcomes, &tid("p", "build"));
        assert_eq!(rec.source, RunSource::FreshRun);
        assert_eq!(
            rec.materialised_outputs,
            vec![workspace_absolute_canonical("/artifact.bin")]
        );
        assert!(result.invariant_violations.is_empty());
    }

    #[tokio::test]
    async fn exec_020_materialised_outputs_populated_on_cache_hit() {
        let task =
            make_task_with_outputs("build", vec![OutputSpec::parse("artifact.bin").unwrap()]);
        let p = make_project("p", BTreeSet::new(), vec![task]);
        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
        let g = make_graph(vec![tid("p", "build")], vec![]);
        let fixture = Fixture::new(ws, g);
        fixture
            .cache
            .fs()
            .write_file(Path::new("/ws/artifact.bin"), b"bytes")
            .unwrap();
        let spawner = MockProcessSpawner::new();
        push_n_default_specs(&spawner, 1);

        // First run: miss → store.
        let observer1 = Recorder::default();
        let ctx1 = make_ctx(&fixture, &spawner, &observer1);
        let result1 = run_graph(&ctx1, 1_700_000_000).await.unwrap();
        assert_eq!(
            completed_for(&result1.outcomes, &tid("p", "build")).source,
            RunSource::FreshRun
        );

        // Second run: hit. materialised_outputs come from
        // the manifest's outputs list.
        let observer2 = Recorder::default();
        let token2 = CancellationToken::new();
        let ctx2 = make_ctx_with_cancel(&fixture, &spawner, &observer2, &token2);
        let result2 = run_graph(&ctx2, 1_700_000_000).await.unwrap();
        let rec = completed_for(&result2.outcomes, &tid("p", "build"));
        assert_eq!(rec.source, RunSource::CacheHit);
        assert_eq!(
            rec.materialised_outputs,
            vec![workspace_absolute_canonical("/artifact.bin")]
        );
        assert!(result2.invariant_violations.is_empty());
        // Only the first (fresh) run spawned a process.
        assert_eq!(spawner.spawns().len(), 1);
    }

    #[tokio::test]
    async fn exec_020_materialised_outputs_empty_on_failed_run() {
        let task =
            make_task_with_outputs("build", vec![OutputSpec::parse("artifact.bin").unwrap()]);
        let p = make_project("p", BTreeSet::new(), vec![task]);
        let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
        let g = make_graph(vec![tid("p", "build")], vec![]);
        let fixture = Fixture::new(ws, g);
        // The declared output file is deliberately NOT pre-written:
        // a failed run skips the resolve_output_files branch
        // entirely, so the missing file is never looked up.
        let spawner = MockProcessSpawner::new();
        push_spec_with_exit(&spawner, 1);
        let observer = Recorder::default();
        let ctx = make_ctx(&fixture, &spawner, &observer);

        let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

        let rec = completed_for(&result.outcomes, &tid("p", "build"));
        assert_eq!(rec.state, RunState::Failed);
        assert!(rec.materialised_outputs.is_empty());
        assert!(result.invariant_violations.is_empty());
    }
}