haz-exec 0.1.0

Async task execution engine for haz.
Documentation
//! Cascade emission for `EXEC-010` / `EXEC-011` and the
//! ready-set drain for `EXEC-013` step 1.
//!
//! Three free helpers, all consumed by the scheduler loop in
//! [`crate::run_graph`]:
//!
//! - [`emit_cascade_skips`] turns the
//!   [`crate::run_graph::state::ReadyState::complete_failed`]
//!   return value (the newly-skipped descendant set of a failed
//!   task) into [`RunOutcome::Skipped`] entries and observer
//!   events.
//! - [`emit_cascade_cancellations`] does the analogous work for
//!   the cancellation cascade, emitting
//!   [`CancelledRecord::UpstreamCancelled`] per descendant.
//! - [`drain_ready_to_cancelled`] empties the ready set into
//!   [`CancelledRecord::RunCancelled`] entries on every loop
//!   iteration that observes the run's cancellation flag.

use std::collections::{BTreeMap, BTreeSet};

use haz_domain::task_id::TaskId;

use crate::run_graph::state::ReadyState;
use crate::run_task::{CancelledRecord, RunObserver, RunOutcome, SkipCause, SkipRecord};

/// Record the skip cascade triggered by a failed or erroring task
/// (`EXEC-010` / `EXEC-011`).
///
/// For each task in `newly_skipped`, visited in the set's sorted
/// order, this builds a [`SkipRecord`] that attributes the skip to
/// `cause`, reports it through [`RunObserver::on_task_skipped`], and
/// then stores it in `outcomes` as [`RunOutcome::Skipped`]. An entry
/// already present under the same key is replaced.
///
/// # Preconditions
///
/// `newly_skipped` is expected to be the set returned by
/// [`crate::run_graph::state::ReadyState::complete_failed`]. The
/// caller MUST have made that call first: it computes the cascade
/// closure and keeps the scheduler's ready/skip sets in lock-step
/// with the records emitted here.
pub(super) fn emit_cascade_skips<O>(
    observer: &O,
    outcomes: &mut BTreeMap<TaskId, RunOutcome>,
    cause: &SkipCause,
    newly_skipped: BTreeSet<TaskId>,
) where
    O: RunObserver,
{
    newly_skipped.into_iter().for_each(|skipped| {
        let task = skipped.clone();
        let cause = cause.clone();
        let entry = SkipRecord { task, cause };
        // Notify first: the id and the record are both moved into
        // `outcomes` on the next line.
        observer.on_task_skipped(&skipped, &entry);
        outcomes.insert(skipped, RunOutcome::Skipped(entry));
    });
}

/// Record the cancellation cascade rooted at `upstream`
/// (`EXEC-011` for the cancellation cascade).
///
/// For each task in `newly_cancelled`, visited in the set's sorted
/// order, this builds a [`CancelledRecord::UpstreamCancelled`] naming
/// `upstream` as the root cancelled task, reports it through
/// [`RunObserver::on_task_cancelled`], and then stores it in
/// `outcomes` as [`RunOutcome::Cancelled`]. An entry already present
/// under the same key is replaced.
///
/// # Preconditions
///
/// `newly_cancelled` is expected to be the set returned by
/// [`crate::run_graph::state::ReadyState::complete_failed`]. The
/// caller MUST have made that call first: it computes the cascade
/// closure and keeps the scheduler's ready/skip sets in lock-step
/// with the records emitted here.
pub(super) fn emit_cascade_cancellations<O>(
    observer: &O,
    outcomes: &mut BTreeMap<TaskId, RunOutcome>,
    upstream: &TaskId,
    newly_cancelled: BTreeSet<TaskId>,
) where
    O: RunObserver,
{
    // Every descendant gets the same shape of record; only the task
    // id differs.
    let record_for = |task: &TaskId| CancelledRecord::UpstreamCancelled {
        task: task.clone(),
        upstream: upstream.clone(),
    };

    newly_cancelled.into_iter().for_each(|cancelled| {
        let entry = record_for(&cancelled);
        // Notify first: the id and the record are both moved into
        // `outcomes` on the next line.
        observer.on_task_cancelled(&cancelled, &entry);
        outcomes.insert(cancelled, RunOutcome::Cancelled(entry));
    });
}

