#![allow(dead_code)]
use std::collections::{BTreeMap, BTreeSet};
use thiserror::Error;
use crate::pipe::{Pipe, TriggerRule};
/// Lifecycle state the scheduler tracks for a single node.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeState {
/// Initial state assigned to every node by `Scheduler::new`.
Pending,
/// Assigned by `Scheduler::ready` when the node is handed out to the caller.
Running,
/// Assigned by `Scheduler::mark` for `NodeOutcome::Completed`.
Completed,
/// Assigned by `Scheduler::mark` for `NodeOutcome::Failed`.
Failed,
/// Assigned by `Scheduler::mark` to a pending node whose trigger rule can no
/// longer be met because of failed or blocked predecessors.
Blocked,
}
/// Result reported to `Scheduler::mark` for a node that was `Running`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NodeOutcome {
/// Moves the node to `NodeState::Completed`.
Completed,
/// Moves the node to `NodeState::Failed`.
Failed,
}
/// Errors returned by `Scheduler::mark`.
#[derive(Debug, Error)]
pub enum SchedulerError {
/// The given id has no entry in the scheduler's state table.
#[error("scheduler does not know node `{id}`")]
UnknownNode { id: String },
/// The node exists but its current state is not `NodeState::Running`;
/// `state` carries the state it was actually in.
#[error("node `{id}` is not Running (state: {state:?}); cannot mark outcome")]
NotRunning { id: String, state: NodeState },
}
/// Tracks per-node state for a `Pipe` and decides which nodes may run next.
pub struct Scheduler {
// Node ids in the order they appear in `pipe.nodes`; `ready` scans in this order.
order: Vec<String>,
// Current state of each node, keyed by node id.
states: BTreeMap<String, NodeState>,
// For each node id, the ids it lists in `depends_on`.
predecessors: BTreeMap<String, Vec<String>>,
// For each node id, the rule deciding how predecessor outcomes gate it.
trigger_rule: BTreeMap<String, TriggerRule>,
}
impl Scheduler {
pub fn new(pipe: &Pipe) -> Self {
let order: Vec<String> = pipe.nodes.iter().map(|n| n.id.clone()).collect();
let states = order
.iter()
.map(|id| (id.clone(), NodeState::Pending))
.collect();
let predecessors = pipe
.nodes
.iter()
.map(|n| (n.id.clone(), n.depends_on.clone()))
.collect();
let trigger_rule = pipe
.nodes
.iter()
.map(|n| (n.id.clone(), n.trigger_rule))
.collect();
Self {
order,
states,
predecessors,
trigger_rule,
}
}
pub fn state(&self, id: &str) -> Option<NodeState> {
self.states.get(id).copied()
}
pub fn all_ids(&self) -> &[String] {
&self.order
}
pub fn ready(&mut self) -> Vec<String> {
let mut ready_ids = Vec::new();
for id in &self.order {
if self.states.get(id) != Some(&NodeState::Pending) {
continue;
}
if !self.predecessors_satisfied(id) {
continue;
}
ready_ids.push(id.clone());
}
for id in &ready_ids {
self.states.insert(id.clone(), NodeState::Running);
}
ready_ids
}
pub fn mark(&mut self, id: &str, outcome: NodeOutcome) -> Result<Vec<String>, SchedulerError> {
let current = self
.states
.get(id)
.copied()
.ok_or_else(|| SchedulerError::UnknownNode { id: id.to_string() })?;
if current != NodeState::Running {
return Err(SchedulerError::NotRunning {
id: id.to_string(),
state: current,
});
}
let new_state = match outcome {
NodeOutcome::Completed => NodeState::Completed,
NodeOutcome::Failed => NodeState::Failed,
};
self.states.insert(id.to_string(), new_state);
let mut newly_unblocked = BTreeSet::new();
for (dep_id, deps) in &self.predecessors {
if !deps.iter().any(|d| d == id) {
continue;
}
if self.states.get(dep_id) != Some(&NodeState::Pending) {
continue;
}
if self.permanently_blocked(dep_id) {
self.states.insert(dep_id.clone(), NodeState::Blocked);
newly_unblocked.insert(dep_id.clone());
} else if self.predecessors_satisfied(dep_id) {
newly_unblocked.insert(dep_id.clone());
}
}
Ok(newly_unblocked.into_iter().collect())
}
fn predecessors_satisfied(&self, id: &str) -> bool {
let Some(deps) = self.predecessors.get(id) else {
return false;
};
if deps.is_empty() {
return true;
}
let rule = self
.trigger_rule
.get(id)
.copied()
.unwrap_or(TriggerRule::AllSuccess);
match rule {
TriggerRule::AllSuccess => deps
.iter()
.all(|d| self.states.get(d) == Some(&NodeState::Completed)),
TriggerRule::OneSuccess => deps
.iter()
.any(|d| self.states.get(d) == Some(&NodeState::Completed)),
}
}
fn permanently_blocked(&self, id: &str) -> bool {
let Some(deps) = self.predecessors.get(id) else {
return false;
};
if deps.is_empty() {
return false;
}
let rule = self
.trigger_rule
.get(id)
.copied()
.unwrap_or(TriggerRule::AllSuccess);
match rule {
TriggerRule::AllSuccess => deps.iter().any(|d| {
matches!(
self.states.get(d),
Some(NodeState::Failed) | Some(NodeState::Blocked)
)
}),
TriggerRule::OneSuccess => deps.iter().all(|d| {
matches!(
self.states.get(d),
Some(NodeState::Failed) | Some(NodeState::Blocked)
)
}),
}
}
pub fn is_terminal(&self) -> bool {
self.states.values().all(|s| {
matches!(
s,
NodeState::Completed | NodeState::Failed | NodeState::Blocked
)
})
}
}