1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// SPDX-FileCopyrightText: 2023 Thomas Kramer <code@tkramer.ch>
//
// SPDX-License-Identifier: GPL-3.0-or-later
/// Labels every node of a small directed graph with its distance to a source
/// node using the `MultiThreadExecutor`, then checks the computed distances.
#[test]
#[cfg(not(loom))]
fn test_multithread_executor() {
    use pargraph::prelude::*;
    use petgraph::data::DataMap;
    use petgraph::graph::{DiGraph, NodeIndex};
    use petgraph::visit::GraphBase;
    use petgraph::visit::IntoEdgesDirected;
    use petgraph::visit::IntoNeighborsDirected;
    use petgraph::visit::{Data, EdgeRef};

    /// Label stored on every graph node.
    struct NodeData {
        /// Minimal distance to the source node found so far.
        distance: i32,
        /// Next hop on the shortest path towards the source node.
        /// `None` marks a node which was not labelled yet; the source node
        /// refers to itself.
        prev: Option<NodeIndex>,
    }

    impl Default for NodeData {
        /// An unlabelled node: maximal distance and no predecessor.
        fn default() -> Self {
            Self {
                distance: i32::MAX,
                prev: None,
            }
        }
    }

    /// Operator which computes for all nodes the distance to the initial source node.
    struct DistanceLabellingOp {}

    impl<G> LabellingOperator<G> for &DistanceLabellingOp
    where
        G: GraphBase<NodeId = NodeIndex>
            + IntoEdgesDirected
            + IntoNeighborsDirected
            + Data<NodeWeight = DataCell<NodeData>, EdgeWeight = i32>
            + DataMap,
        G::NodeId: std::fmt::Debug,
    {
        type NodeWeight = NodeData;
        type WorkItem = G::NodeId;

        /// Relaxes `active_node` against its already labelled predecessors.
        ///
        /// When a shorter path is found (or when the node turns out to be the
        /// source) the label in `node_data` is updated and the successors are
        /// pushed onto `worklist`.
        ///
        /// # Errors
        /// Propagates the conflict error raised while acquiring the data of a
        /// predecessor.
        fn op(
            &self,
            active_node: Self::WorkItem,
            local_view: LocalGraphView<&G, FullConflictDetection>,
            node_data: &mut NodeData,
            mut worklist: impl WorklistPush<Self::WorkItem>,
        ) -> Result<(), DataConflictErr<G::NodeId, G::EdgeId>> {
            // Pair every incoming edge with read access to the data of its source node.
            // Bail out on the first data conflict; the node will be rescheduled.
            // A SmallVec in place of the Vec could help to avoid allocations.
            let incoming = local_view
                .edges_directed(active_node, petgraph::Direction::Incoming)
                .map(|edge| {
                    local_view
                        .try_node_weight(edge.source())
                        .expect("node has no data")
                        .map(|data| (edge, data))
                })
                .collect::<Result<Vec<_>, _>>()?;

            // Among the predecessors which are already labelled, pick the one
            // offering the shortest path to the source.
            let best_predecessor = incoming
                .into_iter()
                .filter_map(|(edge, data)| {
                    data.prev
                        .is_some()
                        .then(|| (edge.source(), data.distance + edge.weight()))
                })
                .min_by_key(|&(_, dist)| dist);

            match best_predecessor {
                Some((via, candidate)) if candidate < node_data.distance => {
                    // Shorter path found: store it ...
                    node_data.distance = candidate;
                    node_data.prev = Some(via);

                    // ... and expand the wavefront to the other neighbours.
                    let successors = local_view
                        .neighbors_directed(active_node, petgraph::Direction::Outgoing)
                        .filter(|succ| succ != &via);
                    for succ in successors {
                        worklist.push(succ);
                    }
                }
                Some(_) => {
                    // The known path is at least as short: nothing to do.
                }
                None => {
                    // No labelled predecessor exists. This must be the first node.
                    node_data.distance = 0;
                    node_data.prev = Some(active_node);

                    // Expand the wavefront to the unprocessed neighbours.
                    for succ in
                        local_view.neighbors_directed(active_node, petgraph::Direction::Outgoing)
                    {
                        worklist.push(succ);
                    }
                }
            }

            Ok(())
        }
    }

    // Build the following graph (edges point downwards, weights in parentheses):
    //
    //          a
    //    (1) /   \ (1)
    //       b     c
    //    (1) \   / (2)
    //          d
    //
    // The node data is wrapped in a `DataCell`. `DataCell` implements interior
    // mutability and a sort of read-write lock for multi-threaded access.
    let mut g = DiGraph::new();
    let a = g.add_node(DataCell::new(NodeData::default()));
    let b = g.add_node(DataCell::new(NodeData::default()));
    let c = g.add_node(DataCell::new(NodeData::default()));
    let d = g.add_node(DataCell::new(NodeData::default()));

    for (from, to, weight) in [(a, b, 1), (a, c, 1), (c, d, 2), (b, d, 1)] {
        g.add_edge(from, to, weight);
    }

    // The source node `a` is not initialized up front: the operator treats a
    // node without any labelled predecessor as the source.
    let operator = DistanceLabellingOp {};
    let executor = MultiThreadExecutor::new();
    // A priority queue would be more efficient.
    let wl = FifoWorklist::new_global_queue(vec![a].into());
    executor.run_node_labelling(wl, &operator, &g);

    // Reads the distance label of a node.
    let distance_of = |node: NodeIndex| -> i32 {
        let cell = g.node_weight(node).unwrap();
        let guard = cell.try_read().unwrap();
        guard.distance
    };

    assert_eq!(distance_of(a), 0);
    assert_eq!(distance_of(b), 1);
    assert_eq!(distance_of(c), 1);
    assert_eq!(distance_of(d), 2);
}