haz-exec 0.1.0

Async task execution engine for haz.
Documentation
//! Chunk 10 scenario (b): workspace-global mutex contended
//! across two projects.
//!
//! Three tasks share a single workspace-global exclusive mutex
//! (`mutex: db`) and have no dependencies between them:
//!
//! - `db_a:migrate` (project `db_a`)
//! - `db_a:seed`    (project `db_a`)
//! - `db_b:migrate` (project `db_b`)
//!
//! The global cap is fixed at 4, strictly larger than the number
//! of tasks, so the cap itself imposes no ordering. The only
//! ordering constraint is the mutex per `MUTEX-001`, `MUTEX-003`,
//! `MUTEX-005`, `MUTEX-006`, and `EXEC-006` condition 3.
//!
//! The assertion proves the cross-project effect of the mutex:
//! `db_b:migrate` (in project `db_b`) cannot complete before
//! either `db_a:*` task (in project `db_a`) has finished, which
//! is only possible if the workspace-global scope spans projects.

mod common;

use common::{
    Fixture, Recorder, TaskBuilder, completed_for, make_graph, make_project, make_workspace_at,
    nested_project_root, push_n_default_specs, tid, workspace_settings_with_fixed_cap,
};
use haz_domain::mutex::{MutexMode, MutexScope};
use haz_domain::task::Task;
use haz_exec::mock_impl::MockProcessSpawner;
use haz_exec::run_graph::run_graph;
use haz_exec::run_task::{RunSource, RunState};

const WORKSPACE_HOST: &str = "/ws";
const MUTEX_NAME: &str = "db";

#[tokio::test]
async fn workspace_global_exclusive_mutex_serialises_three_tasks_across_two_projects() {
    // Distinct argvs so each task carries a distinct cache key
    // (`CACHE-001`'s content addressing would otherwise have
    // tasks 2 and 3 hit the entry stored by task 1, hiding the
    // mutex effect this scenario is meant to prove).
    let migrate_a: Task = TaskBuilder::new("migrate")
        .command(&["echo", "db_a:migrate"])
        .mutex(MutexScope::Workspace, MUTEX_NAME, MutexMode::Exclusive)
        .build();
    let seed_a: Task = TaskBuilder::new("seed")
        .command(&["echo", "db_a:seed"])
        .mutex(MutexScope::Workspace, MUTEX_NAME, MutexMode::Exclusive)
        .build();
    let migrate_b: Task = TaskBuilder::new("migrate")
        .command(&["echo", "db_b:migrate"])
        .mutex(MutexScope::Workspace, MUTEX_NAME, MutexMode::Exclusive)
        .build();

    let project_a = make_project(
        "db_a",
        nested_project_root("/db_a"),
        vec![migrate_a, seed_a],
    );
    let project_b = make_project("db_b", nested_project_root("/db_b"), vec![migrate_b]);

    let workspace_host = std::path::PathBuf::from(WORKSPACE_HOST);
    let workspace = make_workspace_at(
        &workspace_host,
        vec![project_a, project_b],
        workspace_settings_with_fixed_cap(4),
    );
    let graph = make_graph(
        vec![
            tid("db_a", "migrate"),
            tid("db_a", "seed"),
            tid("db_b", "migrate"),
        ],
        Vec::new(),
    );
    let fixture = Fixture::new_mem(&workspace_host, workspace, graph);

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 3);
    let observer = Recorder::default();
    let ctx = common::make_ctx(&fixture, &spawner, &observer);

    let result = run_graph(&ctx, 1_700_000_000).await.unwrap();

    let canonical = [
        tid("db_a", "migrate"),
        tid("db_a", "seed"),
        tid("db_b", "migrate"),
    ];

    // Every task completes successfully, fresh.
    for task in &canonical {
        let rec = completed_for(&result.outcomes, task);
        assert_eq!(rec.state, RunState::Succeeded);
        assert_eq!(rec.source, RunSource::FreshRun);
    }
    assert_eq!(spawner.spawns().len(), 3);
    assert!(result.invariant_violations.is_empty());
    assert!(result.task_errors.is_empty());

    // `MUTEX-005` invariant: a task that yields on contention does
    // NOT refire its `Started` event when re-admitted. Each of the
    // three tasks fires exactly one `Started`.
    for task in &canonical {
        assert_eq!(
            observer.count_started(task),
            1,
            "task {task:?} must observe exactly one Started event",
        );
    }

    // The mutex serialises completions in canonical `(project,
    // task)` order: each holder must release before the next
    // holder can acquire (`MUTEX-003`, `MUTEX-006`). The third
    // task lives in a different project than the first two, so
    // its position after both proves the workspace-global scope
    // spans projects (`MUTEX-001`).
    let events = observer.events();
    let finished_migrate_a = observer.pos_finished(&tid("db_a", "migrate"));
    let finished_seed_a = observer.pos_finished(&tid("db_a", "seed"));
    let finished_migrate_b = observer.pos_finished(&tid("db_b", "migrate"));
    assert!(
        finished_migrate_a < finished_seed_a,
        "db_a:migrate must finish before db_a:seed under mutex serialisation: \
         finished_migrate_a={finished_migrate_a}, finished_seed_a={finished_seed_a}, \
         events={events:?}",
    );
    assert!(
        finished_seed_a < finished_migrate_b,
        "db_a:seed must finish before db_b:migrate (cross-project serialisation): \
         finished_seed_a={finished_seed_a}, finished_migrate_b={finished_migrate_b}, \
         events={events:?}",
    );
}