/// Empty [`ReadyState::ready`], settling every task found there as
/// [`RunOutcome::Cancelled`]`(`[`CancelledRecord::RunCancelled`]`)`.
///
/// Each task that was in the ready set is:
///
/// 1. removed from [`ReadyState::ready`];
/// 2. inserted into [`ReadyState::skip`], so that a later failure or
///    cancellation cascade (`complete_failed` + the `emit_cascade_*`
///    helpers) treats it as already settled and does not re-emit a
///    conflicting outcome;
/// 3. reported once through [`RunObserver::on_task_cancelled`];
/// 4. stored in `outcomes`.
///
/// The drain is idempotent: when the ready set is empty the call
/// does nothing.
///
/// The scheduler calls this at the cancel-fire transition and again
/// on every loop iteration while the run is cancelled. That way a
/// successor promoted into [`ReadyState::ready`] by a post-cancel
/// succeeded completion is still surfaced as cancelled instead of
/// being left behind in the ready set.
pub(super) fn drain_ready_to_cancelled<O>(
    observer: &O,
    state: &mut ReadyState,
    outcomes: &mut BTreeMap<TaskId, RunOutcome>,
) where
    O: RunObserver,
{
    // Snapshot the ids up front: the ready set is mutated while the
    // tasks are being settled, so it cannot be iterated directly.
    let pending = state.ready.iter().cloned().collect::<Vec<TaskId>>();

    pending.into_iter().for_each(|ready_task| {
        state.ready.remove(&ready_task);
        state.skip.insert(ready_task.clone());

        let entry = CancelledRecord::RunCancelled {
            task: ready_task.clone(),
        };
        observer.on_task_cancelled(&ready_task, &entry);
        outcomes.insert(ready_task, RunOutcome::Cancelled(entry));
    });
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use haz_domain::task_id::TaskId;

    use crate::run_graph::cascade::emit_cascade_skips;
    use crate::run_graph::test_fixtures::{Event, Recorder, skipped_for, tid};
    use crate::run_task::{RunOutcome, SkipCause};

    /// Unit test for the cascade-emit helper in isolation: for each
    /// `SkipCause` variant, every descendant must end up in
    /// `outcomes` with that cause, and the observer must see exactly
    /// one skip event per descendant.
    #[test]
    fn emit_cascade_skips_inserts_outcomes_and_fires_observer() {
        let root = tid("p", "root");
        let descendants = BTreeSet::from([
            tid("p", "child_a"),
            tid("p", "child_b"),
            tid("p", "child_c"),
        ]);

        // Exercise both attribution variants against the same
        // descendant set.
        let causes = [
            SkipCause::UpstreamFailed {
                upstream: root.clone(),
            },
            SkipCause::UpstreamErrored {
                upstream: root.clone(),
            },
        ];

        for cause in causes {
            // Fresh recorder and outcome map per variant so counts
            // do not leak between iterations.
            let recorder = Recorder::default();
            let mut outcomes = BTreeMap::<TaskId, RunOutcome>::new();
            emit_cascade_skips(&recorder, &mut outcomes, &cause, descendants.clone());

            // Outcome side: one entry per descendant, each carrying
            // the supplied cause.
            assert_eq!(outcomes.len(), 3, "every descendant gets an outcome entry");
            for descendant in &descendants {
                assert_eq!(
                    skipped_for(&outcomes, descendant).cause,
                    cause,
                    "{descendant:?} carries the supplied cause variant",
                );
            }

            // Observer side: keep only the skip events.
            let mut skip_events: Vec<(TaskId, SkipCause)> = Vec::new();
            for event in recorder.events() {
                if let Event::Skipped(task, recorded_cause) = event {
                    skip_events.push((task, recorded_cause));
                }
            }

            assert_eq!(
                skip_events.len(),
                3,
                "observer fires once per descendant",
            );
            for (task, recorded_cause) in skip_events {
                assert!(descendants.contains(&task));
                assert_eq!(recorded_cause, cause);
            }
        }
    }
}