omne-cli 0.2.0

CLI for managing omne volumes: init, upgrade, and validate kernel and distro releases
Documentation
//! DAG scheduler.
//!
//! Pure-logic state machine over a parsed `pipe::Pipe`. Tracks node
//! lifecycle states, decides which nodes are ready to dispatch given
//! the current outcomes of their predecessors, and respects each
//! node's `trigger_rule` (`all_success` default,
//! `one_success` for synthesis nodes that should fire as soon as any
//! single branch succeeds).
//!
//! No I/O. The runner (Unit 11+) plugs the scheduler into subprocess
//! lifecycle: call `ready()` to find dispatchable nodes, spawn each,
//! then call `mark(id, outcome)` when subprocess termination is
//! observed. The returned set is the *newly* unblocked successors so
//! callers don't re-scan the whole DAG every tick.
//!
//! Failure semantics:
//! - `all_success` predecessors: any failed predecessor permanently
//!   blocks the dependent.
//! - `one_success` predecessors: the dependent becomes ready as soon
//!   as one predecessor completes successfully; remaining
//!   predecessors are no-ops once it fires. If every predecessor
//!   ends up failed, the dependent is permanently blocked.

#![allow(dead_code)]

use std::collections::{BTreeMap, BTreeSet};

use thiserror::Error;

use crate::pipe::{Pipe, TriggerRule};

/// Lifecycle of a node inside one run.
///
/// Every node starts as `Pending`. `Scheduler::ready` promotes a
/// dispatchable node to `Running`, and `Scheduler::mark` then records
/// it as `Completed` or `Failed`. `mark` may also move a still-`Pending`
/// dependent to `Blocked` when its trigger rule can no longer be met.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeState {
    /// Predecessors not yet satisfied (or not yet dispatched).
    Pending,
    /// Returned by `ready()` and not yet marked.
    Running,
    /// Subprocess returned successfully (or loop terminated on its
    /// `until` sentinel).
    Completed,
    /// Subprocess returned non-zero / timed out / hit max_iterations
    /// / tripped a gate.
    Failed,
    /// A predecessor failed in a way that prevents this node from
    /// ever being dispatched. Distinct from `Failed` so the runner
    /// can attribute the cascade in `pipe.aborted` reasoning.
    Blocked,
}

/// Terminal result the caller reports to `Scheduler::mark` for a
/// node that is currently `Running`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeOutcome {
    /// The node succeeded; `mark` records it as `NodeState::Completed`.
    Completed,
    /// The node did not succeed; `mark` records it as `NodeState::Failed`.
    Failed,
}

/// Errors returned by `Scheduler::mark` when an outcome cannot be
/// recorded.
#[derive(Debug, Error)]
pub enum SchedulerError {
    /// The given id is not present in the scheduler's state table.
    #[error("scheduler does not know node `{id}`")]
    UnknownNode { id: String },

    /// The node exists but is not `Running`, so it has no outcome to
    /// record. `state` carries the state that was observed instead.
    #[error("node `{id}` is not Running (state: {state:?}); cannot mark outcome")]
    NotRunning { id: String, state: NodeState },
}

/// State machine over a parsed pipe. Construct once per run; thread
/// `ready()` / `mark()` calls through the dispatcher loop.
pub struct Scheduler {
    /// Node ids in the order the pipe declares them. `ready()` walks
    /// this list, so dispatch ordering is deterministic across runs
    /// (declaration order, not lexicographic order).
    order: Vec<String>,
    /// Current lifecycle state of every node, keyed by node id.
    states: BTreeMap<String, NodeState>,
    /// `predecessors[id]` = ids that must complete before `id` can run.
    predecessors: BTreeMap<String, Vec<String>>,
    /// `trigger_rule[id]` = how `id`'s predecessors are combined.
    trigger_rule: BTreeMap<String, TriggerRule>,
}

impl Scheduler {
    /// Build a scheduler from a parsed pipe. Every node starts
    /// `Pending`.
    pub fn new(pipe: &Pipe) -> Self {
        let order: Vec<String> = pipe.nodes.iter().map(|n| n.id.clone()).collect();
        let states = order
            .iter()
            .map(|id| (id.clone(), NodeState::Pending))
            .collect();
        let predecessors = pipe
            .nodes
            .iter()
            .map(|n| (n.id.clone(), n.depends_on.clone()))
            .collect();
        let trigger_rule = pipe
            .nodes
            .iter()
            .map(|n| (n.id.clone(), n.trigger_rule))
            .collect();
        Self {
            order,
            states,
            predecessors,
            trigger_rule,
        }
    }

