pargraph 0.2.0

Operator based parallel graph processing.
Documentation
// SPDX-FileCopyrightText: 2023 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: GPL-3.0-or-later

#[test]
fn example_atomic() {
    use pargraph::prelude::*;
    use petgraph::data::DataMap;
    use petgraph::graph::DiGraph;
    use petgraph::visit::*;
    use std::sync::atomic::AtomicU32;

    struct NodeData {
        distance: AtomicU32,
    }
    impl NodeData {
        fn new(distance: u32) -> Self {
            Self {
                distance: AtomicU32::new(distance),
            }
        }
    }

    // Create a graph like:
    //    src
    //     |
    //     a
    //   /  \
    //  b   c
    //  \  /
    //   d
    let mut g = DiGraph::new();

    // Helper function for creating new nodes with default node data.
    // Initialize the distance to the maximum value.
    let mut new_node = || g.add_node(NodeData::new(u32::MAX));

    // Create some new nodes.
    let [src, a, b, c, d] = [(); 5].map(|_| new_node());

    // Add some edges with weights.
    g.add_edge(src, a, 1);
    g.add_edge(a, b, 1);
    g.add_edge(a, c, 1);
    g.add_edge(c, d, 2);
    g.add_edge(b, d, 1);

    let operator = AtomicDistanceLabelling { source_node: src };

    let executor = MultiThreadExecutor::new();

    // Create a worklist and add the source node.
    let wl = FifoWorklist::new_with_local_queues(vec![src].into());
    executor.run_readonly(wl, &operator, &g);

    let get_distance = |n: petgraph::graph::NodeIndex| -> u32 {
        g.node_weight(n)
            .unwrap()
            .distance
            .load(std::sync::atomic::Ordering::Relaxed)
    };

    // Check the distances.
    assert_eq!(get_distance(src), 0);
    assert_eq!(get_distance(a), 1);
    assert_eq!(get_distance(b), 2);
    assert_eq!(get_distance(c), 2);
    assert_eq!(get_distance(d), 3);

    /// This is our operator.
    struct AtomicDistanceLabelling<NodeId> {
        /// The operator needs to recognize the source node.
        source_node: NodeId,
    }

    impl<G> ReadonlyOperator<G> for &AtomicDistanceLabelling<G::NodeId>
    where
        G: GraphBase + IntoEdgesDirected,
        G: DataMap<NodeWeight = NodeData, EdgeWeight = u32>,
    {
        type WorkItem = G::NodeId;

        fn op(
            &self,
            active_node: Self::WorkItem,
            local_view: LocalGraphView<&G>,
            mut worklist: impl WorklistPush<Self::WorkItem>,
        ) {
            let input_edges = local_view.edges_directed(active_node, petgraph::Direction::Incoming);

            let output_nodes =
                local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing);

            let min_distance = if active_node == self.source_node {
                // By definition the source node has distance 0.
                Some(0)
            } else {
                // Compute the smallest distance to the source via the input edges.
                input_edges
                    .map(|e| {
                        // Get minimal distance to source at the input node.
                        let d = local_view
                            .node_weight(e.source())
                            .unwrap()
                            .distance
                            .load(std::sync::atomic::Ordering::Relaxed);
                        // Add edge weight
                        d + *e.weight()
                    })
                    // Take minimal value
                    .min()
            };

            if let Some(min_distance) = min_distance {
                // Atomically store the minimal distance value.
                let prev_min_distance = local_view
                    .node_weight(active_node)
                    .unwrap()
                    .distance
                    .fetch_min(min_distance, std::sync::atomic::Ordering::Relaxed);

                if prev_min_distance != min_distance {
                    // Minimal distance has improved, need to update the output nodes.
                    output_nodes.for_each(|n| worklist.push(n));
                }
            }
        }
    }
}