haz-exec 0.1.0

Async task execution engine for haz.
Documentation
//! Chunk 10 scenario (a): cross-project hard edge with cache.
//!
//! Two projects `lib` and `app`. `app:build` hard-deps
//! `lib:compile` and consumes `lib:compile`'s output via a
//! workspace-absolute input path. Three scenarios:
//!
//! - Fresh run: both tasks run, both `RunSource::FreshRun`,
//!   both reach the cache as a store.
//! - Cache hit: second run against the same cache returns both
//!   tasks as `RunSource::CacheHit`.
//! - Failure cascade: `lib:compile` fails; `app:build` is
//!   surfaced as `Skipped(UpstreamFailed { upstream:
//!   lib:compile })` per `EXEC-011`.
//!
//! The cache-hit scenario is duplicated against a tempfile-rooted
//! `StdFilesystem` so the publish-side path (two-phase store +
//! rename) is exercised under real-FS semantics. The other two
//! scenarios stay on `MemFilesystem`.

mod common;

use std::path::Path;

use common::{
    Fixture, Recorder, TaskBuilder, completed_for, hard_edge, host_path, make_graph, make_project,
    make_workspace_at, nested_project_root, push_n_default_specs, push_spec_with_exit, skipped_for,
    tid,
};
use haz_domain::settings::WorkspaceSettings;
use haz_domain::task::Task;
use haz_exec::mock_impl::MockProcessSpawner;
use haz_exec::run_graph::run_graph;
use haz_exec::run_task::{RunSource, RunState, SkipCause};
use haz_vfs::WritableFilesystem;
use tokio_util::sync::CancellationToken;

const WORKSPACE_HOST_MEM: &str = "/ws";
const LIB_OUT_BYTES: &[u8] = b"lib-output-payload";
const APP_BIN_BYTES: &[u8] = b"app-binary-payload";

/// Build the workspace under test: project `lib` rooted at `/lib`
/// with a `compile` task; project `app` rooted at `/app` with a
/// `build` task that hard-deps `lib:compile`. The shape is
/// independent of which filesystem hosts it.
fn build_workspace_and_graph() -> (haz_domain::workspace::Workspace, haz_dag::graph::TaskGraph) {
    let lib_compile: Task = TaskBuilder::new("compile").output("out.o").build();
    let app_build: Task = TaskBuilder::new("build")
        .input("/lib/out.o")
        .output("bin")
        .dep("lib:compile")
        .build();

    let lib_project = make_project("lib", nested_project_root("/lib"), vec![lib_compile]);
    let app_project = make_project("app", nested_project_root("/app"), vec![app_build]);

    let workspace_host = workspace_host_path_mem();
    let workspace = make_workspace_at(
        &workspace_host,
        vec![lib_project, app_project],
        WorkspaceSettings::default(),
    );
    let graph = make_graph(
        vec![tid("lib", "compile"), tid("app", "build")],
        vec![hard_edge(tid("lib", "compile"), tid("app", "build"))],
    );
    (workspace, graph)
}

fn workspace_host_path_mem() -> std::path::PathBuf {
    std::path::PathBuf::from(WORKSPACE_HOST_MEM)
}

/// Pre-write `lib:compile` and `app:build`'s declared output files
/// so the executor's `resolve_output_files` pass succeeds for a
/// fresh run (the mock command `true` itself writes nothing).
fn pre_write_outputs<F: WritableFilesystem>(fs: &F, workspace_host: &Path) {
    fs.write_file(&host_path(workspace_host, "/lib/out.o"), LIB_OUT_BYTES)
        .unwrap();
    fs.write_file(&host_path(workspace_host, "/app/bin"), APP_BIN_BYTES)
        .unwrap();
}

// ============================================================
// MemFs: fresh run -> cache store
// ============================================================

