use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use haz_dag::edge::EdgeKind;
use haz_dag::graph::TaskGraph;
use haz_domain::name::TagName;
use haz_domain::settings::concurrency::{ConcurrencyDefault, ConcurrencySettings};
use haz_domain::task_id::TaskId;
use haz_domain::workspace::Workspace;
use crate::cache_key::PredecessorStreamHashes;
use crate::run_task::CompletedRecord;
/// Resolves the global concurrency cap described by `settings`.
///
/// * `ConcurrencyDefault::Fixed(cap)` yields `cap` unchanged.
/// * `ConcurrencyDefault::Auto` yields the value reported by `num_cpus::get()`,
///   passed through [`clamp_to_nonzero`] so the result is never zero.
pub(super) fn resolve_global_cap(settings: &ConcurrencySettings) -> NonZeroUsize {
    match settings.default {
        ConcurrencyDefault::Auto => {
            let detected = num_cpus::get();
            clamp_to_nonzero(detected)
        }
        ConcurrencyDefault::Fixed(cap) => cap,
    }
}
/// Converts `n` into a [`NonZeroUsize`], mapping `0` to the smallest
/// non-zero value (`1`) and passing every other value through unchanged.
pub(super) fn clamp_to_nonzero(n: usize) -> NonZeroUsize {
    let Some(value) = NonZeroUsize::new(n) else {
        // `n` was zero: fall back to the minimum representable value.
        return NonZeroUsize::MIN;
    };
    value
}
/// Maps every node of `graph` to a copy of the tag set of the project that
/// owns it, so later lookups do not have to go through the workspace.
///
/// # Panics
///
/// Panics if a graph node names a project that is not present in
/// `workspace.projects`.
pub(super) fn precompute_task_tags(
    workspace: &Workspace,
    graph: &TaskGraph,
) -> BTreeMap<TaskId, BTreeSet<TagName>> {
    let mut tags_by_task = BTreeMap::new();
    for node in graph.nodes.iter() {
        let owner = workspace
            .projects
            .get(&node.project)
            .expect("graph node refers to a project absent from the workspace");
        // Tags are declared on the project; every task of it gets the same set.
        tags_by_task.insert(node.clone(), owner.tags.clone());
    }
    tags_by_task
}
/// Builds the two indices derived from the hard edges of `graph`.
///
/// Returns `(remaining, successors)` where:
///
/// * `remaining[t]` is the number of hard edges whose `to` endpoint is `t`;
/// * `successors[t]` is the set of `to` endpoints of hard edges leaving `t`.
///
/// Every node of the graph gets an entry in both maps (zero / empty set when
/// it has no hard edges). Edges of any other kind are ignored.
pub(super) fn build_hard_edge_index(
    graph: &TaskGraph,
) -> (BTreeMap<TaskId, usize>, BTreeMap<TaskId, BTreeSet<TaskId>>) {
    let mut indegree: BTreeMap<TaskId, usize> = BTreeMap::new();
    let mut children: BTreeMap<TaskId, BTreeSet<TaskId>> = BTreeMap::new();

    // Seed both maps so that isolated nodes are still represented.
    for node in graph.nodes.iter() {
        indegree.insert(node.clone(), 0);
        children.insert(node.clone(), BTreeSet::new());
    }

    for edge in &graph.edges {
        if edge.kind != EdgeKind::Hard {
            continue;
        }
        *indegree.entry(edge.to.clone()).or_default() += 1;
        children
            .entry(edge.from.clone())
            .or_default()
            .insert(edge.to.clone());
    }

    (indegree, children)
}
/// Collects every task reachable from `root` by following one or more
/// entries of `successors`.
///
/// `root` itself is not part of the result unless some path leads back to it.
/// Tasks without an entry in `successors` are treated as having no children.
fn descendants_via_index(
    successors: &BTreeMap<TaskId, BTreeSet<TaskId>>,
    root: &TaskId,
) -> BTreeSet<TaskId> {
    let mut reached = BTreeSet::new();
    // Explicit work stack of nodes whose children still need expanding.
    let mut pending: Vec<&TaskId> = vec![root];
    while let Some(current) = pending.pop() {
        let Some(children) = successors.get(current) else {
            continue;
        };
        for child in children {
            // `insert` returns false for nodes already seen, which keeps
            // shared descendants (e.g. diamonds) from being expanded twice.
            if reached.insert(child.clone()) {
                pending.push(child);
            }
        }
    }
    reached
}
/// Test-only convenience: the transitive hard-edge descendants of `root` in
/// `graph`, computed from a freshly built hard-edge index.
#[cfg(test)]
pub(super) fn hard_descendants(graph: &TaskGraph, root: &TaskId) -> BTreeSet<TaskId> {
    // Only the successor map is needed; the in-degree map is discarded.
    let hard_successors = build_hard_edge_index(graph).1;
    descendants_via_index(&hard_successors, root)
}
/// Readiness bookkeeping over the hard edges of a task graph.
///
/// Built by `ReadyState::from_graph` and updated through
/// `complete_succeeded` / `complete_failed`.
#[derive(Debug, Clone, Default)]
pub(super) struct ReadyState {
/// Tasks whose `remaining` count is zero and that have not been skipped.
pub ready: BTreeSet<TaskId>,
/// Per task, the number of incoming hard edges not yet accounted for by
/// `complete_succeeded`.
pub remaining: BTreeMap<TaskId, usize>,
/// Per task, the set of tasks at the `to` end of its outgoing hard edges.
pub hard_successors: BTreeMap<TaskId, BTreeSet<TaskId>>,
/// Tasks marked as skipped because they are hard descendants of a task
/// passed to `complete_failed`.
pub skip: BTreeSet<TaskId>,
}
impl ReadyState {
    /// Builds the initial state for `graph`.
    ///
    /// Every node with no incoming hard edge starts out in `ready`; `skip`
    /// starts empty.
    pub(super) fn from_graph(graph: &TaskGraph) -> Self {
        let (remaining, hard_successors) = build_hard_edge_index(graph);
        let ready = remaining
            .iter()
            .filter_map(|(task, &pending)| (pending == 0).then(|| task.clone()))
            .collect();
        Self {
            ready,
            remaining,
            hard_successors,
            skip: BTreeSet::new(),
        }
    }

