use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
use haz_domain::path::pattern::PathAnchor;
use haz_domain::path::{CanonicalPath, PathPattern, ProjectRoot};
use haz_domain::task_id::TaskId;
use haz_domain::workspace::Workspace;
use crate::pattern_walk::{literal_workspace_segments, workspace_absolute_string_from_segments};
use crate::run_graph::RuntimeInvariantViolation;
use crate::run_graph::state::ReadyState;
use crate::run_task::{RunObserver, RunOutcome, SkipCause, SkipRecord};
/// Collects the id of every task in `workspace` that declares at least one
/// input pattern matching `path`.
///
/// `path` is rendered to a string once and each task's input patterns are
/// tested against that string. A task is reported at most once, no matter
/// how many of its inputs match.
pub(super) fn tasks_matching_path(workspace: &Workspace, path: &CanonicalPath) -> BTreeSet<TaskId> {
    let target = path.to_string();
    let mut consumers = BTreeSet::new();
    for (project_name, project) in &workspace.projects {
        for (task_name, task) in &project.tasks {
            // `any` stops at the first matching input, like the original early `break`.
            let consumes_path = task.inputs.iter().any(|input| {
                pattern_matches_workspace_absolute_path(input.pattern(), &project.root, &target)
            });
            if consumes_path {
                consumers.insert(TaskId {
                    project: project_name.clone(),
                    task: task_name.clone(),
                });
            }
        }
    }
    consumers
}
/// Reports whether `pattern`, as declared by a project rooted at
/// `project_root`, matches the path string `path_workspace_absolute`.
///
/// * Literal patterns are expanded to a workspace-absolute string and compared
///   for exact equality.
/// * Glob patterns are compiled and matched against a string chosen by the
///   glob's anchor:
///   - workspace-absolute globs see the path unchanged;
///   - project-relative globs in a workspace-root project see the path with
///     one leading `/` removed (if present);
///   - project-relative globs in a nested project see the part of the path
///     below `/<root segments>/`; a path outside that root, or equal to the
///     root itself, never matches.
fn pattern_matches_workspace_absolute_path(
    pattern: &PathPattern,
    project_root: &ProjectRoot,
    path_workspace_absolute: &str,
) -> bool {
    let glob = match pattern {
        PathPattern::Literal(haz_path) => {
            let segments = literal_workspace_segments(haz_path, project_root);
            let literal_absolute = workspace_absolute_string_from_segments(&segments);
            return literal_absolute == path_workspace_absolute;
        }
        PathPattern::Glob(glob) => glob,
    };

    let matcher = glob.compile().compile_matcher();

    // Work out which string (if any) the matcher should be applied to.
    let candidate: Option<&str> = match (glob.anchor(), project_root) {
        (PathAnchor::WorkspaceAbsolute, _) => Some(path_workspace_absolute),
        (PathAnchor::ProjectRelative, ProjectRoot::WorkspaceRoot) => Some(
            path_workspace_absolute
                .strip_prefix('/')
                .unwrap_or(path_workspace_absolute),
        ),
        (PathAnchor::ProjectRelative, ProjectRoot::Nested(proj_root)) => {
            // "/seg0/seg1/..." for the project's root segments.
            let root_prefix =
                proj_root
                    .segments()
                    .iter()
                    .fold(String::new(), |mut acc, seg| {
                        acc.push('/');
                        acc.push_str(seg.as_str());
                        acc
                    });
            path_workspace_absolute
                .strip_prefix(root_prefix.as_str())
                // Require a separator so that "/foobar" is not treated as being under "/foo".
                .and_then(|below_root| below_root.strip_prefix('/'))
                .filter(|rel| !rel.is_empty())
        }
    };

    candidate.is_some_and(|text| matcher.is_match(text))
}
/// Looks for a cycle closed by the edge `new_edge_from -> new_edge_to`.
///
/// Performs a breadth-first search over `augmented`, starting at
/// `new_edge_to`, for a directed path that reaches `new_edge_from`.
/// Self-edges (`from == to`) are never followed.
///
/// Returns the set of tasks on the first such path found: `new_edge_from`,
/// the node from which it was reached, and that node's chain of BFS
/// predecessors back to `new_edge_to`. Returns `None` when `new_edge_from`
/// is not reachable from `new_edge_to`.
///
/// All BFS bookkeeping borrows task ids from the inputs; ids are cloned only
/// when they are placed in the returned cycle set.
fn find_cycle_through_new_edge(
    augmented: &BTreeSet<(TaskId, TaskId)>,
    new_edge_from: &TaskId,
    new_edge_to: &TaskId,
) -> Option<BTreeSet<TaskId>> {
    // Maps each discovered node to the node it was first reached from.
    let mut predecessor: HashMap<&TaskId, &TaskId> = HashMap::new();
    let mut seen: BTreeSet<&TaskId> = BTreeSet::from([new_edge_to]);
    let mut frontier: VecDeque<&TaskId> = VecDeque::from([new_edge_to]);

    while let Some(current) = frontier.pop_front() {
        // Outgoing, non-self edges of `current`, in the set's sorted order.
        let successors = augmented
            .iter()
            .filter(|(from, to)| from == current && from != to)
            .map(|(_, to)| to);

        for next in successors {
            if seen.contains(next) {
                continue;
            }
            if next == new_edge_from {
                // Path found: walk the predecessor chain back to the start node.
                let mut cycle = BTreeSet::from([new_edge_from.clone(), current.clone()]);
                let mut cursor = current;
                while let Some(&prev) = predecessor.get(cursor) {
                    cycle.insert(prev.clone());
                    if prev == new_edge_to {
                        break;
                    }
                    cursor = prev;
                }
                return Some(cycle);
            }
            predecessor.insert(next, current);
            seen.insert(next);
            frontier.push_back(next);
        }
    }
    None
}
/// Records the runtime edges implied by `completing_task` having produced
/// `materialised_outputs`, and reports the first cycle any new edge closes.
///
/// For every output path, each task whose inputs match that path becomes a
/// consumer, and the edge `completing_task -> consumer` is inserted into
/// `augmented_edges`. Edges that were already present, and self-edges
/// (consumer equal to `completing_task`), are not checked for cycles.
///
/// On the first newly inserted edge that closes a cycle, a
/// `RuntimeInvariantViolation::RuntimeCycle` is appended to
/// `invariant_violations` and the cycle's node set is returned immediately;
/// remaining paths and consumers are not processed. Returns `None` when no
/// new edge closes a cycle.
pub(super) fn check_and_record_runtime_cycle_for_completion(
    augmented_edges: &mut BTreeSet<(TaskId, TaskId)>,
    invariant_violations: &mut Vec<RuntimeInvariantViolation>,
    workspace: &Workspace,
    completing_task: &TaskId,
    materialised_outputs: &[CanonicalPath],
) -> Option<BTreeSet<TaskId>> {
    for output_path in materialised_outputs {
        for consumer in tasks_matching_path(workspace, output_path) {
            let offending_edge = (completing_task.clone(), consumer);
            // The edge is recorded even when it is a self-edge; only the cycle search is skipped.
            let newly_added = augmented_edges.insert(offending_edge.clone());
            let consumer = &offending_edge.1;
            if !newly_added || consumer == completing_task {
                continue;
            }

            let Some(nodes) =
                find_cycle_through_new_edge(augmented_edges, completing_task, consumer)
            else {
                continue;
            };

            invariant_violations.push(RuntimeInvariantViolation::RuntimeCycle {
                nodes: nodes.clone(),
                offending_edge,
            });
            return Some(nodes);
        }
    }
    None
}
/// Marks every ready task that belongs to `cycle_nodes` as skipped.
///
/// Each such task is removed from `state.ready`, added to `state.skip`,
/// reported to `observer` via `on_task_skipped`, and stored in `outcomes` as
/// `RunOutcome::Skipped` with cause `SkipCause::RuntimeCycle`. Ready tasks
/// that are not in `cycle_nodes` are left untouched.
pub(super) fn skip_ready_cycle_members<O>(
    observer: &O,
    state: &mut ReadyState,
    outcomes: &mut BTreeMap<TaskId, RunOutcome>,
    cycle_nodes: &BTreeSet<TaskId>,
) where
    O: RunObserver,
{
    // Snapshot the affected ids first: `state.ready` is mutated in the loop below.
    let members_to_skip: Vec<TaskId> = state
        .ready
        .iter()
        .filter(|&candidate| cycle_nodes.contains(candidate))
        .cloned()
        .collect();

    for member in members_to_skip {
        let skip_record = SkipRecord {
            task: member.clone(),
            cause: SkipCause::RuntimeCycle,
        };
        state.ready.remove(&member);
        state.skip.insert(member.clone());
        observer.on_task_skipped(&member, &skip_record);
        outcomes.insert(member, RunOutcome::Skipped(skip_record));
    }
}
// Test module. The `exec_018_*` / `exec_019_*` tests drive the scheduler
// end-to-end through `run_graph` using the shared fixtures; the
// `tasks_matching_path_*` tests call `tasks_matching_path` directly.
//
// NOTE(review): `make_task_with_inputs_outputs` appears to take
// (name, inputs, outputs, deps) — confirm against `test_fixtures`.
// NOTE(review): the files written under `/ws/...` before each run presumably
// stand in for the task outputs, since the mock spawner is not shown creating
// any files — confirm.
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use std::path::Path;
use haz_domain::path::CanonicalPath;
use haz_domain::settings::WorkspaceSettings;
use haz_domain::task_id::TaskId;
use haz_vfs::WritableFilesystem;
use crate::mock_impl::{MockProcessSpawner, MockSpec};
use crate::run_graph::RuntimeInvariantViolation;
use crate::run_graph::cycle::tasks_matching_path;
use crate::run_graph::scheduler::run_graph;
use crate::run_graph::test_fixtures::*;
use crate::run_task::{CancelledRecord, RunOutcome, RunState};
// Two tasks that each list the other's output as an input (`a`: b_out -> a_out,
// `b`: a_out -> b_out), with no static edges in the graph. Expects both tasks
// to be recorded as Succeeded and exactly one `RuntimeCycle` violation whose
// node set is {a, b}. The direction of the offending edge is not pinned here:
// the test only requires its endpoints to be two distinct members of the cycle.
#[tokio::test]
async fn exec_019_two_node_runtime_cycle_yields_diagnostic() {
let task_a = make_task_with_inputs_outputs("a", vec!["b_out"], vec!["a_out"], vec![]);
let task_b = make_task_with_inputs_outputs("b", vec!["a_out"], vec!["b_out"], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(2)));
let g = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/a_out"), b"a")
.unwrap();
fixture
.cache
.fs()
.write_file(Path::new("/ws/b_out"), b"b")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 2);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
assert_eq!(
completed_for(&result.outcomes, &tid("p", "a")).state,
RunState::Succeeded
);
assert_eq!(
completed_for(&result.outcomes, &tid("p", "b")).state,
RunState::Succeeded
);
assert_eq!(result.invariant_violations.len(), 1);
match &result.invariant_violations[0] {
RuntimeInvariantViolation::RuntimeCycle {
nodes,
offending_edge,
} => {
let expected: BTreeSet<TaskId> = BTreeSet::from([tid("p", "a"), tid("p", "b")]);
assert_eq!(nodes, &expected);
assert!(expected.contains(&offending_edge.0));
assert!(expected.contains(&offending_edge.1));
assert_ne!(offending_edge.0, offending_edge.1);
}
other @ RuntimeInvariantViolation::OutputOverlap { .. } => {
panic!("expected RuntimeCycle, got {other:?}")
}
}
}
// Same input/output cross-wiring as the two-node case, but the graph also
// carries an edge built with `h_edge(a, b)` and `b` is created with "~:a" in
// its last argument (presumably the matching declared dependency — confirm).
// With that edge present the offending edge is expected to be exactly (b, a).
#[tokio::test]
async fn exec_019_cycle_through_static_hard_plus_runtime_producer() {
let task_a = make_task_with_inputs_outputs("a", vec!["b_out"], vec!["a_out"], vec![]);
let task_b = make_task_with_inputs_outputs("b", vec!["a_out"], vec!["b_out"], vec!["~:a"]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(
vec![tid("p", "a"), tid("p", "b")],
vec![h_edge(tid("p", "a"), tid("p", "b"))],
);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/a_out"), b"a")
.unwrap();
fixture
.cache
.fs()
.write_file(Path::new("/ws/b_out"), b"b")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 2);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
assert_eq!(
completed_for(&result.outcomes, &tid("p", "a")).state,
RunState::Succeeded
);
assert_eq!(
completed_for(&result.outcomes, &tid("p", "b")).state,
RunState::Succeeded
);
assert_eq!(result.invariant_violations.len(), 1);
match &result.invariant_violations[0] {
RuntimeInvariantViolation::RuntimeCycle {
nodes,
offending_edge,
} => {
assert_eq!(nodes, &BTreeSet::from([tid("p", "a"), tid("p", "b")]));
assert_eq!(offending_edge, &(tid("p", "b"), tid("p", "a")));
}
other @ RuntimeInvariantViolation::OutputOverlap { .. } => {
panic!("expected RuntimeCycle, got {other:?}")
}
}
}
// Three tasks wired in a ring through their inputs/outputs
// (`a`: c_out -> a_out, `b`: a_out -> b_out, `c`: b_out -> c_out), run with
// `fixed_cap(1)`. Expects all three to be Succeeded, a single `RuntimeCycle`
// whose node set contains all three tasks, and the offending edge (c, a).
#[tokio::test]
async fn exec_019_three_node_cycle_yields_full_node_set() {
let task_a = make_task_with_inputs_outputs("a", vec!["c_out"], vec!["a_out"], vec![]);
let task_b = make_task_with_inputs_outputs("b", vec!["a_out"], vec!["b_out"], vec![]);
let task_c = make_task_with_inputs_outputs("c", vec!["b_out"], vec!["c_out"], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b, task_c]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
let fixture = Fixture::new(ws, g);
for name in ["a_out", "b_out", "c_out"] {
fixture
.cache
.fs()
.write_file(&Path::new("/ws").join(name), b"x")
.unwrap();
}
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 3);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
for n in ["a", "b", "c"] {
assert_eq!(
completed_for(&result.outcomes, &tid("p", n)).state,
RunState::Succeeded
);
}
assert_eq!(result.invariant_violations.len(), 1);
match &result.invariant_violations[0] {
RuntimeInvariantViolation::RuntimeCycle {
nodes,
offending_edge,
} => {
assert_eq!(
nodes,
&BTreeSet::from([tid("p", "a"), tid("p", "b"), tid("p", "c")])
);
assert_eq!(offending_edge, &(tid("p", "c"), tid("p", "a")));
}
other @ RuntimeInvariantViolation::OutputOverlap { .. } => {
panic!("expected RuntimeCycle, got {other:?}")
}
}
}
// Two-node cycle again, this time checking the outcome variant directly:
// both members must be `RunOutcome::Completed` with state Succeeded (i.e. not
// rewritten to another outcome once the cycle is detected), and exactly one
// violation must be reported.
#[tokio::test]
async fn exec_019_already_completed_member_keeps_succeeded_outcome() {
let task_a = make_task_with_inputs_outputs("a", vec!["b_out"], vec!["a_out"], vec![]);
let task_b = make_task_with_inputs_outputs("b", vec!["a_out"], vec!["b_out"], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/a_out"), b"a")
.unwrap();
fixture
.cache
.fs()
.write_file(Path::new("/ws/b_out"), b"b")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 2);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
for n in ["a", "b"] {
let outcome = result.outcomes.get(&tid("p", n));
match outcome {
Some(RunOutcome::Completed(record)) => {
assert_eq!(record.state, RunState::Succeeded);
}
other => panic!("expected Completed(Succeeded) for {n}, got {other:?}"),
}
}
assert_eq!(result.invariant_violations.len(), 1);
}
// Cycle members `a` and `b` plus an unrelated task `c` (no inputs), run with
// `fixed_cap(3)` and a grace setting of 0.05. Expects `a` and `b` to be
// Succeeded, `c` to end as `Cancelled(SignaledInFlight)`, at least one signal
// recorded for spawn index 2, and exactly one violation.
#[tokio::test]
async fn exec_019_in_flight_non_cycle_task_gets_cancelled() {
let task_a = make_task_with_inputs_outputs("a", vec!["b_out"], vec!["a_out"], vec![]);
let task_b = make_task_with_inputs_outputs("b", vec!["a_out"], vec!["b_out"], vec![]);
let task_c = make_task_with_inputs_outputs("c", vec![], vec!["c_out"], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b, task_c]);
let ws = make_workspace(vec![p], workspace_settings_with_grace(fixed_cap(3), 0.05));
let g = make_graph(vec![tid("p", "a"), tid("p", "b"), tid("p", "c")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/a_out"), b"a")
.unwrap();
fixture
.cache
.fs()
.write_file(Path::new("/ws/b_out"), b"b")
.unwrap();
fixture
.cache
.fs()
.write_file(Path::new("/ws/c_out"), b"c")
.unwrap();
let spawner = MockProcessSpawner::new();
// Two default specs followed by an `exit_on_terminate_spec(0)`.
// NOTE(review): presumably specs are consumed in spawn order, so the third
// one (index 2, checked via `signals_for(2)` below) is the one `c` gets —
// confirm against `MockProcessSpawner`.
spawner.push_spec(MockSpec::default()); spawner.push_spec(MockSpec::default()); spawner.push_spec(exit_on_terminate_spec(0)); let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
for n in ["a", "b"] {
assert_eq!(
completed_for(&result.outcomes, &tid("p", n)).state,
RunState::Succeeded
);
}
match result.outcomes.get(&tid("p", "c")) {
Some(RunOutcome::Cancelled(CancelledRecord::SignaledInFlight { .. })) => {}
other => panic!("expected Cancelled(SignaledInFlight) for c, got {other:?}"),
}
assert!(!spawner.signals_for(2).unwrap().is_empty());
assert_eq!(result.invariant_violations.len(), 1);
}
// Two-node cycle; checks that `fixture.cancel` reports not-cancelled both
// before the run and after a run that produced one violation.
#[tokio::test]
async fn exec_019_internal_cancel_does_not_pollute_user_token() {
let task_a = make_task_with_inputs_outputs("a", vec!["b_out"], vec!["a_out"], vec![]);
let task_b = make_task_with_inputs_outputs("b", vec!["a_out"], vec!["b_out"], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task_a, task_b]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(2)));
let g = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/a_out"), b"a")
.unwrap();
fixture
.cache
.fs()
.write_file(Path::new("/ws/b_out"), b"b")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 2);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
assert!(!fixture.cancel.is_cancelled());
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
assert_eq!(result.invariant_violations.len(), 1);
assert!(!fixture.cancel.is_cancelled());
}
// A single task `fmt` whose input and output are the same path (`src.txt`).
// Expects it to succeed with no invariant violations at all.
#[tokio::test]
async fn exec_019_self_edge_does_not_trigger_cycle() {
let task = make_task_with_inputs_outputs("fmt", vec!["src.txt"], vec!["src.txt"], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "fmt")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/src.txt"), b"x")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 1);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
assert_eq!(
completed_for(&result.outcomes, &tid("p", "fmt")).state,
RunState::Succeeded
);
assert!(
result.invariant_violations.is_empty(),
"self-edge MUST NOT trigger EXEC-019; got {:?}",
result.invariant_violations
);
}
// Same self-referencing `fmt` task; additionally checks that the spawner
// recorded exactly one spawn, the observer recorded exactly one Started event
// for the task, and no invariant violations were reported.
#[tokio::test]
async fn exec_018_self_edge_task_runs_exactly_once_per_invocation() {
let task = make_task_with_inputs_outputs("fmt", vec!["src.txt"], vec!["src.txt"], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], workspace_settings_with(fixed_cap(1)));
let g = make_graph(vec![tid("p", "fmt")], vec![]);
let fixture = Fixture::new(ws, g);
fixture
.cache
.fs()
.write_file(Path::new("/ws/src.txt"), b"original")
.unwrap();
let spawner = MockProcessSpawner::new();
push_n_default_specs(&spawner, 1);
let observer = Recorder::default();
let ctx = make_ctx(&fixture, &spawner, &observer);
let result = run_graph(&ctx, 1_700_000_000).await.unwrap();
assert_eq!(
completed_for(&result.outcomes, &tid("p", "fmt")).state,
RunState::Succeeded,
);
assert_eq!(
spawner.spawns().len(),
1,
"EXEC-018: self-edge task must spawn exactly once per invocation",
);
assert_eq!(
count_started(&observer.events(), &tid("p", "fmt")),
1,
"EXEC-018: self-edge task must fire exactly one Started event",
);
assert!(
result.invariant_violations.is_empty(),
"self-edge run should not violate EXEC-019 / EXEC-020; got {:?}",
result.invariant_violations,
);
}
// A task with the literal input `foo.txt` is returned for the path `/foo.txt`.
#[test]
fn tasks_matching_path_literal_match_returns_consumer_task() {
let task = make_task_with_inputs_outputs("consumer", vec!["foo.txt"], vec![], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], WorkspaceSettings::default());
let path = CanonicalPath::parse_workspace_absolute("/foo.txt").unwrap();
let matched = tasks_matching_path(&ws, &path);
assert_eq!(matched, BTreeSet::from([tid("p", "consumer")]));
}
// A task with the glob input `*.txt` is returned for the path `/foo.txt`.
#[test]
fn tasks_matching_path_glob_match_returns_consumer_task() {
let task = make_task_with_inputs_outputs("consumer", vec!["*.txt"], vec![], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], WorkspaceSettings::default());
let path = CanonicalPath::parse_workspace_absolute("/foo.txt").unwrap();
let matched = tasks_matching_path(&ws, &path);
assert_eq!(matched, BTreeSet::from([tid("p", "consumer")]));
}
// A path that none of the task's inputs match yields an empty result set.
#[test]
fn tasks_matching_path_no_match_returns_empty() {
let task = make_task_with_inputs_outputs("consumer", vec!["foo.txt"], vec![], vec![]);
let p = make_project("p", BTreeSet::new(), vec![task]);
let ws = make_workspace(vec![p], WorkspaceSettings::default());
let path = CanonicalPath::parse_workspace_absolute("/different/path.bin").unwrap();
let matched = tasks_matching_path(&ws, &path);
assert!(matched.is_empty());
}
}