mod common;
use std::path::Path;
use common::{
Fixture, Recorder, TaskBuilder, completed_for, hard_edge, host_path, make_graph, make_project,
make_workspace_at, nested_project_root, push_n_default_specs, push_spec_with_exit, skipped_for,
tid,
};
use haz_domain::settings::WorkspaceSettings;
use haz_domain::task::Task;
use haz_exec::mock_impl::MockProcessSpawner;
use haz_exec::run_graph::run_graph;
use haz_exec::run_task::{RunSource, RunState, SkipCause};
use haz_vfs::WritableFilesystem;
use tokio_util::sync::CancellationToken;
const WORKSPACE_HOST_MEM: &str = "/ws";
const LIB_OUT_BYTES: &[u8] = b"lib-output-payload";
const APP_BIN_BYTES: &[u8] = b"app-binary-payload";
fn build_workspace_and_graph() -> (haz_domain::workspace::Workspace, haz_dag::graph::TaskGraph) {
let lib_compile: Task = TaskBuilder::new("compile").output("out.o").build();
let app_build: Task = TaskBuilder::new("build")
.input("/lib/out.o")
.output("bin")
.dep("lib:compile")
.build();
let lib_project = make_project("lib", nested_project_root("/lib"), vec![lib_compile]);
let app_project = make_project("app", nested_project_root("/app"), vec![app_build]);
let workspace_host = workspace_host_path_mem();
let workspace = make_workspace_at(
&workspace_host,
vec![lib_project, app_project],
WorkspaceSettings::default(),
);
let graph = make_graph(
vec![tid("lib", "compile"), tid("app", "build")],
vec![hard_edge(tid("lib", "compile"), tid("app", "build"))],
);
(workspace, graph)
}
fn workspace_host_path_mem() -> std::path::PathBuf {
std::path::PathBuf::from(WORKSPACE_HOST_MEM)
}
fn pre_write_outputs<F: WritableFilesystem>(fs: &F, workspace_host: &Path) {
fs.write_file(&host_path(workspace_host, "/lib/out.o"), LIB_OUT_BYTES)
.unwrap();
fs.write_file(&host_path(workspace_host, "/app/bin"), APP_BIN_BYTES)
.unwrap();
}
#[tokio::test]
async fn cross_project_hard_edge_fresh_run_stores_both() {
let (workspace, graph) = build_workspace_and_graph();
let workspace_host = workspace_host_path_mem();
let fixture = Fixture::new_mem(&workspace_host, workspace, graph);
pre_write_outputs(fixture.cache.fs(), &workspace_host);
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 2);
let observer = Recorder::default();
let ctx = common::make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
for t in [tid("lib", "compile"), tid("app", "build")] {
let rec = completed_for(&result.outcomes, &t);
assert_eq!(rec.state, RunState::Succeeded);
assert_eq!(rec.source, RunSource::FreshRun);
}
let started = observer.started_order();
assert_eq!(started, vec![tid("lib", "compile"), tid("app", "build")]);
assert!(result.invariant_violations.is_empty());
assert!(result.task_errors.is_empty());
assert_eq!(spawner.spawns().len(), 2);
}
#[tokio::test]
async fn cross_project_hard_edge_second_run_hits_cache() {
let (workspace, graph) = build_workspace_and_graph();
let workspace_host = workspace_host_path_mem();
let fixture = Fixture::new_mem(&workspace_host, workspace, graph);
pre_write_outputs(fixture.cache.fs(), &workspace_host);
let spawner1 = MockProcessSpawner::new();
push_n_default_specs(&spawner1, 2);
let observer1 = Recorder::default();
let ctx1 = common::make_ctx(&fixture, &spawner1, &observer1);
let result1 = run_graph(&ctx1, 1_700_000_000).await.unwrap();
for t in [tid("lib", "compile"), tid("app", "build")] {
assert_eq!(
completed_for(&result1.outcomes, &t).source,
RunSource::FreshRun,
);
}
let spawner2 = MockProcessSpawner::new();
let observer2 = Recorder::default();
let cancel2 = CancellationToken::new();
let ctx2 = haz_exec::run_task::RunContext {
fs: fixture.cache.fs(),
cache: &fixture.cache,
spawner: &spawner2,
observer: &observer2,
workspace: &fixture.workspace,
graph: &fixture.graph,
host_env: &fixture.host_env,
algo: haz_domain::settings::cache::HashAlgo::Blake3,
cancel: &cancel2,
};
let result2 = run_graph(&ctx2, 1_700_000_000).await.unwrap();
for t in [tid("lib", "compile"), tid("app", "build")] {
let rec = completed_for(&result2.outcomes, &t);
assert_eq!(rec.state, RunState::Succeeded);
assert_eq!(rec.source, RunSource::CacheHit);
}
assert_eq!(
spawner2.spawns().len(),
0,
"cache hits must NOT spawn processes (MUTEX-007, EXEC-007 step 2)"
);
assert!(result2.invariant_violations.is_empty());
}
#[tokio::test]
async fn cross_project_hard_edge_failure_cascades_to_downstream_project() {
let (workspace, graph) = build_workspace_and_graph();
let workspace_host = workspace_host_path_mem();
let fixture = Fixture::new_mem(&workspace_host, workspace, graph);
let spawner = MockProcessSpawner::new();
push_spec_with_exit(&spawner, 1);
let observer = Recorder::default();
let ctx = common::make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
let lib_rec = completed_for(&result.outcomes, &tid("lib", "compile"));
assert_eq!(lib_rec.state, RunState::Failed);
assert_eq!(lib_rec.source, RunSource::FreshRun);
let app_skip = skipped_for(&result.outcomes, &tid("app", "build"));
assert_eq!(app_skip.task, tid("app", "build"));
match &app_skip.cause {
SkipCause::UpstreamFailed { upstream } => {
assert_eq!(upstream, &tid("lib", "compile"));
}
other => panic!("expected UpstreamFailed for app:build, got {other:?}"),
}
assert_eq!(
spawner.spawns().len(),
1,
"app:build must not spawn after lib:compile fails"
);
assert!(result.invariant_violations.is_empty());
}
#[tokio::test]
async fn cross_project_hard_edge_cache_hit_on_real_filesystem() {
let tempdir = tempfile::TempDir::new().expect("tempdir");
let workspace_host = tempdir.path().to_path_buf();
let lib_compile: Task = TaskBuilder::new("compile").output("out.o").build();
let app_build: Task = TaskBuilder::new("build")
.input("/lib/out.o")
.output("bin")
.dep("lib:compile")
.build();
let lib_project = make_project("lib", nested_project_root("/lib"), vec![lib_compile]);
let app_project = make_project("app", nested_project_root("/app"), vec![app_build]);
let workspace = make_workspace_at(
&workspace_host,
vec![lib_project, app_project],
WorkspaceSettings::default(),
);
let graph = make_graph(
vec![tid("lib", "compile"), tid("app", "build")],
vec![hard_edge(tid("lib", "compile"), tid("app", "build"))],
);
let fixture = Fixture::new_std(&workspace_host, workspace, graph);
pre_write_outputs(fixture.cache.fs(), &workspace_host);
let spawner1 = MockProcessSpawner::new();
push_n_default_specs(&spawner1, 2);
let observer1 = Recorder::default();
let ctx1 = common::make_ctx(&fixture, &spawner1, &observer1);
let result1 = run_graph(&ctx1, 1_700_000_000).await.unwrap();
for t in [tid("lib", "compile"), tid("app", "build")] {
let rec = completed_for(&result1.outcomes, &t);
assert_eq!(rec.state, RunState::Succeeded);
assert_eq!(rec.source, RunSource::FreshRun);
}
let spawner2 = MockProcessSpawner::new();
let observer2 = Recorder::default();
let cancel2 = CancellationToken::new();
let ctx2 = haz_exec::run_task::RunContext {
fs: fixture.cache.fs(),
cache: &fixture.cache,
spawner: &spawner2,
observer: &observer2,
workspace: &fixture.workspace,
graph: &fixture.graph,
host_env: &fixture.host_env,
algo: haz_domain::settings::cache::HashAlgo::Blake3,
cancel: &cancel2,
};
let result2 = run_graph(&ctx2, 1_700_000_000).await.unwrap();
for t in [tid("lib", "compile"), tid("app", "build")] {
let rec = completed_for(&result2.outcomes, &t);
assert_eq!(rec.state, RunState::Succeeded);
assert_eq!(rec.source, RunSource::CacheHit);
}
assert_eq!(spawner2.spawns().len(), 0);
assert!(workspace_host.join(".haz").join("cache").exists());
}