#[tokio::test]
async fn cross_project_hard_edge_fresh_run_stores_both() {
    let (workspace, graph) = build_workspace_and_graph();
    let workspace_host = workspace_host_path_mem();
    let fixture = Fixture::new_mem(&workspace_host, workspace, graph);
    pre_write_outputs(fixture.cache.fs(), &workspace_host);

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = common::make_ctx(&fixture, &spawner, &observer);

    let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

    for t in [tid("lib", "compile"), tid("app", "build")] {
        let rec = completed_for(&result.outcomes, &t);
        assert_eq!(rec.state, RunState::Succeeded);
        assert_eq!(rec.source, RunSource::FreshRun);
    }

    // Topological order: lib:compile starts before app:build.
    let started = observer.started_order();
    assert_eq!(started, vec![tid("lib", "compile"), tid("app", "build")]);

    assert!(result.invariant_violations.is_empty());
    assert!(result.task_errors.is_empty());
    assert_eq!(spawner.spawns().len(), 2);
}

// ============================================================
// MemFs: second run -> both cache hit
// ============================================================

#[tokio::test]
async fn cross_project_hard_edge_second_run_hits_cache() {
    let (workspace, graph) = build_workspace_and_graph();
    let workspace_host = workspace_host_path_mem();
    let fixture = Fixture::new_mem(&workspace_host, workspace, graph);
    pre_write_outputs(fixture.cache.fs(), &workspace_host);

    // Run 1: fresh -> both stored.
    let spawner1 = MockProcessSpawner::new();
    push_n_default_specs(&spawner1, 2);
    let observer1 = Recorder::default();
    let ctx1 = common::make_ctx(&fixture, &spawner1, &observer1);
    let result1 = run_graph(&ctx1, 1_700_000_000).await.unwrap();
    for t in [tid("lib", "compile"), tid("app", "build")] {
        assert_eq!(
            completed_for(&result1.outcomes, &t).source,
            RunSource::FreshRun,
        );
    }

    // Run 2: same cache, fresh observer + spawner + cancel token.
    // Both tasks lookup -> hit; restore replays the recorded
    // outputs. No spawn happens.
    let spawner2 = MockProcessSpawner::new();
    let observer2 = Recorder::default();
    let cancel2 = CancellationToken::new();
    let ctx2 = haz_exec::run_task::RunContext {
        fs: fixture.cache.fs(),
        cache: &fixture.cache,
        spawner: &spawner2,
        observer: &observer2,
        workspace: &fixture.workspace,
        graph: &fixture.graph,
        host_env: &fixture.host_env,
        algo: haz_domain::settings::cache::HashAlgo::Blake3,
        cancel: &cancel2,
    };
    let result2 = run_graph(&ctx2, 1_700_000_000).await.unwrap();

    for t in [tid("lib", "compile"), tid("app", "build")] {
        let rec = completed_for(&result2.outcomes, &t);
        assert_eq!(rec.state, RunState::Succeeded);
        assert_eq!(rec.source, RunSource::CacheHit);
    }
    assert_eq!(
        spawner2.spawns().len(),
        0,
        "cache hits must NOT spawn processes (MUTEX-007, EXEC-007 step 2)"
    );
    assert!(result2.invariant_violations.is_empty());
}

// ============================================================
// MemFs: failure cascade across projects
// ============================================================

#[tokio::test]
async fn cross_project_hard_edge_failure_cascades_to_downstream_project() {
    let (workspace, graph) = build_workspace_and_graph();
    let workspace_host = workspace_host_path_mem();
    let fixture = Fixture::new_mem(&workspace_host, workspace, graph);
    // lib:compile's outputs are NOT pre-written: the failed run
    // never reaches `resolve_output_files`.

    let spawner = MockProcessSpawner::new();
    // lib:compile exits non-zero; app:build never spawns
    // (cascade-skipped).
    push_spec_with_exit(&spawner, 1);
    let observer = Recorder::default();
    let ctx = common::make_ctx(&fixture, &spawner, &observer);

    let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

    let lib_rec = completed_for(&result.outcomes, &tid("lib", "compile"));
    assert_eq!(lib_rec.state, RunState::Failed);
    assert_eq!(lib_rec.source, RunSource::FreshRun);

    // app:build is cascade-skipped per EXEC-011 with the upstream
    // identifier pointing at lib:compile (cross-project cascade).
    let app_skip = skipped_for(&result.outcomes, &tid("app", "build"));
    assert_eq!(app_skip.task, tid("app", "build"));
    match &app_skip.cause {
        SkipCause::UpstreamFailed { upstream } => {
            assert_eq!(upstream, &tid("lib", "compile"));
        }
        other => panic!("expected UpstreamFailed for app:build, got {other:?}"),
    }

    assert_eq!(
        spawner.spawns().len(),
        1,
        "app:build must not spawn after lib:compile fails"
    );
    assert!(result.invariant_violations.is_empty());
}

