willdo 0.0.1

Task manager with DAG
Documentation
#![doc = include_str!("README.md")]

mod builder;
mod display;
mod linked;
mod state;
mod walk;

pub use self::builder::Builder;
pub use self::builder::Error as BuildError;
use self::linked::Stack;
use self::state::State;
use crate::{
    execution::repository::{Execution, JobRepository},
    job::{Code, JobId},
};
use daggy::{Dag, EdgeIndex, NodeIndex, Walker};
use log::warn;
use petgraph::{dot::Dot, visit::GraphBase, Direction};
use std::collections::{BTreeMap, BTreeSet};

/// The WillDo Graph - a repository of jobs, their relations and state
///
/// Wraps a directed acyclic graph whose node weights are per-job [`State`]
/// values and whose edge weights are [`Edge`] relations between jobs.
#[derive(Debug, Default)]
pub struct Graph {
    // Nodes hold job state, edges hold the relation between two jobs;
    // node and edge indices are `usize` based.
    dag: Dag<State, Edge, usize>,
}

/// How an edge participates when `Graph::find_outstanding` evaluates the
/// incoming edges of a node.
#[derive(Debug, Default)]
enum Relation {
    /// A passed trigger edge satisfies the node on its own: the scan of the
    /// remaining incoming edges stops and nothing further is required.
    /// This is the default relation.
    #[default]
    Trigger,
    /// A gating edge: a passed dependency opens the gate, a dependency that
    /// has not passed closes it and stays in the list of requirements.
    Dependency,
}

/// Weight of an edge in the job graph: a relation from one job to another.
#[derive(Debug, Default)]
struct Edge {
    // NOTE(review): `source`, `origin` and `reference` are not read anywhere
    // in this file; presumably they record where the relation was declared
    // and which jobs it connects - confirm against the builder.
    pub source: Box<str>,
    pub origin: JobId,
    pub reference: JobId,
    /// Whether this edge acts as a trigger or as a gating dependency.
    pub relation: Relation,
    /// Codes the upstream job's status is matched against to decide whether
    /// the edge passed. When empty, `Graph::DEFAULT_PASS_CODE` is used.
    pub codes: Vec<Code>,
}

/// Exposes the graph through the [`JobRepository`] interface.
impl JobRepository for Graph {
    /// Returns the executions that are due for `goals`; see
    /// `Graph::get_executions` for the selection rules.
    fn execute(&mut self, goals: &[Box<str>]) -> Vec<Execution> {
        Self::get_executions(self, goals)
    }

    /// Returns the id of every job listed by `Graph::tree` for `goals`,
    /// in the order the tree yields them.
    fn select(&self, goals: &[Box<str>]) -> Vec<JobId> {
        let entries = self.tree(goals);
        entries
            .into_iter()
            .map(|(_depth, job, _label)| job)
            .collect()
    }
}

impl Graph {
    /// Create a graph builder
    ///
    /// Returns a [`Builder`] in its default state.
    pub fn builder() -> Builder {
        Builder::default()
    }

    /// Refresh the job states and start every job that is due for `goals`.
    ///
    /// Steps:
    /// 1. `update` pulls pending outcomes into the job histories.
    /// 2. The goals are resolved to nodes and the outstanding nodes leading
    ///    to them are collected.
    /// 3. A reduced copy of the graph is built, containing only outstanding
    ///    nodes and the edges that have not passed yet.
    /// 4. Nodes without incoming edges in that reduced graph are candidates;
    ///    those that already have a status or are currently running are
    ///    skipped, the rest are executed.
    ///
    /// Returns one [`Execution`] per job started by this call.
    fn get_executions(&mut self, goals: &[Box<str>]) -> Vec<Execution> {
        self.update();
        let targets = self.get_goals(goals);
        let outstanding = self.get_outstanding(targets);

        // Node weights of the reduced graph carry the original node index so
        // candidates can be mapped back onto `self.dag` further down.
        let required = self.dag.graph().filter_map(
            |idx, state| {
                if outstanding.contains_key(&idx) {
                    Some((idx.index(), state.job.id.to_string().into_boxed_str()))
                } else {
                    None
                }
            },
            // An edge whose pass state is unknown counts as not passed.
            |idx, edge| (!self.passed(idx).unwrap_or_default()).then(|| edge.to_string()),
        );

        log::trace!("outstanding:\n{:#?}", outstanding);
        log::debug!("required:\n{:?}", &Dot::new(&required));

        let mut due = Vec::new();
        for candidate in required.externals(Direction::Incoming) {
            let state = &mut self.dag[NodeIndex::<usize>::new(required[candidate].0)];
            if state.get_status().is_some() {
                log::debug!(
                    "skipped completed {:?}: {}",
                    state.get_status(),
                    state.job.id
                );
                continue;
            }
            if state.is_running() {
                log::debug!("skipped running {:?}: {}", state.get_status(), state.job.id);
                continue;
            }
            due.push(state.execute());
        }
        due
    }