    /// Records that `task` finished successfully.
    ///
    /// Each hard successor of `task` has its `remaining` count decremented
    /// (saturating at zero). A successor whose count reaches zero is moved to
    /// `ready`, unless it is already in `skip`. Tasks with no entry in
    /// `hard_successors` are a no-op.
    pub(super) fn complete_succeeded(&mut self, task: &TaskId) {
        // `hard_successors`, `remaining`, `skip` and `ready` are distinct
        // fields, so the successor set can be borrowed directly while the
        // other fields are mutated; no temporary copy of the set is needed.
        let Some(successors) = self.hard_successors.get(task) else {
            return;
        };
        for successor in successors {
            let Some(pending) = self.remaining.get_mut(successor) else {
                continue;
            };
            *pending = pending.saturating_sub(1);
            if *pending == 0 && !self.skip.contains(successor) {
                self.ready.insert(successor.clone());
            }
        }
    }

    /// Records that `task` failed.
    ///
    /// Every transitive hard descendant of `task` is added to `skip` and
    /// removed from `ready`. Returns only the descendants that were not
    /// already in `skip` before this call. `task` itself is not added to
    /// `skip`, and `remaining` is left untouched.
    pub(super) fn complete_failed(&mut self, task: &TaskId) -> BTreeSet<TaskId> {
        let mut newly_skipped = BTreeSet::new();
        for descendant in descendants_via_index(&self.hard_successors, task) {
            // `insert` returning false means an earlier failure already
            // claimed this descendant; it is not reported a second time.
            if self.skip.insert(descendant.clone()) {
                self.ready.remove(&descendant);
                newly_skipped.insert(descendant);
            }
        }
        newly_skipped
    }
}
/// Counters of currently admitted tasks, used to enforce concurrency caps.
///
/// Incremented by `admit`, decremented by `release`, consulted by `can_admit`.
#[derive(Debug, Clone, Default)]
pub(super) struct InFlightCounts {
/// Number of tasks admitted and not yet released, regardless of tags.
pub global: usize,
/// Per tag, the number of admitted-and-not-released tasks carrying that tag.
/// Entries are created on first `admit` and are not removed when they
/// drop back to zero.
pub per_tag: BTreeMap<TagName, usize>,
}
impl InFlightCounts {
    /// Reports whether one more task carrying `task_tags` may start.
    ///
    /// Admission requires the global count to be strictly below `global_cap`
    /// and, for every tag of the task that has a cap in `settings.tags`, the
    /// tag's in-flight count to be strictly below that cap. Tags without a
    /// configured cap impose no limit.
    pub(super) fn can_admit(
        &self,
        task_tags: &BTreeSet<TagName>,
        settings: &ConcurrencySettings,
        global_cap: NonZeroUsize,
    ) -> bool {
        let global_has_room = self.global < global_cap.get();
        global_has_room
            && task_tags.iter().all(|tag| match settings.tags.get(tag) {
                Some(cap) => {
                    let running = self.per_tag.get(tag).copied().unwrap_or(0);
                    running < cap.get()
                }
                // No cap configured for this tag.
                None => true,
            })
    }

