use std::collections::HashMap;
use haz_domain::path::CanonicalPath;
use haz_domain::task_id::TaskId;
use crate::run_graph::RuntimeInvariantViolation;
pub(super) fn check_and_record_output_overlap(
output_claims: &mut HashMap<CanonicalPath, TaskId>,
invariant_violations: &mut Vec<RuntimeInvariantViolation>,
task: &TaskId,
materialised_outputs: &[CanonicalPath],
) -> bool {
let mut any_overlap = false;
for path in materialised_outputs {
match output_claims.get(path) {
Some(claimant) if claimant != task => {
invariant_violations.push(RuntimeInvariantViolation::OutputOverlap {
first_task: claimant.clone(),
second_task: task.clone(),
shared_path: path.clone(),
});
any_overlap = true;
}
Some(_) | None => {
output_claims
.entry(path.clone())
.or_insert_with(|| task.clone());
}
}
}
any_overlap
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::path::Path;
use haz_domain::path::OutputSpec;
use haz_domain::task::Task;
use haz_domain::task_id::TaskId;
use haz_vfs::WritableFilesystem;
use tokio_util::sync::CancellationToken;
use crate::mock_impl::MockProcessSpawner;
use crate::run_graph::RuntimeInvariantViolation;
use crate::run_graph::scheduler::run_graph;
use crate::run_graph::test_fixtures::*;
use crate::run_task::{RunSource, RunState};
#[tokio::test]
async fn exec_020_two_tasks_share_literal_output_records_overlap() {
let task_a = make_task_with_outputs("a", vec![OutputSpec::parse("out.bin").unwrap()]);
let task_b = make_task_with_outputs("b", vec![OutputSpec::parse("out.bin").unwrap()]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/out.bin"), b"contents")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 2);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
let rec_a = completed_for(&result.outcomes, &tid("p", "a"));
assert_eq!(rec_a.state, RunState::Succeeded);
let rec_b = completed_for(&result.outcomes, &tid("p", "b"));
assert_eq!(rec_b.state, RunState::Succeeded);
assert_eq!(result.invariant_violations.len(), 1);
match &result.invariant_violations[0] {
RuntimeInvariantViolation::OutputOverlap {
first_task,
second_task,
shared_path,
} => {
assert_eq!(first_task, &tid("p", "a"));
assert_eq!(second_task, &tid("p", "b"));
assert_eq!(shared_path, &workspace_absolute_canonical("/out.bin"));
}
other @ RuntimeInvariantViolation::RuntimeCycle { .. } => {
panic!("expected OutputOverlap, got {other:?}")
}
}
}
#[tokio::test]
async fn exec_020_three_tasks_share_literal_output_records_two_overlaps() {
let ts: Vec<Task> = ["a", "b", "c"]
.into_iter()
.map(|n| make_task_with_outputs(n, vec![OutputSpec::parse("out.bin").unwrap()]))
.collect();
let p = make_project("p", BTreeSet::new(), ts);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(3)));
let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/out.bin"), b"contents")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 3);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
assert_eq!(result.outcomes.len(), 3);
for n in ["a", "b", "c"] {
let rec = completed_for(&result.outcomes, &tid("p", n));
assert_eq!(rec.state, RunState::Succeeded);
}
assert_eq!(result.invariant_violations.len(), 2);
let winner = match &result.invariant_violations[0] {
RuntimeInvariantViolation::OutputOverlap { first_task, .. } => first_task.clone(),
other @ RuntimeInvariantViolation::RuntimeCycle { .. } => {
panic!("expected OutputOverlap, got {other:?}")
}
};
let mut second_tasks: BTreeSet<TaskId> = BTreeSet::new();
for v in &result.invariant_violations {
match v {
RuntimeInvariantViolation::OutputOverlap {
first_task,
second_task,
shared_path,
} => {
assert_eq!(first_task, &winner);
assert_ne!(second_task, &winner);
assert_eq!(shared_path, &workspace_absolute_canonical("/out.bin"));
second_tasks.insert(second_task.clone());
}
other @ RuntimeInvariantViolation::RuntimeCycle { .. } => {
panic!("expected OutputOverlap, got {other:?}")
}
}
}
let all_three: BTreeSet<TaskId> =
BTreeSet::from([tid("p", "a"), tid("p", "b"), tid("p", "c")]);
let losers: BTreeSet<TaskId> = all_three
.iter()
.filter(|t| *t != &winner)
.cloned()
.collect();
assert_eq!(second_tasks, losers);
}
#[tokio::test]
async fn exec_020_in_flight_unrelated_task_completes_naturally_after_detection() {
let task_a = make_task_with_outputs("a", vec![OutputSpec::parse("shared.bin").unwrap()]);
let task_b = make_task_with_outputs("b", vec![OutputSpec::parse("shared.bin").unwrap()]);
let task_c = make_task_with_outputs("c", vec![OutputSpec::parse("other.bin").unwrap()]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b, task_c]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(3)));
let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/shared.bin"), b"shared")
.unwrap();
fixture
.cache
.fs()
.write_file(Path::new("/ws/other.bin"), b"other")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 3);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
assert_eq!(result.outcomes.len(), 3);
let rec_a = completed_for(&result.outcomes, &tid("p", "a"));
assert_eq!(rec_a.state, RunState::Succeeded);
let rec_b = completed_for(&result.outcomes, &tid("p", "b"));
assert_eq!(rec_b.state, RunState::Succeeded);
let rec_c = completed_for(&result.outcomes, &tid("p", "c"));
assert_eq!(rec_c.state, RunState::Succeeded);
assert_eq!(result.invariant_violations.len(), 1);
match &result.invariant_violations[0] {
RuntimeInvariantViolation::OutputOverlap {
first_task,
second_task,
shared_path,
} => {
let pair = BTreeSet::from([first_task.clone(), second_task.clone()]);
assert_eq!(pair, BTreeSet::from([tid("p", "a"), tid("p", "b")]));
assert_eq!(shared_path, &workspace_absolute_canonical("/shared.bin"));
}
other @ RuntimeInvariantViolation::RuntimeCycle { .. } => {
panic!("expected OutputOverlap, got {other:?}")
}
}
}
#[tokio::test]
async fn exec_020_materialised_outputs_populated_on_fresh_run() {
let task =
make_task_with_outputs("build", vec![OutputSpec::parse("artifact.bin").unwrap()]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "build")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/artifact.bin"), b"bytes")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 1);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
let rec = completed_for(&result.outcomes, &tid("p", "build"));
assert_eq!(rec.source, RunSource::FreshRun);
assert_eq!(
rec.materialised_outputs,
vec![workspace_absolute_canonical("/artifact.bin")]
);
assert!(result.invariant_violations.is_empty());
}
#[tokio::test]
async fn exec_020_materialised_outputs_populated_on_cache_hit() {
let task =
make_task_with_outputs("build", vec![OutputSpec::parse("artifact.bin").unwrap()]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "build")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/artifact.bin"), b"bytes")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 1);
let observer1 = Recorder::default();
let ctx1 = make_ctx(&fixture, &spawner, &observer1);
let result1 = run_graph(&ctx1, 1_700_000_000).await.unwrap();
assert_eq!(
completed_for(&result1.outcomes, &tid("p", "build")).source,
RunSource::FreshRun
);
let observer2 = Recorder::default();
let token2 = CancellationToken::new();
let ctx2 = make_ctx_with_cancel(&fixture, &spawner, &observer2, &token2);
let result2 = run_graph(&ctx2, 1_700_000_000).await.unwrap();
let rec = completed_for(&result2.outcomes, &tid("p", "build"));
assert_eq!(rec.source, RunSource::CacheHit);
assert_eq!(
rec.materialised_outputs,
vec![workspace_absolute_canonical("/artifact.bin")]
);
assert!(result2.invariant_violations.is_empty());
assert_eq!(spawner.spawns().len(), 1);
}
#[tokio::test]
async fn exec_020_materialised_outputs_empty_on_failed_run() {
let task =
make_task_with_outputs("build", vec![OutputSpec::parse("artifact.bin").unwrap()]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "build")], vec![]);
let fixture = Fixture::new(ws, g);
let spawner = MockProcessSpawner::new();
push_spec_with_exit(&spawner, 1);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
let rec = completed_for(&result.outcomes, &tid("p", "build"));
assert_eq!(rec.state, RunState::Failed);
assert!(rec.materialised_outputs.is_empty());
assert!(result.invariant_violations.is_empty());
}
}