    /// Codes an edge is checked against when it does not list any itself.
    const DEFAULT_PASS_CODE: [Code; 1] = [Code::Explicit(0)];

    /// Tell whether the job at the source of `edge` finished with a status
    /// accepted by that edge.
    ///
    /// Returns `None` while the source job has no status, otherwise
    /// `Some(true)` when any of the edge's codes (or `DEFAULT_PASS_CODE` when
    /// the edge lists none) matches the status, `Some(false)` if none does.
    fn passed(&self, edge: EdgeIndex<usize>) -> Option<bool> {
        let upstream = self.dag.raw_edges()[edge.index()].source();
        let configured = self.dag[edge].codes.as_slice();
        let accepted: &[Code] = if configured.is_empty() {
            &Self::DEFAULT_PASS_CODE
        } else {
            configured
        };
        self.dag[upstream]
            .get_status()
            .map(|exit| accepted.iter().any(|code| code.matches(exit)))
    }

    /// `true` when `edge` is a [`Relation::Trigger`], `false` when it is a
    /// [`Relation::Dependency`].
    fn is_trigger(&self, edge: EdgeIndex<usize>) -> bool {
        let relation = &self.dag[edge].relation;
        match relation {
            Relation::Dependency => false,
            Relation::Trigger => true,
        }
    }

    /// Internal state update.
    ///
    /// For every job, polls each receiver kept in the job's history entries
    /// with `try_recv` and appends the received outcomes to that entry's
    /// list until the receiver stops yielding.
    ///
    /// This could be spawned in the background once things go async; for
    /// now the caller needs to poll it.
    fn update(&mut self) {
        let entries = self
            .dag
            .node_weights_mut()
            .flat_map(|state| &mut state.history);
        for (_outcome, receiver, history) in entries {
            // If we go async, the number of subscribers could be checked here
            // to decide on the "running" state.
            while let Ok(outcome) = receiver.try_recv() {
                history.push(outcome);
            }
        }
    }

    fn get_goals(&self, goals: &[Box<str>]) -> BTreeSet<NodeIndex<usize>> {
        if goals.is_empty() {
            self.dag.graph().externals(Direction::Outgoing).collect()
        } else {
            let mut goals = goals
                .iter()
                .cloned()
                .map(|name| JobId {
                    name,
                    namespace: vec![],
                })
                .collect::<BTreeSet<_>>();
            let found = self
                .dag
                .raw_nodes()
                .iter()
                .enumerate()
                .filter_map(|(i, n)| goals.remove(&n.weight.job.id).then_some(i))
                .map(NodeIndex::new)
                .collect::<BTreeSet<_>>();
            if !goals.is_empty() {
                warn!("Some goals were not found: {goals:?} - we shall go ahead, clients may want to check ahead of time if their goals are reachable");
            }
            found
        }
    }
    /// Collect every node that is still outstanding on the way to `goals`,
    /// together with all edge paths along which the walk reached it.
    ///
    /// The walk starts at the goals and expands through
    /// `Self::find_outstanding`. A node reached along several paths gets
    /// one entry per path in its vector.
    ///
    /// The walk is consumed directly: collecting it into a map first would
    /// silently keep only the last path per node.
    fn get_outstanding(
        &self,
        goals: impl IntoIterator<Item = NodeIndex<usize>>,
    ) -> BTreeMap<NodeIndex<usize>, Vec<Stack<EdgeIndex<usize>>>> {
        let walk = walk::Recurse::new(goals, Self::find_outstanding);
        let mut outstanding: BTreeMap<_, Vec<_>> = BTreeMap::new();
        for (node, path) in walk.iter(self) {
            outstanding.entry(node).or_default().push(path);
        }
        outstanding
    }