    /// Accounts for a newly started task: bumps the global count and the
    /// count of each of its tags.
    pub(super) fn admit(&mut self, task_tags: &BTreeSet<TagName>) {
        self.global += 1;
        for tag in task_tags.iter().cloned() {
            *self.per_tag.entry(tag).or_default() += 1;
        }
    }

    /// Accounts for a finished task: lowers the global count and the count of
    /// each of its tags, saturating at zero. Tags with no recorded count are
    /// ignored.
    pub(super) fn release(&mut self, task_tags: &BTreeSet<TagName>) {
        self.global = self.global.saturating_sub(1);
        for tag in task_tags {
            let Some(count) = self.per_tag.get_mut(tag) else {
                continue;
            };
            *count = count.saturating_sub(1);
        }
    }
}
/// Collects the stdout/stderr hashes of completed tasks, keyed by task id.
#[derive(Debug, Clone, Default)]
pub(super) struct StreamHashAccumulator {
/// Hash pair stored by `record` for each task seen so far.
pub by_task: BTreeMap<TaskId, PredecessorStreamHashes>,
}
impl StreamHashAccumulator {
    /// Stores the stdout and stderr hashes of `record` under `task`.
    ///
    /// Recording the same task again replaces the previously stored pair.
    pub(super) fn record(&mut self, task: &TaskId, record: &CompletedRecord) {
        let hashes = PredecessorStreamHashes {
            stdout_hash: record.stdout_hash,
            stderr_hash: record.stderr_hash,
        };
        self.by_task.insert(task.clone(), hashes);
    }
}
// Unit tests for the scheduler state helpers defined in this file:
// cap resolution, tag precomputation, the hard-edge index, descendant
// traversal, `ReadyState`, `InFlightCounts` and `StreamHashAccumulator`.
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::str::FromStr;
use haz_dag::edge::{Edge, EdgeKind};
use haz_dag::graph::TaskGraph;
use haz_domain::action::TaskAction;
use haz_domain::env::EnvSettings;
use haz_domain::name::{ProjectName, TagName, TaskName};
use haz_domain::path::{ProjectRoot, WorkspaceRootPath};
use haz_domain::project::Project;
use haz_domain::settings::WorkspaceSettings;
use haz_domain::settings::concurrency::{ConcurrencyDefault, ConcurrencySettings};
use haz_domain::task::Task;
use haz_domain::task_id::TaskId;
use haz_domain::workspace::Workspace;
use nonempty::NonEmpty;
use crate::cache_key::PredecessorStreamHashes;
use crate::run_graph::state::{
InFlightCounts, ReadyState, StreamHashAccumulator, build_hard_edge_index, clamp_to_nonzero,
hard_descendants, precompute_task_tags, resolve_global_cap,
};
use crate::run_task::{CompletedRecord, RunSource, RunState};
// ---- Fixture helpers -------------------------------------------------
// The name helpers parse via `FromStr` and unwrap, so an invalid literal
// panics inside the test that used it.
fn project_name(s: &str) -> ProjectName {
ProjectName::from_str(s).unwrap()
}
fn task_name(s: &str) -> TaskName {
TaskName::from_str(s).unwrap()
}
fn tag_name(s: &str) -> TagName {
TagName::from_str(s).unwrap()
}
// Builds the task id `project:task` from two string literals.
fn tid(project: &str, task: &str) -> TaskId {
TaskId {
project: project_name(project),
task: task_name(task),
}
}
// A project rooted at the workspace root with the given tags and no tasks.
fn implicit_project_with(name: &str, tags: BTreeSet<TagName>) -> Project {
Project {
name: project_name(name),
root: ProjectRoot::WorkspaceRoot,
tags,
tasks: BTreeMap::new(),
}
}
// A task running the single-word command `true`, with no inputs, outputs,
// dependencies or mutex.
fn noop_task(name: &str) -> Task {
Task {
name: task_name(name),
action: TaskAction::Command(NonEmpty::from_vec(vec!["true".to_owned()]).unwrap()),
inputs: Vec::new(),
outputs: Vec::new(),
deps: Vec::new(),
weak_deps: Vec::new(),
mutex: None,
env: EnvSettings::default(),
}
}
// A workspace rooted at `/abs/ws` holding `projects` indexed by name, with
// default settings and no overlays.
fn workspace_with(projects: Vec<Project>) -> Workspace {
let mut by_name = BTreeMap::new();
for p in projects {
by_name.insert(p.name.clone(), p);
}
Workspace {
root: WorkspaceRootPath::try_new(PathBuf::from("/abs/ws")).unwrap(),
projects: by_name,
overlays: BTreeMap::new(),
settings: WorkspaceSettings::default(),
}
}
// Edge constructors for the two edge kinds exercised below.
fn hard_edge(from: TaskId, to: TaskId) -> Edge {
Edge {
from,
to,
kind: EdgeKind::Hard,
}
}
fn soft_edge(from: TaskId, to: TaskId) -> Edge {
Edge {
from,
to,
kind: EdgeKind::Soft,
}
}
// Assembles a graph from explicit node and edge lists.
fn graph_with(nodes: Vec<TaskId>, edges: Vec<Edge>) -> TaskGraph {
TaskGraph {
nodes: nodes.into_iter().collect(),
edges: edges.into_iter().collect(),
}
}
// A successful fresh-run record carrying the given stream hashes and no
// exit status or materialised outputs.
fn completed(task: TaskId, stdout: [u8; 32], stderr: [u8; 32]) -> CompletedRecord {
CompletedRecord {
task,
source: RunSource::FreshRun,
state: RunState::Succeeded,
exit_status: None,
stdout_hash: stdout,
stderr_hash: stderr,
materialised_outputs: Vec::new(),
}
}
// Concurrency settings with the given default and no per-tag caps.
fn settings_with_default(default: ConcurrencyDefault) -> ConcurrencySettings {
ConcurrencySettings {
default,
tags: BTreeMap::new(),
}
}
// ---- resolve_global_cap / clamp_to_nonzero ---------------------------
// NOTE(review): the `exec_004` prefix presumably refers to a spec or
// requirement id — confirm against the project documentation.
#[test]
fn exec_004_fixed_returns_chosen_value() {
let seven = NonZeroUsize::new(7).unwrap();
let cs = settings_with_default(ConcurrencyDefault::Fixed(seven));
assert_eq!(resolve_global_cap(&cs), seven);
}
// The auto value is machine-dependent, so only the lower bound is checked.
#[test]
fn exec_004_auto_is_at_least_one() {
let cs = settings_with_default(ConcurrencyDefault::Auto);
assert!(resolve_global_cap(&cs).get() >= 1);
}
#[test]
fn clamp_to_nonzero_clamps_zero_to_one() {
assert_eq!(clamp_to_nonzero(0), NonZeroUsize::new(1).unwrap());
}
#[test]
fn clamp_to_nonzero_passes_one_through() {
assert_eq!(clamp_to_nonzero(1), NonZeroUsize::new(1).unwrap());
}
#[test]
fn clamp_to_nonzero_passes_large_positive_through() {
assert_eq!(
clamp_to_nonzero(usize::MAX),
NonZeroUsize::new(usize::MAX).unwrap(),
);
}
// ---- precompute_task_tags --------------------------------------------
#[test]
fn precompute_task_tags_empty_workspace_yields_empty_map() {
let ws = workspace_with(vec![]);
let g = graph_with(vec![], vec![]);
assert!(precompute_task_tags(&ws, &g).is_empty());
}
// Two tasks of the same project both receive the project's full tag set.
#[test]
fn precompute_task_tags_carries_project_tags() {
let mut p =
implicit_project_with("lib", BTreeSet::from([tag_name("native"), tag_name("io")]));
p.tasks.insert(task_name("build"), noop_task("build"));
p.tasks.insert(task_name("test"), noop_task("test"));
let ws = workspace_with(vec![p]);
let g = graph_with(vec![tid("lib", "build"), tid("lib", "test")], vec![]);
let tags = precompute_task_tags(&ws, &g);
assert_eq!(tags.len(), 2);
assert_eq!(
tags[&tid("lib", "build")],
BTreeSet::from([tag_name("native"), tag_name("io")])
);
assert_eq!(
tags[&tid("lib", "test")],
BTreeSet::from([tag_name("native"), tag_name("io")])
);
}
// Same task name in two projects: tags follow the project, not the task name.
#[test]
fn precompute_task_tags_distinguishes_tags_across_projects() {
let p1 = implicit_project_with("a", BTreeSet::from([tag_name("rust")]));
let p2 = implicit_project_with("b", BTreeSet::from([tag_name("node")]));
let ws = workspace_with(vec![p1, p2]);
let g = graph_with(vec![tid("a", "x"), tid("b", "x")], vec![]);
let tags = precompute_task_tags(&ws, &g);
assert_eq!(tags[&tid("a", "x")], BTreeSet::from([tag_name("rust")]));
assert_eq!(tags[&tid("b", "x")], BTreeSet::from([tag_name("node")]));
}
// ---- build_hard_edge_index -------------------------------------------
#[test]
fn build_hard_edge_index_no_edges_yields_zero_indegree() {
let g = graph_with(vec![tid("a", "x"), tid("b", "y")], vec![]);
let (remaining, successors) = build_hard_edge_index(&g);
assert_eq!(remaining[&tid("a", "x")], 0);
assert_eq!(remaining[&tid("b", "y")], 0);
assert!(successors[&tid("a", "x")].is_empty());
assert!(successors[&tid("b", "y")].is_empty());
}
#[test]
fn build_hard_edge_index_single_hard_edge() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(
vec![a.clone(), b.clone()],
vec![hard_edge(a.clone(), b.clone())],
);
let (remaining, successors) = build_hard_edge_index(&g);
assert_eq!(remaining[&a], 0);
assert_eq!(remaining[&b], 1);
assert_eq!(successors[&a], BTreeSet::from([b.clone()]));
assert!(successors[&b].is_empty());
}
// One source with two hard successors.
#[test]
fn build_hard_edge_index_fan_out() {
let a = tid("p", "a");
let b = tid("p", "b");
let c = tid("p", "c");
let g = graph_with(
vec![a.clone(), b.clone(), c.clone()],
vec![
hard_edge(a.clone(), b.clone()),
hard_edge(a.clone(), c.clone()),
],
);
let (remaining, successors) = build_hard_edge_index(&g);
assert_eq!(remaining[&a], 0);
assert_eq!(remaining[&b], 1);
assert_eq!(remaining[&c], 1);
assert_eq!(successors[&a], BTreeSet::from([b, c]));
}
// Two sources feeding one target: the target's in-degree is 2.
#[test]
fn build_hard_edge_index_fan_in() {
let a = tid("p", "a");
let b = tid("p", "b");
let c = tid("p", "c");
let g = graph_with(
vec![a.clone(), b.clone(), c.clone()],
vec![
hard_edge(a.clone(), c.clone()),
hard_edge(b.clone(), c.clone()),
],
);
let (remaining, successors) = build_hard_edge_index(&g);
assert_eq!(remaining[&c], 2);
assert_eq!(successors[&a], BTreeSet::from([c.clone()]));
assert_eq!(successors[&b], BTreeSet::from([c]));
}
#[test]
fn build_hard_edge_index_ignores_soft_edges() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(
vec![a.clone(), b.clone()],
vec![soft_edge(a.clone(), b.clone())],
);
let (remaining, successors) = build_hard_edge_index(&g);
assert_eq!(remaining[&b], 0);
assert!(successors[&a].is_empty());
}
// ---- hard_descendants ------------------------------------------------
#[test]
fn hard_descendants_isolated_node_has_none() {
let a = tid("p", "a");
let g = graph_with(vec![a.clone()], vec![]);
assert!(hard_descendants(&g, &a).is_empty());
}
#[test]
fn hard_descendants_single_child() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(
vec![a.clone(), b.clone()],
vec![hard_edge(a.clone(), b.clone())],
);
assert_eq!(hard_descendants(&g, &a), BTreeSet::from([b]));
}
#[test]
fn hard_descendants_transitive_chain() {
let a = tid("p", "a");
let b = tid("p", "b");
let c = tid("p", "c");
let g = graph_with(
vec![a.clone(), b.clone(), c.clone()],
vec![
hard_edge(a.clone(), b.clone()),
hard_edge(b.clone(), c.clone()),
],
);
assert_eq!(hard_descendants(&g, &a), BTreeSet::from([b, c]));
}
// `bot` is reachable through both `l` and `r` but appears once in the result.
#[test]
fn hard_descendants_diamond() {
let top = tid("p", "top");
let l = tid("p", "l");
let r = tid("p", "r");
let bot = tid("p", "bot");
let g = graph_with(
vec![top.clone(), l.clone(), r.clone(), bot.clone()],
vec![
hard_edge(top.clone(), l.clone()),
hard_edge(top.clone(), r.clone()),
hard_edge(l.clone(), bot.clone()),
hard_edge(r.clone(), bot.clone()),
],
);
assert_eq!(hard_descendants(&g, &top), BTreeSet::from([l, r, bot]));
}
#[test]
fn hard_descendants_ignores_soft_edges() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(
vec![a.clone(), b.clone()],
vec![soft_edge(a.clone(), b.clone())],
);
assert!(hard_descendants(&g, &a).is_empty());
}
// ---- ReadyState::from_graph ------------------------------------------
#[test]
fn ready_state_no_edges_makes_every_node_ready() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(vec![a.clone(), b.clone()], vec![]);
let state = ReadyState::from_graph(&g);
assert_eq!(state.ready, BTreeSet::from([a, b]));
assert!(state.skip.is_empty());
}
#[test]
fn ready_state_chain_only_head_is_ready() {
let a = tid("p", "a");
let b = tid("p", "b");
let c = tid("p", "c");
let g = graph_with(
vec![a.clone(), b.clone(), c.clone()],
vec![
hard_edge(a.clone(), b.clone()),
hard_edge(b.clone(), c.clone()),
],
);
let state = ReadyState::from_graph(&g);
assert_eq!(state.ready, BTreeSet::from([a]));
}
#[test]
fn ready_state_diamond_only_top_is_ready() {
let top = tid("p", "top");
let l = tid("p", "l");
let r = tid("p", "r");
let bot = tid("p", "bot");
let g = graph_with(
vec![top.clone(), l.clone(), r.clone(), bot.clone()],
vec![
hard_edge(top.clone(), l.clone()),
hard_edge(top.clone(), r.clone()),
hard_edge(l.clone(), bot.clone()),
hard_edge(r.clone(), bot.clone()),
],
);
let state = ReadyState::from_graph(&g);
assert_eq!(state.ready, BTreeSet::from([top]));
}
// ---- ReadyState::complete_succeeded ----------------------------------
#[test]
fn complete_succeeded_releases_single_successor() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(
vec![a.clone(), b.clone()],
vec![hard_edge(a.clone(), b.clone())],
);
let mut state = ReadyState::from_graph(&g);
assert!(!state.ready.contains(&b));
state.complete_succeeded(&a);
assert!(state.ready.contains(&b));
}
// With two predecessors, `b` stays blocked after the first completes and
// becomes ready only once the second has completed too.
#[test]
fn complete_succeeded_holds_on_last_predecessor_of_fan_in() {
let a1 = tid("p", "a1");
let a2 = tid("p", "a2");
let b = tid("p", "b");
let g = graph_with(
vec![a1.clone(), a2.clone(), b.clone()],
vec![
hard_edge(a1.clone(), b.clone()),
hard_edge(a2.clone(), b.clone()),
],
);
let mut state = ReadyState::from_graph(&g);
state.complete_succeeded(&a1);
assert!(!state.ready.contains(&b));
state.complete_succeeded(&a2);
assert!(state.ready.contains(&b));
}
// Completing a task without successors leaves `ready` and `skip` unchanged.
#[test]
fn complete_succeeded_is_noop_on_leaf() {
let a = tid("p", "a");
let g = graph_with(vec![a.clone()], vec![]);
let mut state = ReadyState::from_graph(&g);
let before = state.clone();
state.complete_succeeded(&a);
assert_eq!(state.ready, before.ready);
assert_eq!(state.skip, before.skip);
}
// ---- ReadyState::complete_failed -------------------------------------
#[test]
fn complete_failed_cascades_to_single_child() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(
vec![a.clone(), b.clone()],
vec![hard_edge(a.clone(), b.clone())],
);
let mut state = ReadyState::from_graph(&g);
let newly = state.complete_failed(&a);
assert_eq!(newly, BTreeSet::from([b.clone()]));
assert!(state.skip.contains(&b));
assert!(!state.ready.contains(&b));
}
#[test]
fn complete_failed_cascades_transitively() {
let a = tid("p", "a");
let b = tid("p", "b");
let c = tid("p", "c");
let g = graph_with(
vec![a.clone(), b.clone(), c.clone()],
vec![
hard_edge(a.clone(), b.clone()),
hard_edge(b.clone(), c.clone()),
],
);
let mut state = ReadyState::from_graph(&g);
let newly = state.complete_failed(&a);
assert_eq!(newly, BTreeSet::from([b.clone(), c.clone()]));
assert_eq!(state.skip, BTreeSet::from([b, c]));
}
#[test]
fn complete_failed_removes_already_ready_descendants() {
let a = tid("p", "a");
let b = tid("p", "b");
let g = graph_with(
vec![a.clone(), b.clone()],
vec![hard_edge(a.clone(), b.clone())],
);
let mut state = ReadyState::from_graph(&g);
// `b` is forced into `ready` by hand to model a descendant that was already
// ready when its ancestor is reported as failed.
state.ready.insert(b.clone()); let newly = state.complete_failed(&a);
assert_eq!(newly, BTreeSet::from([b.clone()]));
assert!(state.skip.contains(&b));
assert!(!state.ready.contains(&b));
}
// Two disconnected chains: failing `a` must not affect `b` or `b_child`.
#[test]
fn complete_failed_leaves_unrelated_subgraph_alone() {
let a = tid("p", "a");
let a_child = tid("p", "a_child");
let b = tid("p", "b");
let b_child = tid("p", "b_child");
let g = graph_with(
vec![a.clone(), a_child.clone(), b.clone(), b_child.clone()],
vec![
hard_edge(a.clone(), a_child.clone()),
hard_edge(b.clone(), b_child.clone()),
],
);
let mut state = ReadyState::from_graph(&g);
let newly = state.complete_failed(&a);
assert_eq!(newly, BTreeSet::from([a_child.clone()]));
assert!(state.skip.contains(&a_child));
assert!(!state.skip.contains(&b));
assert!(!state.skip.contains(&b_child));
assert!(state.ready.contains(&b));
}
// `bot` is reachable via both branches yet is reported and skipped once.
#[test]
fn complete_failed_diamond_attributes_each_descendant_once() {
let top = tid("p", "top");
let left = tid("p", "left");
let right = tid("p", "right");
let bot = tid("p", "bot");
let g = graph_with(
vec![top.clone(), left.clone(), right.clone(), bot.clone()],
vec![
hard_edge(top.clone(), left.clone()),
hard_edge(top.clone(), right.clone()),
hard_edge(left.clone(), bot.clone()),
hard_edge(right.clone(), bot.clone()),
],
);
let mut state = ReadyState::from_graph(&g);
let newly = state.complete_failed(&top);
assert_eq!(
newly,
BTreeSet::from([left.clone(), right.clone(), bot.clone()]),
);
assert_eq!(state.skip, BTreeSet::from([left, right, bot]));
}
// ---- InFlightCounts --------------------------------------------------
// Small non-zero constants used as caps in the admission tests.
fn one() -> NonZeroUsize {
NonZeroUsize::new(1).unwrap()
}
fn two() -> NonZeroUsize {
NonZeroUsize::new(2).unwrap()
}
fn ten() -> NonZeroUsize {
NonZeroUsize::new(10).unwrap()
}
// Settings with an `Auto` default and a single per-tag cap.
fn settings_with_tag(tag: &str, cap: NonZeroUsize) -> ConcurrencySettings {
let mut tags = BTreeMap::new();
tags.insert(tag_name(tag), cap);
ConcurrencySettings {
default: ConcurrencyDefault::Auto,
tags,
}
}
#[test]
fn in_flight_can_admit_below_global_cap() {
let counts = InFlightCounts::default();
let cs = settings_with_default(ConcurrencyDefault::Fixed(two()));
assert!(counts.can_admit(&BTreeSet::new(), &cs, two()));
}
// Two untagged tasks already admitted against a global cap of two.
#[test]
fn in_flight_cannot_admit_at_global_cap() {
let mut counts = InFlightCounts::default();
counts.admit(&BTreeSet::new());
counts.admit(&BTreeSet::new());
let cs = settings_with_default(ConcurrencyDefault::Fixed(two()));
assert!(!counts.can_admit(&BTreeSet::new(), &cs, two()));
}
// The global cap (ten) has room; the `db` cap (one) is what blocks admission.
#[test]
fn in_flight_cannot_admit_at_per_tag_cap() {
let mut counts = InFlightCounts::default();
let tags = BTreeSet::from([tag_name("db")]);
counts.admit(&tags);
let cs = settings_with_tag("db", one());
assert!(!counts.can_admit(&tags, &cs, ten()));
}
// A task tagged `db` and `gpu` is rejected because `gpu` alone is at its cap.
#[test]
fn in_flight_multi_tag_task_subject_to_every_cap() {
let mut counts = InFlightCounts::default();
let tags = BTreeSet::from([tag_name("db"), tag_name("gpu")]);
counts.admit(&BTreeSet::from([tag_name("gpu")]));
let mut tag_caps = BTreeMap::new();
tag_caps.insert(tag_name("db"), ten());
tag_caps.insert(tag_name("gpu"), one());
let cs = ConcurrencySettings {
default: ConcurrencyDefault::Auto,
tags: tag_caps,
};
assert!(!counts.can_admit(&tags, &cs, ten()));
}
// A saturated `db` tag does not block a task that carries no tags.
#[test]
fn in_flight_untagged_only_subject_to_global() {
let mut counts = InFlightCounts::default();
counts.admit(&BTreeSet::from([tag_name("db")]));
let cs = settings_with_tag("db", one());
assert!(counts.can_admit(&BTreeSet::new(), &cs, ten()));
}
#[test]
fn in_flight_release_decrements_counters() {
let mut counts = InFlightCounts::default();
let tags = BTreeSet::from([tag_name("db")]);
counts.admit(&tags);
counts.admit(&tags);
counts.release(&tags);
assert_eq!(counts.global, 1);
assert_eq!(counts.per_tag[&tag_name("db")], 1);
}
// ---- StreamHashAccumulator -------------------------------------------
// Distinct first bytes make the stdout and stderr hashes distinguishable.
#[test]
fn stream_hash_accumulator_records_outcome() {
let mut accum = StreamHashAccumulator::default();
let t = tid("p", "a");
let mut so = [0u8; 32];
so[0] = 0xAB;
let mut se = [0u8; 32];
se[0] = 0xCD;
accum.record(&t, &completed(t.clone(), so, se));
assert_eq!(
accum.by_task.get(&t),
Some(&PredecessorStreamHashes {
stdout_hash: so,
stderr_hash: se,
})
);
}
}