#[test]
fn example_atomic() {
use pargraph::prelude::*;
use petgraph::data::DataMap;
use petgraph::graph::DiGraph;
use petgraph::visit::*;
use std::sync::atomic::AtomicU32;
struct NodeData {
distance: AtomicU32,
}
impl NodeData {
fn new(distance: u32) -> Self {
Self {
distance: AtomicU32::new(distance),
}
}
}
let mut g = DiGraph::new();
let mut new_node = || g.add_node(NodeData::new(u32::MAX));
let [src, a, b, c, d] = [(); 5].map(|_| new_node());
g.add_edge(src, a, 1);
g.add_edge(a, b, 1);
g.add_edge(a, c, 1);
g.add_edge(c, d, 2);
g.add_edge(b, d, 1);
let operator = AtomicDistanceLabelling { source_node: src };
let executor = MultiThreadExecutor::new();
let wl = FifoWorklist::new_with_local_queues(vec![src].into());
executor.run_readonly(wl, &operator, &g);
let get_distance = |n: petgraph::graph::NodeIndex| -> u32 {
g.node_weight(n)
.unwrap()
.distance
.load(std::sync::atomic::Ordering::Relaxed)
};
assert_eq!(get_distance(src), 0);
assert_eq!(get_distance(a), 1);
assert_eq!(get_distance(b), 2);
assert_eq!(get_distance(c), 2);
assert_eq!(get_distance(d), 3);
struct AtomicDistanceLabelling<NodeId> {
source_node: NodeId,
}
impl<G> ReadonlyOperator<G> for &AtomicDistanceLabelling<G::NodeId>
where
G: GraphBase + IntoEdgesDirected,
G: DataMap<NodeWeight = NodeData, EdgeWeight = u32>,
{
type WorkItem = G::NodeId;
fn op(
&self,
active_node: Self::WorkItem,
local_view: LocalGraphView<&G>,
mut worklist: impl WorklistPush<Self::WorkItem>,
) {
let input_edges = local_view.edges_directed(active_node, petgraph::Direction::Incoming);
let output_nodes =
local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing);
let min_distance = if active_node == self.source_node {
Some(0)
} else {
input_edges
.map(|e| {
let d = local_view
.node_weight(e.source())
.unwrap()
.distance
.load(std::sync::atomic::Ordering::Relaxed);
d + *e.weight()
})
.min()
};
if let Some(min_distance) = min_distance {
let prev_min_distance = local_view
.node_weight(active_node)
.unwrap()
.distance
.fetch_min(min_distance, std::sync::atomic::Ordering::Relaxed);
if prev_min_distance != min_distance {
output_nodes.for_each(|n| worklist.push(n));
}
}
}
}
}