    /// Expansion step of the outstanding-jobs walk: decide which upstream
    /// nodes of `n` still have to be visited.
    ///
    /// `state` is the edge path that led to `n`; an empty path means `n` is
    /// a goal itself.
    ///
    /// Returns
    /// * `None` when `n` is a goal that already has a status,
    /// * `Some(vec![])` when a trigger edge into `n` has passed, or when at
    ///   least one dependency edge has passed and none is pending,
    /// * otherwise `Some(..)` with the source node and extended path of
    ///   every incoming edge that has not passed.
    fn find_outstanding(
        &self,
        n: NodeIndex<usize>,
        state: &Stack<EdgeIndex<usize>>,
    ) -> Option<Vec<OutstandingFound>> {
        use petgraph::visit::EdgeRef as _;
        use petgraph::visit::IntoEdgesDirected as _;

        let reached_directly = state.iter().next().is_none();
        let has_status = self.dag[n].get_status().is_some();
        if reached_directly && has_status {
            return None;
        }

        let mut dependency_passed = false;
        let mut dependency_pending = false;
        let mut pending = Vec::new();
        for incoming in self.dag.edges_directed(n, petgraph::Direction::Incoming) {
            let edge = incoming.id();
            // An edge whose source has no status yet counts as not passed.
            let passed = self.passed(edge).unwrap_or_default();
            let trigger = self.is_trigger(edge);
            match (passed, trigger) {
                // A passed trigger settles the node; nothing else is needed.
                (true, true) => return Some(Vec::new()),
                (true, false) => {
                    dependency_passed = true;
                    continue;
                }
                (false, false) => dependency_pending = true,
                (false, true) => {}
            }
            pending.push((incoming.source(), state + edge));
        }

        if dependency_passed && !dependency_pending {
            Some(Vec::new())
        } else {
            Some(pending)
        }
    }

    /// Get a list of UI friendly entries representing the job dependency tree
    ///
    /// The tree is walked depth first from the nodes resolved for `goals`,
    /// expanding through `Self::find_outstanding`. Each entry consists of
    /// * the number of edges on the path that led to the job,
    /// * the job id,
    /// * the text of the edge at the head of that path with line breaks
    ///   replaced by `", "` (empty when the path has no head).
    pub fn tree<'s>(
        &'s self,
        goals: &[Box<str>],
    ) -> impl 's + IntoIterator<Item = (usize, JobId, String)> {
        let roots = self.get_goals(goals);
        let walker = walk::Recurse::new(roots, Self::find_outstanding).depth_first();
        walker.iter(self).map(|(node, path)| {
            let depth = path.iter().count();
            let job = self.dag[node].job.id.clone();
            let label = match path.head() {
                Some(edge) => self.dag[*edge].to_string(),
                None => String::default(),
            };
            (depth, job, label.replace("\n", ", "))
        })
    }
}

/// One result of `Graph::find_outstanding`: an upstream node still to be
/// visited, paired with the edge path leading to it.
type OutstandingFound = (NodeIndex<usize>, Stack<EdgeIndex<usize>>);

/// Declares the node and edge identifier types of [`Graph`] for petgraph:
/// the same `usize` based indices used by the inner `dag`.
impl GraphBase for Graph {
    type EdgeId = EdgeIndex<usize>;
    type NodeId = NodeIndex<usize>;
}

/// Builds the chain `a -> b -> c` and walks it upstream from `c` with a
/// recursive walker that follows the first incoming edge of each node.
/// The walk must visit `b`, then `a`, and stop there since `a` has no
/// incoming edge.
#[test]
fn routing() {
    let mut graph = Graph::default();
    let a = graph.dag.add_node(State::default());
    let b: NodeIndex<usize> = graph.dag.add_node(State::default());
    let c: NodeIndex<usize> = graph.dag.add_node(State::default());
    graph
        .dag
        .add_edge(
            a,
            b,
            Edge {
                codes: vec![Code::Explicit(0)],
                ..Default::default()
            },
        )
        .expect("a-b");
    graph
        .dag
        .add_edge(
            b,
            c,
            Edge {
                codes: vec![Code::Explicit(3)],
                ..Default::default()
            },
        )
        .expect("b-c");

    use petgraph::visit::EdgeRef as _;
    use petgraph::visit::IntoEdgesDirected as _;

    let mut walker = graph.dag.recursive_walk(c, |graph, node| {
        println!("node: {node:?}");
        // Only the first incoming edge is followed, so take it directly
        // instead of entering a loop that returns on its first iteration.
        graph
            .edges_directed(node, Direction::Incoming)
            .next()
            .map(|edge| (edge.id(), edge.source()))
    });

    let mut visited = Vec::new();
    while let Some(item) = walker.next(&graph.dag) {
        println!("item: {:?}", item);
        visited.push(item.1);
    }

    assert_eq!(visited, vec![b, a]);
}