#![doc = include_str!("README.md")]
mod builder;
mod display;
mod linked;
mod state;
mod walk;
pub use self::builder::Builder;
pub use self::builder::Error as BuildError;
use self::linked::Stack;
use self::state::State;
use crate::{
execution::repository::{Execution, JobRepository},
job::{Code, JobId},
};
use daggy::{Dag, EdgeIndex, NodeIndex, Walker};
use log::warn;
use petgraph::{dot::Dot, visit::GraphBase, Direction};
use std::collections::{BTreeMap, BTreeSet};
/// Job graph backed by a [`Dag`]: each node holds the [`State`] of one job and
/// each edge holds an [`Edge`] describing how the two connected jobs relate.
///
/// Implements [`JobRepository`] so callers can select and execute jobs by goal.
#[derive(Debug, Default)]
pub struct Graph {
/// Underlying directed acyclic graph, indexed with `usize`.
dag: Dag<State, Edge, usize>,
}
/// Kind of relation an [`Edge`] expresses between its source job and its
/// target job.
#[derive(Debug, Default)]
enum Relation {
/// Default relation. As soon as one incoming trigger edge has passed,
/// `Graph::find_outstanding` reports nothing further to visit for the target.
#[default]
Trigger,
/// "Gated" relation (`Graph::is_trigger` returns `false`). A passed
/// dependency opens the gate; one that has not passed keeps it closed and
/// stays on the list of required upstream nodes.
Dependency,
}
/// Weight stored on every edge of the graph.
#[derive(Debug, Default)]
struct Edge {
// NOTE(review): not read in this file - presumably a textual description of
// where the relation was declared; confirm against the builder.
pub source: Box<str>,
// NOTE(review): not read in this file - presumably the job that declared the
// relation; confirm against the builder.
pub origin: JobId,
// NOTE(review): not read in this file - presumably the job being referred
// to; confirm against the builder.
pub reference: JobId,
/// Whether the edge is a trigger or a (gated) dependency.
pub relation: Relation,
/// Codes of the source job for which this edge counts as passed. When empty,
/// `Graph::passed` falls back to `Code::Explicit(0)`.
pub codes: Vec<Code>,
}
impl JobRepository for Graph {
    /// Refreshes the recorded job outcomes and starts every job that is due
    /// for the given `goals`, returning the started executions.
    fn execute(&mut self, goals: &[Box<str>]) -> Vec<Execution> {
        self.get_executions(goals)
    }

