use std::collections::BTreeMap;
use std::future::Future;
use std::pin::Pin;
use haz_cache::CacheKey;
use haz_domain::mutex::Mutex;
use haz_domain::name::ProjectName;
use haz_domain::task_id::TaskId;
use haz_vfs::WritableFilesystem;
use crate::cache_key::PredecessorStreamHashes;
use crate::process::ProcessSpawner;
use crate::run_task::{
CompletedRecord, RunContext, RunObserver, RunTaskError, cache_lookup_phase, restore_from_hit,
run_fresh,
};
/// Successful result of the lookup step for one task (built by
/// `run_lookup_step`).
#[derive(Debug)]
pub(super) enum LookupStepOutcome {
/// The lookup produced a manifest and `restore_from_hit` succeeded; carries
/// the record that the restore returned.
Hit(CompletedRecord),
/// The lookup produced no manifest. Carries the pieces of the lookup that
/// `run_lookup_step` hands back to its caller.
Miss {
/// Cache key computed by the lookup phase.
key: CacheKey,
/// Clone of the task definition's `mutex` field.
mutex: Option<Mutex>,
/// Clone of the name of the project the lookup resolved.
project_name: ProjectName,
},
}
/// Value an in-flight step resolves to, tagged with the task it was run for.
#[derive(Debug)]
pub(super) enum InFlightCompletion {
/// Produced by `run_lookup_step`.
Lookup {
/// The task the lookup step was invoked with.
task: TaskId,
/// The lookup outcome, or the error raised by the lookup or restore phase.
result: Result<LookupStepOutcome, RunTaskError>,
},
/// Produced by `run_spawn_step`.
Spawn {
/// The task the spawn step was invoked with.
task: TaskId,
/// Whatever `run_fresh` returned for the task.
result: Result<CompletedRecord, RunTaskError>,
},
}
/// Pinned, boxed future that resolves to an [`InFlightCompletion`].
///
/// The future may borrow data for `'a` and carries no `Send` bound.
// NOTE(review): presumably this lets lookup and spawn steps share one
// collection in the scheduler — confirm against the caller.
pub(super) type InFlightFuture<'a> = Pin<Box<dyn Future<Output = InFlightCompletion> + 'a>>;
/// Runs the cache-lookup step for `task` and wraps the result in
/// [`InFlightCompletion::Lookup`].
///
/// When the lookup yields a manifest it is restored via `restore_from_hit`
/// and reported as [`LookupStepOutcome::Hit`]. Otherwise the computed key, the
/// task definition's mutex and the project name are returned as
/// [`LookupStepOutcome::Miss`]. An error from either phase is carried inside
/// `result`; this function itself always returns a completion.
// The body contains no `.await`, which is what the lint below flags.
// NOTE(review): presumably kept `async` so it can be boxed next to
// `run_spawn_step` — confirm against the scheduler.
#[allow(clippy::unused_async)]
pub(super) async fn run_lookup_step<F, S, O>(
    ctx: &RunContext<'_, F, S, O>,
    task: TaskId,
    predecessor_streams: BTreeMap<TaskId, PredecessorStreamHashes>,
) -> InFlightCompletion
where
    F: WritableFilesystem,
    S: ProcessSpawner,
    O: RunObserver,
{
    // Both phases fail with `RunTaskError`, so they chain without re-wrapping.
    let result = cache_lookup_phase(ctx, &task, &predecessor_streams).and_then(|found| {
        match found.manifest.as_ref() {
            Some(manifest) => restore_from_hit(ctx, &task, manifest).map(LookupStepOutcome::Hit),
            None => Ok(LookupStepOutcome::Miss {
                key: found.key,
                mutex: found.task_def.mutex.clone(),
                project_name: found.project.name.clone(),
            }),
        }
    });

    InFlightCompletion::Lookup { task, result }
}
/// Runs `task` through `run_fresh` and wraps the result in
/// [`InFlightCompletion::Spawn`].
///
/// The project and task definition are resolved from `ctx.workspace` and then
/// passed to `run_fresh` together with `key` and `created_at_unix`.
///
/// # Panics
///
/// Panics if `task.project` is not present in the workspace, or if
/// `task.task` is not present in that project.
pub(super) async fn run_spawn_step<F, S, O>(
    ctx: &RunContext<'_, F, S, O>,
    task: TaskId,
    key: CacheKey,
    created_at_unix: u64,
) -> InFlightCompletion
where
    F: WritableFilesystem,
    S: ProcessSpawner,
    O: RunObserver,
{
    let owning_project = ctx
        .workspace
        .projects
        .get(&task.project)
        .expect("spawn-step task's project must exist (passed the lookup step)");
    let definition = owning_project
        .tasks
        .get(&task.task)
        .expect("spawn-step task name must exist in its project (passed the lookup step)");

    let result = run_fresh(
        ctx,
        &task,
        owning_project,
        definition,
        &key,
        created_at_unix,
    )
    .await;

    InFlightCompletion::Spawn { task, result }
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use std::str::FromStr;
use haz_domain::mutex::{Mutex as DomainMutex, MutexMode, MutexScope};
use haz_domain::name::MutexName;
use crate::mock_impl::MockProcessSpawner;
use crate::run_graph::scheduler::run_graph;
use crate::run_graph::test_fixtures::*;
use crate::run_task::{RunSource, RunState, run_task};
/// Builds a workspace-scoped mutex called `name` with the given `mode`.
///
/// Panics if `MutexName::from_str` rejects `name`.
fn workspace_mutex(name: &str, mode: MutexMode) -> DomainMutex {
    let name = MutexName::from_str(name).unwrap();
    DomainMutex {
        scope: MutexScope::Workspace,
        name,
        mode,
    }
}
/// Builds a project-scoped mutex called `name` with the given `mode`.
///
/// Panics if `MutexName::from_str` rejects `name`.
fn project_scoped_mutex(name: &str, mode: MutexMode) -> DomainMutex {
    let name = MutexName::from_str(name).unwrap();
    DomainMutex {
        scope: MutexScope::Project,
        name,
        mode,
    }
}
/// MUTEX-007: a cache hit must not wait behind a held exclusive mutex.
///
/// `b` is first run on its own and asserted to be a fresh run. Then `a` and
/// `b`, which both declare the exclusive workspace mutex `db`, are scheduled
/// together. The test expects exactly one spawn (for `a`), `b` reported as a
/// cache hit, and `Finished(b)` recorded before `Finished(a)`.
#[tokio::test]
async fn mutex_007_cache_hit_does_not_block_on_held_exclusive() {
    let id_a = tid("p", "a");
    let id_b = tid("p", "b");

    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![
            make_task_with(
                "a",
                &["echo", "a"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
            make_task_with(
                "b",
                &["echo", "b"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
        ],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let mut fixture = Fixture::new(workspace, make_graph(vec![tid("p", "b")], vec![]));

    // Phase 1: run `b` alone with its own spawner and observer.
    {
        let warmup_spawner = MockProcessSpawner::new();
        push_n_default_specs(&warmup_spawner, 1);
        let warmup_observer = Recorder::default();
        let warmup_ctx = make_ctx(&fixture, &warmup_spawner, &warmup_observer);
        let warmup = run_task(&warmup_ctx, &id_b, &BTreeMap::new(), 1)
            .await
            .unwrap();
        assert_eq!(warmup.source, RunSource::FreshRun);
        assert_eq!(warmup.state, RunState::Succeeded);
    }

    // Phase 2: schedule both tasks; only one process spec is queued.
    fixture.graph = make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]);
    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 1);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    let report = run_graph(&ctx, 2).await.unwrap();

    let record_a = completed_for(&report.outcomes, &id_a);
    let record_b = completed_for(&report.outcomes, &id_b);
    assert_eq!(record_a.source, RunSource::FreshRun);
    assert_eq!(record_a.state, RunState::Succeeded);
    assert_eq!(record_b.source, RunSource::CacheHit);
    assert_eq!(record_b.state, RunState::Succeeded);
    assert_eq!(
        spawner.spawns().len(),
        1,
        "only `a` should spawn; `b` is a cache hit",
    );

    let events = observer.events();
    let finished_a = pos_finished(&events, &id_a);
    let finished_b = pos_finished(&events, &id_b);
    assert!(
        finished_b < finished_a,
        "cache-hit `b` must finish before exclusive-holder `a`: \
         finished_b={finished_b}, finished_a={finished_a}, events={events:?}",
    );
}
/// MUTEX-003: two tasks declaring the same exclusive workspace mutex run one
/// after the other.
///
/// Both tasks must succeed and spawn, and `Finished(a)` must be recorded
/// before `Finished(b)`.
#[tokio::test]
async fn mutex_003_two_exclusive_serialise() {
    let id_a = tid("p", "a");
    let id_b = tid("p", "b");

    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![
            make_task_with(
                "a",
                &["echo", "a"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
            make_task_with(
                "b",
                &["echo", "b"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
        ],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let fixture = Fixture::new(
        workspace,
        make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]),
    );

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    let report = run_graph(&ctx, 1).await.unwrap();

    assert_eq!(report.outcomes.len(), 2);
    assert_eq!(
        completed_for(&report.outcomes, &id_a).state,
        RunState::Succeeded,
    );
    assert_eq!(
        completed_for(&report.outcomes, &id_b).state,
        RunState::Succeeded,
    );
    assert_eq!(spawner.spawns().len(), 2);

    let events = observer.events();
    let finished_a = pos_finished(&events, &id_a);
    let finished_b = pos_finished(&events, &id_b);
    assert!(
        finished_a < finished_b,
        "exclusive holder `a` must finish before contended `b`: \
         finished_a={finished_a}, finished_b={finished_b}, events={events:?}",
    );
}
/// MUTEX-004: two shared holders of the same workspace mutex are compatible.
///
/// Both tasks must spawn, and both `Started` events must be recorded before
/// either `Finished` event.
#[tokio::test]
async fn mutex_004_two_shared_proceed_in_parallel() {
    let id_a = tid("p", "a");
    let id_b = tid("p", "b");

    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![
            make_task_with(
                "a",
                &["echo", "a"],
                Some(workspace_mutex("db", MutexMode::Shared)),
            ),
            make_task_with(
                "b",
                &["echo", "b"],
                Some(workspace_mutex("db", MutexMode::Shared)),
            ),
        ],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let fixture = Fixture::new(
        workspace,
        make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]),
    );

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    run_graph(&ctx, 1).await.unwrap();

    assert_eq!(spawner.spawns().len(), 2);

    let events = observer.events();
    let started_a = pos_started(&events, &id_a);
    let started_b = pos_started(&events, &id_b);
    let finished_a = pos_finished(&events, &id_a);
    let finished_b = pos_finished(&events, &id_b);
    assert!(
        started_a < finished_a && started_b < finished_a,
        "both Started events must precede the first Finished under MUTEX-004 \
         shared/shared compatibility: events={events:?}",
    );
    assert!(
        started_a < finished_b && started_b < finished_b,
        "both Started events must precede the second Finished too: events={events:?}",
    );
}
/// MUTEX-004: a shared holder blocks an incoming exclusive request on the
/// same workspace mutex.
///
/// `a` declares `db` as shared and `b` as exclusive. Both must spawn,
/// `Finished(a)` must precede `Finished(b)`, and `Started(b)` must be
/// recorded exactly once.
#[tokio::test]
async fn mutex_004_shared_blocks_incoming_exclusive() {
    let id_a = tid("p", "a");
    let id_b = tid("p", "b");

    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![
            make_task_with(
                "a",
                &["echo", "a"],
                Some(workspace_mutex("db", MutexMode::Shared)),
            ),
            make_task_with(
                "b",
                &["echo", "b"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
        ],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let fixture = Fixture::new(
        workspace,
        make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]),
    );

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    run_graph(&ctx, 1).await.unwrap();

    assert_eq!(spawner.spawns().len(), 2);

    let events = observer.events();
    let finished_a = pos_finished(&events, &id_a);
    let finished_b = pos_finished(&events, &id_b);
    assert!(
        finished_a < finished_b,
        "shared holder `a` must finish before incoming-exclusive `b`: \
         finished_a={finished_a}, finished_b={finished_b}, events={events:?}",
    );
    assert_eq!(count_started(&events, &id_b), 1);
}
/// MUTEX-004: an exclusive holder blocks an incoming shared request on the
/// same workspace mutex.
///
/// `a` declares `db` as exclusive and `b` as shared. Both must spawn,
/// `Finished(a)` must precede `Finished(b)`, and `Started(b)` must be
/// recorded exactly once.
#[tokio::test]
async fn mutex_004_exclusive_blocks_incoming_shared() {
    let id_a = tid("p", "a");
    let id_b = tid("p", "b");

    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![
            make_task_with(
                "a",
                &["echo", "a"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
            make_task_with(
                "b",
                &["echo", "b"],
                Some(workspace_mutex("db", MutexMode::Shared)),
            ),
        ],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let fixture = Fixture::new(
        workspace,
        make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]),
    );

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    run_graph(&ctx, 1).await.unwrap();

    assert_eq!(spawner.spawns().len(), 2);

    let events = observer.events();
    let finished_a = pos_finished(&events, &id_a);
    let finished_b = pos_finished(&events, &id_b);
    assert!(
        finished_a < finished_b,
        "exclusive holder `a` must finish before incoming-shared `b`: \
         finished_a={finished_a}, finished_b={finished_b}, events={events:?}",
    );
    assert_eq!(count_started(&events, &id_b), 1);
}
/// MUTEX-001: a workspace-scoped and a project-scoped mutex with the same
/// name do not contend.
///
/// `a` holds workspace `db` exclusively and `b` holds project `db`
/// exclusively. Both must spawn, and both `Started` events must be recorded
/// before either `Finished` event.
#[tokio::test]
async fn mutex_001_workspace_and_project_scopes_are_distinct() {
    let id_a = tid("p", "a");
    let id_b = tid("p", "b");

    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![
            make_task_with(
                "a",
                &["echo", "a"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
            make_task_with(
                "b",
                &["echo", "b"],
                Some(project_scoped_mutex("db", MutexMode::Exclusive)),
            ),
        ],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let fixture = Fixture::new(
        workspace,
        make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]),
    );

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    run_graph(&ctx, 1).await.unwrap();

    assert_eq!(spawner.spawns().len(), 2);

    let events = observer.events();
    let started_a = pos_started(&events, &id_a);
    let started_b = pos_started(&events, &id_b);
    let finished_a = pos_finished(&events, &id_a);
    let finished_b = pos_finished(&events, &id_b);
    assert!(
        started_a < finished_a && started_b < finished_a,
        "workspace and project scopes are distinct keys; both should \
         spawn before either finishes: events={events:?}",
    );
    assert!(started_a < finished_b && started_b < finished_b);
}
/// MUTEX-001: a project-scoped mutex name is per project, so the same name in
/// two projects does not serialise their tasks.
///
/// `p1:t` and `p2:t` both hold project-scoped `db` exclusively. Both must
/// spawn, and both `Started` events must be recorded before either
/// `Finished` event.
#[tokio::test]
async fn mutex_001_project_scoped_same_name_in_different_projects_does_not_serialise() {
    let id_p1 = tid("p1", "t");
    let id_p2 = tid("p2", "t");

    let first_project = make_project(
        "p1",
        BTreeSet::new(),
        vec![make_task_with(
            "t",
            &["echo", "p1"],
            Some(project_scoped_mutex("db", MutexMode::Exclusive)),
        )],
    );
    let second_project = make_project(
        "p2",
        BTreeSet::new(),
        vec![make_task_with(
            "t",
            &["echo", "p2"],
            Some(project_scoped_mutex("db", MutexMode::Exclusive)),
        )],
    );
    let workspace = make_workspace(
        vec![first_project, second_project],
        workspace_settings_with(fixed_cap(2)),
    );
    let fixture = Fixture::new(
        workspace,
        make_graph(vec![tid("p1", "t"), tid("p2", "t")], vec![]),
    );

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    run_graph(&ctx, 1).await.unwrap();

    assert_eq!(spawner.spawns().len(), 2);

    let events = observer.events();
    let finished_p1 = pos_finished(&events, &id_p1);
    let finished_p2 = pos_finished(&events, &id_p2);
    let started_p1 = pos_started(&events, &id_p1);
    let started_p2 = pos_started(&events, &id_p2);
    assert!(
        started_p1 < finished_p1
            && started_p2 < finished_p1
            && started_p1 < finished_p2
            && started_p2 < finished_p2,
        "project-scoped `~:db` in distinct projects must not serialise: \
         events={events:?}",
    );
}
/// MUTEX-005: a task that loses the race for an exclusive mutex is resumed
/// once the holder releases it, without a duplicate `Started` event.
///
/// Both tasks hold workspace `db` exclusively. Each must record exactly one
/// `Started` event, `Started(b)` must precede `Finished(a)`, and
/// `Finished(a)` must precede `Finished(b)`.
#[tokio::test]
async fn mutex_005_contended_task_yields_and_resumes_after_release() {
    let id_a = tid("p", "a");
    let id_b = tid("p", "b");

    let project = make_project(
        "p",
        BTreeSet::new(),
        vec![
            make_task_with(
                "a",
                &["echo", "a"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
            make_task_with(
                "b",
                &["echo", "b"],
                Some(workspace_mutex("db", MutexMode::Exclusive)),
            ),
        ],
    );
    let workspace = make_workspace(vec![project], workspace_settings_with(fixed_cap(2)));
    let fixture = Fixture::new(
        workspace,
        make_graph(vec![tid("p", "a"), tid("p", "b")], vec![]),
    );

    let spawner = MockProcessSpawner::new();
    push_n_default_specs(&spawner, 2);
    let observer = Recorder::default();
    let ctx = make_ctx(&fixture, &spawner, &observer);
    run_graph(&ctx, 1).await.unwrap();

    assert_eq!(spawner.spawns().len(), 2);

    let events = observer.events();
    assert_eq!(
        count_started(&events, &id_a),
        1,
        "Started(a) must fire exactly once: events={events:?}",
    );
    assert_eq!(
        count_started(&events, &id_b),
        1,
        "Started(b) must fire exactly once even after the yield: \
         events={events:?}",
    );

    let started_a = pos_started(&events, &id_a);
    let started_b = pos_started(&events, &id_b);
    let finished_a = pos_finished(&events, &id_a);
    let finished_b = pos_finished(&events, &id_b);
    assert!(started_a < finished_a);
    assert!(started_b < finished_a);
    assert!(finished_a < finished_b, "events={events:?}");
}
}