    /// Current state of `id`, or `None` if unknown.
    pub fn state(&self, id: &str) -> Option<NodeState> {
        self.states.get(id).copied()
    }

    /// All node ids in the DAG (insertion order from the pipe).
    pub fn all_ids(&self) -> &[String] {
        &self.order
    }

    /// Ids currently dispatchable: `Pending` nodes whose
    /// predecessors satisfy their `trigger_rule`. Idempotent —
    /// promotes to `Running` and returns the freshly-promoted set.
    /// Sorted for determinism.
    pub fn ready(&mut self) -> Vec<String> {
        let mut ready_ids = Vec::new();
        for id in &self.order {
            if self.states.get(id) != Some(&NodeState::Pending) {
                continue;
            }
            if !self.predecessors_satisfied(id) {
                continue;
            }
            ready_ids.push(id.clone());
        }
        for id in &ready_ids {
            self.states.insert(id.clone(), NodeState::Running);
        }
        ready_ids
    }

    /// Record the outcome of a `Running` node and return the ids
    /// that just became unblocked (whether ready-to-dispatch or
    /// permanently blocked is a follow-up `ready()` / `state()`
    /// inspection).
    ///
    /// Returns `Err` if `id` is unknown or not currently `Running`.
    pub fn mark(&mut self, id: &str, outcome: NodeOutcome) -> Result<Vec<String>, SchedulerError> {
        let current = self
            .states
            .get(id)
            .copied()
            .ok_or_else(|| SchedulerError::UnknownNode { id: id.to_string() })?;
        if current != NodeState::Running {
            return Err(SchedulerError::NotRunning {
                id: id.to_string(),
                state: current,
            });
        }
        let new_state = match outcome {
            NodeOutcome::Completed => NodeState::Completed,
            NodeOutcome::Failed => NodeState::Failed,
        };
        self.states.insert(id.to_string(), new_state);

        // Cascade: any Pending node that depends on `id` may now be
        // ready (predecessors satisfied) or blocked (an `all_success`
        // predecessor failed, or every `one_success` predecessor
        // exhausted itself failing).
        let mut newly_unblocked = BTreeSet::new();
        for (dep_id, deps) in &self.predecessors {
            if !deps.iter().any(|d| d == id) {
                continue;
            }
            if self.states.get(dep_id) != Some(&NodeState::Pending) {
                continue;
            }
            if self.permanently_blocked(dep_id) {
                self.states.insert(dep_id.clone(), NodeState::Blocked);
                newly_unblocked.insert(dep_id.clone());
            } else if self.predecessors_satisfied(dep_id) {
                newly_unblocked.insert(dep_id.clone());
            }
        }
        Ok(newly_unblocked.into_iter().collect())
    }

    fn predecessors_satisfied(&self, id: &str) -> bool {
        let Some(deps) = self.predecessors.get(id) else {
            return false;
        };
        if deps.is_empty() {
            return true;
        }
        let rule = self
            .trigger_rule
            .get(id)
            .copied()
            .unwrap_or(TriggerRule::AllSuccess);
        match rule {
            TriggerRule::AllSuccess => deps
                .iter()
                .all(|d| self.states.get(d) == Some(&NodeState::Completed)),
            TriggerRule::OneSuccess => deps
                .iter()
                .any(|d| self.states.get(d) == Some(&NodeState::Completed)),
        }
    }

    /// True iff no future outcome can satisfy `id`'s `trigger_rule`.
    /// `all_success`: any predecessor `Failed` or `Blocked` poisons
    /// the dependent. `one_success`: every predecessor must end in a
    /// non-success state for the dependent to be permanently blocked.
    fn permanently_blocked(&self, id: &str) -> bool {
        let Some(deps) = self.predecessors.get(id) else {
            return false;
        };
        if deps.is_empty() {
            return false;
        }
        let rule = self
            .trigger_rule
            .get(id)
            .copied()
            .unwrap_or(TriggerRule::AllSuccess);
        match rule {
            TriggerRule::AllSuccess => deps.iter().any(|d| {
                matches!(
                    self.states.get(d),
                    Some(NodeState::Failed) | Some(NodeState::Blocked)
                )
            }),
            TriggerRule::OneSuccess => deps.iter().all(|d| {
                matches!(
                    self.states.get(d),
                    Some(NodeState::Failed) | Some(NodeState::Blocked)
                )
            }),
        }
    }

    /// True when no more progress is possible: every node is
    /// `Completed`, `Failed`, or `Blocked`.
    pub fn is_terminal(&self) -> bool {
        self.states.values().all(|s| {
            matches!(
                s,
                NodeState::Completed | NodeState::Failed | NodeState::Blocked
            )
        })
    }
}