    /// Returns the ids of the jobs yielded by [`Graph::tree`] for `goals`, in
    /// the order of that walk.
    fn select(&self, goals: &[Box<str>]) -> Vec<JobId> {
        // Each entry is `(depth, job id, edge label)`; only the id is wanted.
        self.tree(goals).into_iter().map(|entry| entry.1).collect()
    }
}
impl Graph {
/// Returns a default-constructed [`Builder`].
// NOTE(review): presumably the entry point for assembling a `Graph`; the
// builder itself lives in the `builder` module, which is not visible here.
pub fn builder() -> Builder {
Builder::default()
}
/// Refreshes job state and starts every job that is ready to run for `goals`.
///
/// Steps:
/// 1. drain pending outcomes into the job histories ([`Self::update`]);
/// 2. resolve `goals` to nodes and collect the outstanding nodes;
/// 3. build the `required` sub-graph: outstanding nodes connected by the
///    edges that have not passed;
/// 4. execute every node of `required` without incoming edges, unless it
///    already has a status or is currently running.
fn get_executions(&mut self, goals: &[Box<str>]) -> Vec<Execution> {
    self.update();
    let targets = self.get_goals(goals);
    let outstanding = self.get_outstanding(targets);

    // Node weights keep the original node index so it can be mapped back to
    // `self.dag` after filtering has renumbered the nodes.
    let required = self.dag.graph().filter_map(
        |index, node| {
            if outstanding.contains_key(&index) {
                Some((index.index(), node.job.id.to_string().into_boxed_str()))
            } else {
                None
            }
        },
        // An edge is dropped only once it has definitely passed; an edge whose
        // source has no status yet is kept.
        |index, edge| match self.passed(index) {
            Some(true) => None,
            Some(false) | None => Some(edge.to_string()),
        },
    );
    log::trace!("outstanding:\n{:#?}", outstanding);
    log::debug!("required:\n{:?}", &Dot::new(&required));

    let mut due = Vec::new();
    for external in required.externals(Direction::Incoming) {
        let id = NodeIndex::new(required[external].0);
        let state = &mut self.dag[id];
        if state.get_status().is_some() {
            log::debug!(
                "skipped completed {:?}: {}",
                state.get_status(),
                state.job.id
            );
            continue;
        }
        if state.is_running() {
            log::debug!("skipped running {:?}: {}", state.get_status(), state.job.id);
            continue;
        }
        due.push(state.execute());
    }
    due
}
/// Codes accepted by an edge that does not declare any codes of its own.
const DEFAULT_PASS_CODE: [Code; 1] = [Code::Explicit(0)];

/// Tells whether the source job of `edge` finished with a code the edge
/// accepts.
///
/// Returns `None` while the source job has no status, otherwise
/// `Some(true)` when any of the edge's codes (or the default code when the
/// edge declares none) matches that status.
fn passed(&self, edge: EdgeIndex<usize>) -> Option<bool> {
    let source = self.dag.raw_edges()[edge.index()].source();
    let exit = self.dag[source].get_status()?;
    let accepted: &[Code] = match self.dag[edge].codes.as_slice() {
        [] => &Self::DEFAULT_PASS_CODE,
        declared => declared,
    };
    Some(accepted.iter().any(|code| code.matches(exit)))
}
/// Returns `true` when `edge` carries [`Relation::Trigger`], `false` when it
/// is a [`Relation::Dependency`].
fn is_trigger(&self, edge: EdgeIndex<usize>) -> bool {
    matches!(self.dag[edge].relation, Relation::Trigger)
}
/// Moves every outcome that is already waiting on a receiver into the
/// matching history of its job, for all nodes of the graph.
///
/// Uses `try_recv`, so a receiver with nothing pending is simply skipped.
fn update(&mut self) {
    self.dag.node_weights_mut().for_each(|node| {
        for (_outcome, inbox, record) in &mut node.history {
            while let Ok(received) = inbox.try_recv() {
                record.push(received);
            }
        }
    });
}
fn get_goals(&self, goals: &[Box<str>]) -> BTreeSet<NodeIndex<usize>> {
if goals.is_empty() {
self.dag.graph().externals(Direction::Outgoing).collect()
} else {
let mut goals = goals
.iter()
.cloned()
.map(|name| JobId {
name,
namespace: vec![],
})
.collect::<BTreeSet<_>>();
let found = self
.dag
.raw_nodes()
.iter()
.enumerate()
.filter_map(|(i, n)| goals.remove(&n.weight.job.id).then_some(i))
.map(NodeIndex::new)
.collect::<BTreeSet<_>>();
if !goals.is_empty() {
warn!("Some goals were not found: {goals:?} - we shall go ahead, clients may want to check ahead of time if their goals are reachable");
}
found
}
}
/// Walks upstream from `goals` using [`Self::find_outstanding`] and groups
/// the discovered paths by the node they lead to.
///
/// Every `(node, path)` pair produced by the walk is recorded, so a node
/// reached along several paths keeps all of them. (Collecting the walk into
/// an intermediate map first would silently keep only one path per node.)
fn get_outstanding(
    &self,
    goals: impl IntoIterator<Item = NodeIndex<usize>>,
) -> BTreeMap<NodeIndex<usize>, Vec<Stack<EdgeIndex<usize>>>> {
    let walk = walk::Recurse::new(goals, Self::find_outstanding);
    let mut outstanding: BTreeMap<_, Vec<_>> = BTreeMap::new();
    for (node, path) in walk.iter(self) {
        outstanding.entry(node).or_default().push(path);
    }
    outstanding
}
/// Step function handed to `walk::Recurse`: for node `n`, reached via the
/// edge path `state`, lists the upstream nodes that still have to be visited.
///
/// * Returns `None` when `n` is a goal (the path is empty) that already has
///   a status.
/// * Returns an empty list when an incoming trigger edge has passed, or when
///   at least one incoming dependency edge has passed and none is still
///   closed.
/// * Otherwise returns, for every incoming edge that has not passed, the
///   edge's source node together with the path extended by that edge.
fn find_outstanding(
    &self,
    n: NodeIndex<usize>,
    state: &Stack<EdgeIndex<usize>>,
) -> Option<Vec<OutstandingFound>> {
    use petgraph::visit::EdgeRef as _;
    use petgraph::visit::IntoEdgesDirected as _;

    let at_goal = state.iter().next().is_none();
    let has_status = self.dag[n].get_status().is_some();
    if at_goal && has_status {
        return None;
    }

    let mut any_gate_open = false;
    let mut any_gate_closed = false;
    let mut pending = Vec::new();
    for incoming in self.dag.edges_directed(n, petgraph::Direction::Incoming) {
        let edge = incoming.id();
        let passed = self.passed(edge).unwrap_or_default();
        let trigger = self.is_trigger(edge);
        match (passed, trigger) {
            // A single passed trigger settles the node: nothing more to visit.
            (true, true) => return Some(Vec::new()),
            // A passed dependency opens the gate and needs no further visit.
            (true, false) => {
                any_gate_open = true;
                continue;
            }
            // A dependency that has not passed keeps the gate closed.
            (false, false) => any_gate_closed = true,
            (false, true) => {}
        }
        pending.push((incoming.source(), state + edge));
    }

    if any_gate_open && !any_gate_closed {
        Some(Vec::new())
    } else {
        Some(pending)
    }
}
/// Walks depth-first from `goals` through the outstanding part of the graph.
///
/// Each item is `(depth, job id, label)` where `depth` is the number of
/// edges on the path to the node and `label` is the textual form of the edge
/// at the head of that path with newlines replaced by `", "` (empty when the
/// path has no head).
pub fn tree<'s>(
    &'s self,
    goals: &[Box<str>],
) -> impl 's + IntoIterator<Item = (usize, JobId, String)> {
    let roots = self.get_goals(goals);
    let walk = walk::Recurse::new(roots, Self::find_outstanding).depth_first();
    walk.iter(self).map(|(node, path)| {
        let depth = path.iter().count();
        let job = self.dag[node].job.id.clone();
        let label = match path.head() {
            Some(edge) => self.dag[*edge].to_string(),
            None => String::new(),
        };
        (depth, job, label.replace("\n", ", "))
    })
}
}
/// One result of `Graph::find_outstanding`: an upstream node together with
/// the stack of edges followed to reach it.
type OutstandingFound = (NodeIndex<usize>, Stack<EdgeIndex<usize>>);
/// Declares the node and edge id types of [`Graph`]: the `usize`-indexed
/// ids of the underlying dag.
impl GraphBase for Graph {
type EdgeId = EdgeIndex<usize>;
type NodeId = NodeIndex<usize>;
}
/// Builds the chain `a -> b -> c` and walks it backwards from `c`, always
/// following the first incoming edge, checking that the walk yields the
/// `b -> c` edge with `b` and then the `a -> b` edge with `a`.
#[test]
fn routing() {
    use petgraph::visit::EdgeRef as _;
    use petgraph::visit::IntoEdgesDirected as _;

    let mut graph = Graph::default();
    let a = graph.dag.add_node(State::default());
    let b: NodeIndex<usize> = graph.dag.add_node(State::default());
    let c: NodeIndex<usize> = graph.dag.add_node(State::default());
    let ab = graph
        .dag
        .add_edge(
            a,
            b,
            Edge {
                codes: vec![Code::Explicit(0)],
                ..Default::default()
            },
        )
        .expect("a-b");
    let bc = graph
        .dag
        .add_edge(
            b,
            c,
            Edge {
                codes: vec![Code::Explicit(3)],
                ..Default::default()
            },
        )
        .expect("b-c");

    let mut walker = graph.dag.recursive_walk(c, |graph, node| {
        println!("node: {node:?}");
        // Only the first incoming edge (if any) is followed.
        graph
            .edges_directed(node, Direction::Incoming)
            .next()
            .map(|edge| (edge.id(), edge.source()))
    });

    let mut visited = Vec::new();
    while let Some(item) = walker.next(&graph.dag) {
        println!("item: {:?}", item);
        visited.push(item);
    }
    assert_eq!(visited, vec![(bc, b), (ab, a)]);
}