swh_graph_stdlib/labeling/
map_reduce.rs1use std::borrow::Borrow;
9
10use anyhow::{Result, bail, ensure};
11use dsi_progress_logger::{ProgressLog, progress_logger};
12use rapidhash::RapidHashMap;
13use smallvec::SmallVec;
14use swh_graph::graph::*;
15use swh_graph::graph::{NodeId, SwhForwardGraph};
16
17use super::MapReducer;
18use super::labels::{
19 DenseLabels, Labels, SparseLabels, StridableLabel, StriddenLabels, StriddenLabelsConfig,
20};
21
22pub struct MapReduceBuilder<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels> {
24 graph: G,
25 num_nodes: usize,
26 cheap_clones: bool,
27 keep_labels: bool,
28 pub map_reducer: MR,
29 labels_config: L::Config,
30}
31
32impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Sized>>
33 MapReduceBuilder<G, MR, SparseLabels<MR::Label>>
34{
35 pub fn new_sparse(graph: G, map_reducer: MR) -> Self {
39 MapReduceBuilder {
40 num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
41 graph,
42 cheap_clones: false,
43 keep_labels: false,
44 map_reducer,
45 labels_config: (),
46 }
47 }
48}
49
50impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Default + Clone + Sized>>
51 MapReduceBuilder<G, MR, DenseLabels<MR::Label>>
52{
53 pub fn new_dense(graph: G, map_reducer: MR) -> Self {
64 MapReduceBuilder {
65 num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
66 graph,
67 cheap_clones: false,
68 keep_labels: false,
69 map_reducer,
70 labels_config: (),
71 }
72 }
73}
74
75impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: StridableLabel>>
76 MapReduceBuilder<G, MR, StriddenLabels<MR::Label>>
77{
78 pub fn new_stridden(graph: G, map_reducer: MR, num_words: usize) -> Self
89 where
90 MR::Label: StridableLabel + ToOwned,
91 {
92 MapReduceBuilder {
93 num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
94 graph,
95 cheap_clones: false,
96 keep_labels: false,
97 map_reducer,
98 labels_config: StriddenLabelsConfig { num_words },
99 }
100 }
101}
102
103impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
104 MapReduceBuilder<G, MR, L>
105{
106 pub fn num_nodes(mut self, num_nodes: usize) -> Self {
107 self.num_nodes = num_nodes;
108 self
109 }
110
111 pub fn cheap_clones(mut self, cheap_clones: bool) -> Self {
118 self.cheap_clones = cheap_clones;
119 self
120 }
121
122 pub fn keep_labels(mut self, keep_labels: bool) -> Self {
131 self.keep_labels = keep_labels;
132 self
133 }
134
135 pub fn build(self) -> Result<MapReduce<G, MR, L>> {
136 let Self {
137 graph,
138 num_nodes,
139 cheap_clones,
140 keep_labels,
141 map_reducer,
142 labels_config,
143 } = self;
144 let pop_labels = match (cheap_clones, keep_labels) {
145 (true, true) => false, (true, false) => true, (false, true) => bail!("MapReduce cannot both keep labels and avoid expensive clones"),
148 (false, false) => true, };
150 log::info!("Allocating labels...");
151 let labels = L::new(graph.num_nodes(), labels_config);
152 Ok(MapReduce {
153 graph,
154 num_nodes,
155 pop_labels,
156 map_reducer,
157 labels,
158 pending_dependents: Default::default(),
159 })
160 }
161}
162
163pub struct MapReduce<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L> {
196 graph: G,
197 num_nodes: usize,
198 pop_labels: bool,
199 pub map_reducer: MR,
200 labels: L,
201 pending_dependents: RapidHashMap<NodeId, usize>,
205}
206
207impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
208 MapReduce<G, MR, L>
209{
210 pub fn run_in_topological_order(
216 &mut self,
217 nodes: impl Iterator<Item = NodeId>,
218 ) -> Result<(), MR::Error> {
219 if self.pop_labels {
220 self.run_in_topological_order_with_popped_labels(nodes)
221 } else {
222 self.run_in_topological_order_with_kept_labels(nodes)
223 }
224 }
225
226 pub fn push_node(&mut self, node: NodeId) -> Result<(), MR::Error> {
229 if self.pop_labels {
230 self.push_node_with_popped_labels(node)
231 } else {
232 self.push_node_with_kept_labels(node)
233 }
234 }
235
236 pub fn labels(&self) -> Result<&L> {
238 ensure!(
239 !self.pop_labels,
240 "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
241 );
242 Ok(&self.labels)
243 }
244
245 pub fn take_labels(self) -> Result<L> {
247 ensure!(
248 !self.pop_labels,
249 "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
250 );
251 Ok(self.labels)
252 }
253
254 fn run_in_topological_order_with_kept_labels(
257 &mut self,
258 nodes: impl Iterator<Item = NodeId>,
259 ) -> Result<(), MR::Error> {
260 let mut pl = progress_logger!(
261 display_memory = true,
262 item_name = "node",
263 local_speed = true,
264 expected_updates = Some(self.num_nodes),
265 );
266
267 pl.start("Traversing graph in topological order...");
268
269 for node in nodes {
270 pl.light_update();
271
272 self.push_node_with_kept_labels(node)?;
273 }
274
275 pl.done();
276
277 Ok(())
278 }
279
280 #[inline]
281 fn push_node_with_kept_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
282 let mut dependencies = self.graph.successors(node).into_iter();
283
284 let first_dependency_label = (&mut dependencies)
286 .flat_map(|dependency| self.labels.get(dependency).map(ToOwned::to_owned))
287 .next();
288
289 let label: Option<<MR::Label as ToOwned>::Owned> = match first_dependency_label {
291 Some(label) => {
292 self.map_reducer.reduce(
293 label,
294 dependencies.flat_map(|dep|
295 self.labels.get(dep)),
297 )?
298 }
299 None => {
300 assert!(
301 dependencies.next().is_none(),
302 "first_dependency_label is None, but not all dependencies were consumed"
303 );
304 None
305 }
306 };
307
308 let label = match label {
310 Some(label) => self.map_reducer.map_reduce(node, label)?,
311 None => self.map_reducer.map(node)?,
312 };
313
314 self.map_reducer
315 .on_node_traversed(node, label.as_ref().map(|l| l.borrow()))?;
316 if let Some(label) = label {
317 let previous_label = self.labels.insert(node, label);
318 assert!(previous_label.is_none(), "{node} was labeled twice");
319 }
320
321 Ok(())
322 }
323
324 fn run_in_topological_order_with_popped_labels(
327 &mut self,
328 nodes: impl Iterator<Item = NodeId>,
329 ) -> Result<(), MR::Error> {
330 let mut pl = progress_logger!(
331 display_memory = true,
332 item_name = "node",
333 local_speed = true,
334 expected_updates = Some(self.num_nodes),
335 );
336
337 pl.start("Traversing graph in topological order...");
338
339 for node in nodes {
340 pl.light_update();
341
342 self.push_node_with_popped_labels(node)?;
343 }
344
345 pl.done();
346
347 debug_assert!(
348 self.labels.is_empty(),
349 "run_in_topological_order_with_popped_labels ended without clearing its labels store"
350 );
351
352 Ok(())
353 }
354
355 #[inline]
356 fn push_node_with_popped_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
357 let num_dependents = self.graph.indegree(node);
358
359 if num_dependents > 0 {
360 self.pending_dependents.insert(node, num_dependents);
361 }
362
363 let mut dependencies = self.graph.successors(node).into_iter();
364
365 let mut merged_label: Option<<MR::Label as ToOwned>::Owned> = None;
366
367 while let Some(first_dependency) = dependencies.next() {
368 let first_dependency_label = if self.pending_dependents.get(&first_dependency)
370 == Some(&1)
371 {
372 self.pending_dependents.remove(&first_dependency);
378 self.labels.remove(first_dependency)
379 } else {
380 let pending_dependants = self.pending_dependents.get_mut(&first_dependency).unwrap_or_else(|| panic!("Node {node} depends on node {first_dependency} but the latter's label was not computed (yet?). Check the topological order is complete."));
383 *pending_dependants -= 1;
384 self.labels.get(first_dependency).map(
385 |l: &MR::Label| -> <MR::Label as std::borrow::ToOwned>::Owned { l.to_owned() },
386 )
387 };
388
389 if let Some(first_dependency_label) = first_dependency_label {
391 let mut dependencies_to_remove = SmallVec::<[_; 1]>::new();
392 merged_label = self.map_reducer.reduce(
393 first_dependency_label,
394 dependencies.flat_map(|dep| {
395 *self.pending_dependents.get_mut(&dep).unwrap() -= 1;
396 if *self.pending_dependents.get(&dep).unwrap() == 0 {
397 dependencies_to_remove.push(dep);
398 }
399 self.labels.get(dep)
401 }),
402 )?;
403
404 for dep in dependencies_to_remove {
406 self.pending_dependents.remove(&dep);
407 self.labels.remove(dep);
408 }
409 break;
410 }
411 }
412
413 let label = match merged_label {
414 Some(merged_label) => self.map_reducer.map_reduce(node, merged_label)?,
415 None => self.map_reducer.map(node)?,
416 };
417 self.map_reducer
418 .on_node_traversed(node, label.as_ref().map(Borrow::borrow))?;
419 if num_dependents > 0 {
420 if let Some(label) = label {
421 let previous_label = self.labels.insert(node, label);
422 assert!(previous_label.is_none(), "{node} was labeled twice");
423 }
424 } else {
425 assert!(
426 !self.labels.contains_key(node),
427 "{node} was already labeled"
428 );
429 }
430
431 Ok(())
432 }
433}