use std::collections::{BTreeMap, BTreeSet};
use haz_domain::task_id::TaskId;
use crate::run_graph::state::ReadyState;
use crate::run_task::{CancelledRecord, RunObserver, RunOutcome, SkipCause, SkipRecord};
pub(super) fn emit_cascade_skips<O>(
observer: &O,
outcomes: &mut BTreeMap<TaskId, RunOutcome>,
cause: &SkipCause,
newly_skipped: BTreeSet<TaskId>,
) where
O: RunObserver,
{
for descendant in newly_skipped {
let record = SkipRecord {
task: descendant.clone(),
cause: cause.clone(),
};
observer.on_task_skipped(&descendant, &record);
outcomes.insert(descendant, RunOutcome::Skipped(record));
}
}
pub(super) fn emit_cascade_cancellations<O>(
observer: &O,
outcomes: &mut BTreeMap<TaskId, RunOutcome>,
upstream: &TaskId,
newly_cancelled: BTreeSet<TaskId>,
) where
O: RunObserver,
{
for descendant in newly_cancelled {
let record = CancelledRecord::UpstreamCancelled {
task: descendant.clone(),
upstream: upstream.clone(),
};
observer.on_task_cancelled(&descendant, &record);
outcomes.insert(descendant, RunOutcome::Cancelled(record));
}
}
pub(super) fn drain_ready_to_cancelled<O>(
observer: &O,
state: &mut ReadyState,
outcomes: &mut BTreeMap<TaskId, RunOutcome>,
) where
O: RunObserver,
{
let drained: Vec<TaskId> = state.ready.iter().cloned().collect();
for task in drained {
state.ready.remove(&task);
state.skip.insert(task.clone());
let record = CancelledRecord::RunCancelled { task: task.clone() };
observer.on_task_cancelled(&task, &record);
outcomes.insert(task, RunOutcome::Cancelled(record));
}
}
#[cfg(test)]
mod tests {
use std::collections::{BTreeMap, BTreeSet};
use haz_domain::task_id::TaskId;
use crate::run_graph::cascade::emit_cascade_skips;
use crate::run_graph::test_fixtures::{Event, Recorder, skipped_for, tid};
use crate::run_task::{RunOutcome, SkipCause};
#[test]
fn emit_cascade_skips_inserts_outcomes_and_fires_observer() {
let upstream = tid("p", "root");
let descendants = BTreeSet::from([
tid("p", "child_a"),
tid("p", "child_b"),
tid("p", "child_c"),
]);
for cause in [
SkipCause::UpstreamFailed {
upstream: upstream.clone(),
},
SkipCause::UpstreamErrored {
upstream: upstream.clone(),
},
] {
let observer = Recorder::default();
let mut outcomes: BTreeMap<TaskId, RunOutcome> = BTreeMap::new();
emit_cascade_skips(&observer, &mut outcomes, &cause, descendants.clone());
assert_eq!(outcomes.len(), 3, "every descendant gets an outcome entry");
for descendant in &descendants {
assert_eq!(
skipped_for(&outcomes, descendant).cause,
cause,
"{descendant:?} carries the supplied cause variant",
);
}
let skipped_events: Vec<(TaskId, SkipCause)> = observer
.events()
.into_iter()
.filter_map(|e| match e {
Event::Skipped(t, c) => Some((t, c)),
_ => None,
})
.collect();
assert_eq!(
skipped_events.len(),
3,
"observer fires once per descendant",
);
for (task, recorded_cause) in skipped_events {
assert!(descendants.contains(&task));
assert_eq!(recorded_cause, cause);
}
}
}
}