pargraph 0.2.0

Operator based parallel graph processing.
Documentation
// SPDX-FileCopyrightText: 2023 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: GPL-3.0-or-later

#[test]
#[cfg(not(loom))]
fn test_multithread_executor() {
    use pargraph::prelude::*;
    use petgraph::data::DataMap;
    use petgraph::graph::{DiGraph, NodeIndex};
    use petgraph::visit::GraphBase;
    use petgraph::visit::IntoEdgesDirected;
    use petgraph::visit::IntoNeighborsDirected;
    use petgraph::visit::{Data, EdgeRef};

    /// Label attached to every node of the test graph.
    struct NodeData {
        /// Length of the shortest path to the source node found so far.
        distance: i32,
        /// Neighbour through which the shortest path to the source node leads.
        /// `None` as long as the node has not been labelled; the source node
        /// is labelled with itself.
        prev: Option<NodeIndex>,
    }

    impl Default for NodeData {
        /// Creates the label of a node which has not been labelled yet:
        /// the largest representable distance and no predecessor.
        fn default() -> NodeData {
            NodeData {
                prev: None,
                distance: i32::MAX,
            }
        }
    }

    // Build the following directed graph (all edges point downwards):
    //     a
    //   /  \
    //  b   c
    //  \  /
    //   d
    let mut graph = DiGraph::new();

    // Every node weight is wrapped in a `DataCell`. `DataCell` implements
    // interior mutability and a sort of read-write lock for multi-threaded access.
    let [a, b, c, d] = [(); 4].map(|()| graph.add_node(DataCell::new(NodeData::default())));

    // Edges as (source, target, weight), inserted in this order.
    for (from, to, weight) in [(a, b, 1), (a, c, 1), (c, d, 2), (b, d, 1)] {
        graph.add_edge(from, to, weight);
    }

    // Start the labelling at node `a`.
    // The source node needs no manual initialization: the operator labels a node
    // which has no initialized incoming neighbour with distance 0.
    // A priority queue would be more efficient than a FIFO worklist.
    let worklist = FifoWorklist::new_global_queue(vec![a].into());
    let labelling_op = DistanceLabellingOp {};
    let executor = MultiThreadExecutor::new();
    executor.run_node_labelling(worklist, &labelling_op, &graph);

    // Reads the distance label computed for a node.
    let distance_of = |node: NodeIndex| -> i32 {
        let cell = graph.node_weight(node).unwrap();
        let label = cell.try_read().unwrap();
        label.distance
    };

    // `d` is reached via `b` (1 + 1), which is shorter than via `c` (1 + 2).
    assert_eq!(distance_of(a), 0);
    assert_eq!(distance_of(b), 1);
    assert_eq!(distance_of(c), 1);
    assert_eq!(distance_of(d), 2);

    /// Operator which labels nodes with their distance to the initial source node
    /// and with the neighbour through which that distance is achieved.
    struct DistanceLabellingOp {}

    impl<G> LabellingOperator<G> for &DistanceLabellingOp
    where
        G: GraphBase<NodeId = NodeIndex>
            + IntoEdgesDirected
            + IntoNeighborsDirected
            + Data<NodeWeight = DataCell<NodeData>, EdgeWeight = i32>
            + DataMap,
        G::NodeId: std::fmt::Debug,
    {
        type NodeWeight = NodeData;

        type WorkItem = G::NodeId;

        /// Relabels `active_node` based on the labels of its incoming neighbours.
        ///
        /// * `active_node` - the node being processed.
        /// * `local_view` - view of the graph used to read neighbour data.
        /// * `node_data` - mutable label of `active_node`.
        /// * `worklist` - receives the outgoing neighbours whenever the label changed.
        ///
        /// Returns an error if the data of an incoming neighbour cannot be
        /// accessed because of a data conflict. The node will be rescheduled.
        ///
        /// # Panics
        /// Panics if an incoming neighbour has no node data.
        fn op(
            &self,
            active_node: Self::WorkItem,
            local_view: LocalGraphView<&G, FullConflictDetection>,
            node_data: &mut NodeData,
            mut worklist: impl WorklistPush<Self::WorkItem>,
        ) -> Result<(), DataConflictErr<G::NodeId, G::EdgeId>> {
            use petgraph::Direction::{Incoming, Outgoing};

            // Pair every incoming edge with read access to the data of its source node.
            // Any data conflict aborts the operator here.
            // Using a SmallVec instead of a Vec could help to avoid allocations.
            let incoming = local_view
                .edges_directed(active_node, Incoming)
                .map(|edge| {
                    let access = local_view
                        .try_node_weight(edge.source())
                        .expect("node has no data");
                    access.map(|data| (edge, data))
                })
                .collect::<Result<Vec<_>, _>>()?;

            // Among the already initialized neighbours, pick the one offering the
            // shortest path to the source. The sum is computed only for initialized
            // neighbours, never for the `i32::MAX` placeholder distance.
            let best = incoming
                .into_iter()
                .filter_map(|(edge, data)| {
                    data.prev
                        .is_some()
                        .then(|| (edge.source(), data.distance + edge.weight()))
                })
                .min_by_key(|&(_, distance)| distance);

            match best {
                Some((via, distance_to_source)) if distance_to_source < node_data.distance => {
                    // Found a shorter path: update the label.
                    node_data.distance = distance_to_source;
                    node_data.prev = Some(via);

                    // Expand the wavefront to all outgoing neighbours except `via`.
                    for successor in local_view.neighbors_directed(active_node, Outgoing) {
                        if successor != via {
                            worklist.push(successor);
                        }
                    }
                }
                Some(_) => {
                    // No improvement.
                }
                None => {
                    // No neighbour with an initialized distance. This must be the first node.
                    node_data.distance = 0;
                    node_data.prev = Some(active_node);

                    // Expand the wavefront to all outgoing neighbours.
                    for successor in local_view.neighbors_directed(active_node, Outgoing) {
                        worklist.push(successor);
                    }
                }
            }

            Ok(())
        }
    }
}