// ============================================================
// StdFs: same fresh-run-then-cache-hit shape, real FS publish
// ============================================================

#[tokio::test]
async fn cross_project_hard_edge_cache_hit_on_real_filesystem() {
    // Tempdir lifetime spans both runs and the cache (which holds
    // the StdFilesystem internally). Dropping the TempDir at end
    // of scope removes the workspace and its `.haz/cache`.
    let tempdir = tempfile::TempDir::new().expect("tempdir");
    let workspace_host = tempdir.path().to_path_buf();

    // Workspace shape is host-path-agnostic; build it against the
    // tempfile path.
    let lib_compile: Task = TaskBuilder::new("compile").output("out.o").build();
    let app_build: Task = TaskBuilder::new("build")
        .input("/lib/out.o")
        .output("bin")
        .dep("lib:compile")
        .build();
    let lib_project = make_project("lib", nested_project_root("/lib"), vec![lib_compile]);
    let app_project = make_project("app", nested_project_root("/app"), vec![app_build]);
    let workspace = make_workspace_at(
        &workspace_host,
        vec![lib_project, app_project],
        WorkspaceSettings::default(),
    );
    let graph = make_graph(
        vec![tid("lib", "compile"), tid("app", "build")],
        vec![hard_edge(tid("lib", "compile"), tid("app", "build"))],
    );
    let fixture = Fixture::new_std(&workspace_host, workspace, graph);
    pre_write_outputs(fixture.cache.fs(), &workspace_host);

    // Run 1: both fresh, both stored on the real FS.
    let spawner1 = MockProcessSpawner::new();
    push_n_default_specs(&spawner1, 2);
    let observer1 = Recorder::default();
    let ctx1 = common::make_ctx(&fixture, &spawner1, &observer1);
    let result1 = run_graph(&ctx1, 1_700_000_000).await.unwrap();
    for t in [tid("lib", "compile"), tid("app", "build")] {
        let rec = completed_for(&result1.outcomes, &t);
        assert_eq!(rec.state, RunState::Succeeded);
        assert_eq!(rec.source, RunSource::FreshRun);
    }

    // Run 2: same cache, fresh observer + spawner + cancel
    // token. Both tasks lookup -> hit; restore replays the
    // manifests' recorded outputs to the real FS.
    let spawner2 = MockProcessSpawner::new();
    let observer2 = Recorder::default();
    let cancel2 = CancellationToken::new();
    let ctx2 = haz_exec::run_task::RunContext {
        fs: fixture.cache.fs(),
        cache: &fixture.cache,
        spawner: &spawner2,
        observer: &observer2,
        workspace: &fixture.workspace,
        graph: &fixture.graph,
        host_env: &fixture.host_env,
        algo: haz_domain::settings::cache::HashAlgo::Blake3,
        cancel: &cancel2,
    };
    let result2 = run_graph(&ctx2, 1_700_000_000).await.unwrap();

    for t in [tid("lib", "compile"), tid("app", "build")] {
        let rec = completed_for(&result2.outcomes, &t);
        assert_eq!(rec.state, RunState::Succeeded);
        assert_eq!(rec.source, RunSource::CacheHit);
    }
    assert_eq!(spawner2.spawns().len(), 0);

    // The cache directory exists under the workspace root, per
    // the layout the cache builds in `Cache::new`.
    assert!(workspace_host.join(".haz").join("cache").exists());
}