parsli 0.1.4

Parallel status lines for Rust
Documentation
use std::collections::HashMap;
use std::hash::Hash;

use petgraph::algo::toposort;
use petgraph::Direction;
use petgraph::dot::{Config, Dot};
use petgraph::graph::NodeIndex;
use petgraph::stable_graph::StableDiGraph;
use petgraph::visit::{Control, depth_first_search, DfsEvent, NodeRef, Visitable, VisitMap};

use crate::dep::state::State;
use crate::task::TaskId;

/// Graph with support for resolving dependencies
pub(crate) struct Tree<K> {
    inner: StableDiGraph<State, (), TaskId>,
    to_id: HashMap<K, TaskId>,
}

impl<K> Tree<K> where K: Clone + Eq + Hash {
    pub(crate) fn new() -> Self {
        Self { inner: Default::default(), to_id: Default::default() }
    }

    /// Add a node from a weight or return the existing one
    pub(crate) fn find_or_add_node(&mut self, k: K) -> TaskId {
        *self.to_id.entry(k).or_insert_with(|| self.inner.add_node(State::Waiting).index())
    }

    /// Add an edge from two weights or return the existing one
    pub(crate) fn find_or_add_edge(&mut self, k0: K, k1: K) -> TaskId {
        let node0 = NodeIndex::from(self.find_or_add_node(k0));
        let node1 = NodeIndex::from(self.find_or_add_node(k1));
        self.inner.find_edge(node0, node1).unwrap_or_else(|| self.inner.add_edge(node0, node1, ())).index()
    }

    /// Removes all nodes that are not depended on by the specified weights
    pub(crate) fn retain_dependencies(&mut self, ks: Vec<K>) {
        // Traverse dep to find required nodes
        let mut vm = self.inner.visit_map();
        for w in ks {
            let node = NodeIndex::from(self.find_or_add_node(w));
            depth_first_search(&self.inner, Some(node), |event| {
                if let DfsEvent::Discover(n, _time) = event {
                    if vm.is_visited(&n) { return Control::<NodeIndex>::Prune; }
                    vm.visit(n);
                }
                Control::<NodeIndex>::Continue
            });
        }
        // Remove unvisited nodes
        self.inner.retain_nodes(|_fg, n| vm.is_visited(&n));
    }

    /// Get the first node that has no dependencies
    /// It will block the thread if there are nodes waiting
    pub(crate) fn find_next_node(&self) -> Result<Option<TaskId>, ()> {
        if self.inner.node_count() == 0 { return Err(()); }
        // Check if there are any waiting nodes
        let waiting_map = |_, s: &State| (*s == State::Waiting).then_some(*s);
        let waiting_nodes = self.inner.filter_map(waiting_map, |_, e| Some(e));
        if waiting_nodes.node_count() == 0 { return Err(()); }
        // Get all nodes with finished dependencies
        let ready_map = |i: NodeIndex<TaskId>, s: &State| {
            let mut neighbors = self.inner.neighbors_directed(i, Direction::Outgoing);
            neighbors.all(|i| self.inner[i] == State::Success).then_some(*s)
        };
        let ready_nodes = waiting_nodes.filter_map(ready_map, |_, e| Some(e));
        // If there are no nodes ready yet, block this thread
        if ready_nodes.node_count() == 0 { return Ok(None); }
        // Topologically sort the waiting nodes and get the last node (no dependencies)
        Ok(toposort(&ready_nodes, None).map_err(|_| ())?.last().map(|ni| TaskId::from(ni.index())))
    }

    /// Get dependencies
    pub(crate) fn children(&self, id: TaskId) -> Vec<TaskId> {
        self.inner.neighbors_directed(NodeIndex::from(id), Direction::Outgoing).map(|n| n.index()).collect()
    }

    /// Get reverse dependencies
    pub(crate) fn parents(&self, id: TaskId) -> Vec<TaskId> {
        self.inner.neighbors_directed(NodeIndex::from(id), Direction::Incoming).map(|n| n.index()).collect()
    }

    /// Finalize node
    pub(crate) fn update_node(&mut self, id: TaskId, state: State) {
        let node = NodeIndex::from(id);
        // Update state of the node
        self.inner[node] = state;
        // In case a node failed, all the nodes depending on it can't be executed
        if state == State::Failure {
            // Iterate reverse dependencies
            for id in self.parents(node.index()) {
                self.update_node(id, state);
            }
        }
    }
}

impl<K> ToString for Tree<K> {
    /// Render this graph to a dot representation
    fn to_string(&self) -> String {
        let dot = Dot::with_attr_getters(
            &self.inner,
            &[Config::NodeIndexLabel, Config::EdgeNoLabel],
            &|_g, _e| "".to_string(),
            &|_g, n| format!("color={}", n.weight().color()),
        );
        format!("{:?}", dot)
    }
}