swh_graph_stdlib/labeling/
map_reduce.rs1use std::borrow::Borrow;
9
10use anyhow::{Result, bail, ensure};
11use dsi_progress_logger::{ProgressLog, progress_logger};
12use rapidhash::RapidHashMap;
13use smallvec::SmallVec;
14use swh_graph::graph::*;
15use swh_graph::graph::{NodeId, SwhForwardGraph};
16
17use super::MapReducer;
18use super::labels::{
19 DenseLabels, Labels, SparseLabels, StridableLabel, StriddenLabels, StriddenLabelsConfig,
20};
21
22pub struct MapReduceBuilder<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels> {
24 graph: G,
25 num_nodes: usize,
26 cheap_clones: bool,
27 keep_labels: bool,
28 pub map_reducer: MR,
29 labels_config: L::Config,
30}
31
32impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Sized>>
33 MapReduceBuilder<G, MR, SparseLabels<MR::Label>>
34{
35 pub fn new_sparse(graph: G, map_reducer: MR) -> Self {
39 MapReduceBuilder {
40 num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
41 graph,
42 cheap_clones: false,
43 keep_labels: false,
44 map_reducer,
45 labels_config: (),
46 }
47 }
48}
49
50impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Default + Clone + Sized>>
51 MapReduceBuilder<G, MR, DenseLabels<MR::Label>>
52{
53 pub fn new_dense(graph: G, map_reducer: MR) -> Self {
64 MapReduceBuilder {
65 num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
66 graph,
67 cheap_clones: false,
68 keep_labels: false,
69 map_reducer,
70 labels_config: (),
71 }
72 }
73}
74
75impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: StridableLabel>>
76 MapReduceBuilder<G, MR, StriddenLabels<MR::Label>>
77{
78 pub fn new_stridden(graph: G, map_reducer: MR, num_words: usize) -> Self
89 where
90 MR::Label: StridableLabel + ToOwned,
91 {
92 MapReduceBuilder {
93 num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
94 graph,
95 cheap_clones: false,
96 keep_labels: false,
97 map_reducer,
98 labels_config: StriddenLabelsConfig { num_words },
99 }
100 }
101}
102
103impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
104 MapReduceBuilder<G, MR, L>
105{
106 pub fn num_nodes(mut self, num_nodes: usize) -> Self {
107 self.num_nodes = num_nodes;
108 self
109 }
110
111 pub fn cheap_clones(mut self, cheap_clones: bool) -> Self {
119 self.cheap_clones = cheap_clones;
120 self
121 }
122
123 pub fn keep_labels(mut self, keep_labels: bool) -> Self {
132 self.keep_labels = keep_labels;
133 self
134 }
135
136 pub fn build(self) -> Result<MapReduce<G, MR, L>> {
137 let Self {
138 graph,
139 num_nodes,
140 cheap_clones,
141 keep_labels,
142 map_reducer,
143 labels_config,
144 } = self;
145 let pop_labels = match (cheap_clones, keep_labels) {
146 (true, true) => false, (true, false) => true, (false, true) => bail!("MapReduce cannot both keep labels and avoid expensive clones"),
149 (false, false) => true, };
151 log::info!("Allocating labels...");
152 let labels = L::new(graph.num_nodes(), labels_config);
153 Ok(MapReduce {
154 graph,
155 num_nodes,
156 pop_labels,
157 map_reducer,
158 labels,
159 pending_dependents: Default::default(),
160 })
161 }
162}
163
164pub struct MapReduce<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L> {
197 graph: G,
198 num_nodes: usize,
199 pop_labels: bool,
200 pub map_reducer: MR,
201 labels: L,
202 pending_dependents: RapidHashMap<NodeId, usize>,
206}
207
208impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
209 MapReduce<G, MR, L>
210{
211 pub fn run_in_topological_order(
217 &mut self,
218 nodes: impl Iterator<Item = NodeId>,
219 ) -> Result<(), MR::Error> {
220 if self.pop_labels {
221 self.run_in_topological_order_with_popped_labels(nodes)
222 } else {
223 self.run_in_topological_order_with_kept_labels(nodes)
224 }
225 }
226
227 pub fn push_node(&mut self, node: NodeId) -> Result<(), MR::Error> {
230 if self.pop_labels {
231 self.push_node_with_popped_labels(node)
232 } else {
233 self.push_node_with_kept_labels(node)
234 }
235 }
236
237 pub fn labels(&self) -> Result<&L> {
239 ensure!(
240 !self.pop_labels,
241 "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
242 );
243 Ok(&self.labels)
244 }
245
246 pub fn take_labels(self) -> Result<L> {
248 ensure!(
249 !self.pop_labels,
250 "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
251 );
252 Ok(self.labels)
253 }
254
255 fn run_in_topological_order_with_kept_labels(
258 &mut self,
259 nodes: impl Iterator<Item = NodeId>,
260 ) -> Result<(), MR::Error> {
261 let mut pl = progress_logger!(
262 display_memory = true,
263 item_name = "node",
264 local_speed = true,
265 expected_updates = Some(self.num_nodes),
266 );
267
268 pl.start("Traversing graph in topological order...");
269
270 for node in nodes {
271 pl.light_update();
272
273 self.push_node_with_kept_labels(node)?;
274 }
275
276 pl.done();
277
278 Ok(())
279 }
280
281 #[inline]
282 fn push_node_with_kept_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
283 let mut dependencies = self.graph.successors(node).into_iter();
284
285 let first_dependency_label = (&mut dependencies)
287 .flat_map(|dependency| self.labels.get(dependency).map(ToOwned::to_owned))
288 .next();
289
290 let label: Option<<MR::Label as ToOwned>::Owned> = match first_dependency_label {
292 Some(label) => {
293 self.map_reducer.reduce(
294 label,
295 dependencies.flat_map(|dep|
296 self.labels.get(dep)),
298 )?
299 }
300 None => {
301 assert!(
302 dependencies.next().is_none(),
303 "first_dependency_label is None, but not all dependencies were consumed"
304 );
305 None
306 }
307 };
308
309 let label = match label {
311 Some(label) => self.map_reducer.map_reduce(node, label)?,
312 None => self.map_reducer.map(node)?,
313 };
314
315 self.map_reducer
316 .on_node_traversed(node, label.as_ref().map(|l| l.borrow()))?;
317 if let Some(label) = label {
318 let previous_label = self.labels.insert(node, label);
319 assert!(previous_label.is_none(), "{node} was labeled twice");
320 }
321
322 Ok(())
323 }
324
325 fn run_in_topological_order_with_popped_labels(
328 &mut self,
329 nodes: impl Iterator<Item = NodeId>,
330 ) -> Result<(), MR::Error> {
331 let mut pl = progress_logger!(
332 display_memory = true,
333 item_name = "node",
334 local_speed = true,
335 expected_updates = Some(self.num_nodes),
336 );
337
338 pl.start("Traversing graph in topological order...");
339
340 for node in nodes {
341 pl.light_update();
342
343 self.push_node_with_popped_labels(node)?;
344 }
345
346 pl.done();
347
348 debug_assert!(
349 self.labels.is_empty(),
350 "run_in_topological_order_with_popped_labels ended without clearing its labels store"
351 );
352
353 Ok(())
354 }
355
356 #[inline]
357 fn push_node_with_popped_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
358 let num_dependents = self.graph.indegree(node);
359
360 if num_dependents > 0 {
361 self.pending_dependents.insert(node, num_dependents);
362 }
363
364 let mut dependencies = self.graph.successors(node).into_iter();
365
366 let mut merged_label: Option<<MR::Label as ToOwned>::Owned> = None;
367
368 while let Some(first_dependency) = dependencies.next() {
369 let first_dependency_label = if self.pending_dependents.get(&first_dependency)
371 == Some(&1)
372 {
373 self.pending_dependents.remove(&first_dependency);
379 self.labels.remove(first_dependency)
380 } else {
381 let pending_dependants = self.pending_dependents.get_mut(&first_dependency).unwrap_or_else(|| panic!("Node {node} depends on node {first_dependency} but the latter's label was not computed (yet?). Check the topological order is complete."));
384 *pending_dependants -= 1;
385 self.labels.get(first_dependency).map(
386 |l: &MR::Label| -> <MR::Label as std::borrow::ToOwned>::Owned { l.to_owned() },
387 )
388 };
389
390 if let Some(first_dependency_label) = first_dependency_label {
392 let mut dependencies_to_remove = SmallVec::<[_; 1]>::new();
393 merged_label = self.map_reducer.reduce(
394 first_dependency_label,
395 dependencies.flat_map(|dep| {
396 *self.pending_dependents.get_mut(&dep).unwrap() -= 1;
397 if *self.pending_dependents.get(&dep).unwrap() == 0 {
398 dependencies_to_remove.push(dep);
399 }
400 self.labels.get(dep)
402 }),
403 )?;
404
405 for dep in dependencies_to_remove {
407 self.pending_dependents.remove(&dep);
408 self.labels.remove(dep);
409 }
410 break;
411 }
412 }
413
414 let label = match merged_label {
415 Some(merged_label) => self.map_reducer.map_reduce(node, merged_label)?,
416 None => self.map_reducer.map(node)?,
417 };
418 self.map_reducer
419 .on_node_traversed(node, label.as_ref().map(Borrow::borrow))?;
420 if num_dependents > 0 {
421 if let Some(label) = label {
422 let previous_label = self.labels.insert(node, label);
423 assert!(previous_label.is_none(), "{node} was labeled twice");
424 }
425 } else {
426 assert!(
427 !self.labels.contains_key(node),
428 "{node} was already labeled"
429 );
430 }
431
432 Ok(())
433 }
434}