Skip to main content

swh_graph_stdlib/labeling/
map_reduce.rs

1// Copyright (C) 2026  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6//! Implements labels propagation through a topological sort
7
8use std::borrow::Borrow;
9
10use anyhow::{Result, bail, ensure};
11use dsi_progress_logger::{ProgressLog, progress_logger};
12use rapidhash::RapidHashMap;
13use smallvec::SmallVec;
14use swh_graph::graph::*;
15use swh_graph::graph::{NodeId, SwhForwardGraph};
16
17use super::MapReducer;
18use super::labels::{
19    DenseLabels, Labels, SparseLabels, StridableLabel, StriddenLabels, StriddenLabelsConfig,
20};
21
/// Builder for [`MapReduce`]
///
/// Obtain one from `new_sparse`, `new_dense` or `new_stridden` (each one selects a
/// different label store `L`), optionally tune it with the setter methods, then call
/// `build()`.
pub struct MapReduceBuilder<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels> {
    /// Graph the labels are propagated on
    graph: G,
    /// Number of nodes the traversal is expected to visit; forwarded to [`MapReduce`], which
    /// reports it to its progress logger as the expected number of updates
    num_nodes: usize,
    /// Whether labels are assumed to be cheap to clone (see `cheap_clones()`)
    cheap_clones: bool,
    /// Whether labels must stay in the store so they can be returned at the end
    /// (see `keep_labels()`)
    keep_labels: bool,
    /// User-provided implementation of the map and reduce steps
    pub map_reducer: MR,
    /// Store-specific configuration handed to `L::new` by `build()`
    labels_config: L::Config,
}
31
32impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Sized>>
33    MapReduceBuilder<G, MR, SparseLabels<MR::Label>>
34{
35    /// Stores labels in a HashMap. This is the best when labeling the history-hosting layer
36    ///
37    /// This improves memory usage at the expense of runtime and CPU use.
38    pub fn new_sparse(graph: G, map_reducer: MR) -> Self {
39        MapReduceBuilder {
40            num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
41            graph,
42            cheap_clones: false,
43            keep_labels: false,
44            map_reducer,
45            labels_config: (),
46        }
47    }
48}
49
50impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Default + Clone + Sized>>
51    MapReduceBuilder<G, MR, DenseLabels<MR::Label>>
52{
53    /// Stores labels in an array instead of a HashMap. This is the best when labeling the
54    /// directory layer or the whole graph.
55    ///
56    /// This improves runtime and CPU use at the expense of memory.
57    ///
58    /// This should probably be used only if:
59    ///
60    /// * labels are small, or
61    /// * computed labels are sparse (wrt. `graph.num_nodes()`)
62    ///   **and** `None::<MR::Label>` is small
63    pub fn new_dense(graph: G, map_reducer: MR) -> Self {
64        MapReduceBuilder {
65            num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
66            graph,
67            cheap_clones: false,
68            keep_labels: false,
69            map_reducer,
70            labels_config: (),
71        }
72    }
73}
74
75impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: StridableLabel>>
76    MapReduceBuilder<G, MR, StriddenLabels<MR::Label>>
77{
78    /// Specialized variant of [`new_labels`](Self::new_dense) for non-[`Sized`](Sized) labels.
79    ///
80    /// All labels must have the same length, but it can be computed at runtime.
81    ///
82    /// Like [`Self::new_dense`], this improves runtime and CPU use at the expense of memory,
83    /// but less than boxing the values would.
84    ///
85    /// This should only be used if labels are small.
86    ///
87    /// `num_words` is the length of the `[Label::Word]` slice needed to store a label.
88    pub fn new_stridden(graph: G, map_reducer: MR, num_words: usize) -> Self
89    where
90        MR::Label: StridableLabel + ToOwned,
91    {
92        MapReduceBuilder {
93            num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
94            graph,
95            cheap_clones: false,
96            keep_labels: false,
97            map_reducer,
98            labels_config: StriddenLabelsConfig { num_words },
99        }
100    }
101}
102
103impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
104    MapReduceBuilder<G, MR, L>
105{
106    pub fn num_nodes(mut self, num_nodes: usize) -> Self {
107        self.num_nodes = num_nodes;
108        self
109    }
110
111    /// Tunes the algorithm to assume labels are cheap to clone.
112    ///
113    /// This is probably `true` if and only if the labels implement Copy.
114    ///
115    /// Setting this to `false` (the default) does not imply they are expensive to move.
116    ///
117    /// Defaults to `false`.
118    pub fn cheap_clones(mut self, cheap_clones: bool) -> Self {
119        self.cheap_clones = cheap_clones;
120        self
121    }
122
123    /// Whether the algorithm should keep labels in its store in order to return them at the end
124    ///
125    /// This is incompatible with `cheap_clones(false)` (the default).
126    ///
127    /// This consumes extra memory, except when `with_labels_array()` and labels do not contain
128    /// heap-allocated data.
129    ///
130    /// Defaults to `false`.
131    pub fn keep_labels(mut self, keep_labels: bool) -> Self {
132        self.keep_labels = keep_labels;
133        self
134    }
135
136    pub fn build(self) -> Result<MapReduce<G, MR, L>> {
137        let Self {
138            graph,
139            num_nodes,
140            cheap_clones,
141            keep_labels,
142            map_reducer,
143            labels_config,
144        } = self;
145        let pop_labels = match (cheap_clones, keep_labels) {
146            (true, true) => false, // cheap to clone and we need to keep them
147            (true, false) => true, // cheap to clone, but popping saves memory
148            (false, true) => bail!("MapReduce cannot both keep labels and avoid expensive clones"),
149            (false, false) => true, // expensive to clone, so popping saves a clone
150        };
151        log::info!("Allocating labels...");
152        let labels = L::new(graph.num_nodes(), labels_config);
153        Ok(MapReduce {
154            graph,
155            num_nodes,
156            pop_labels,
157            map_reducer,
158            labels,
159            pending_dependents: Default::default(),
160        })
161    }
162}
163
/// Associates labels to nodes in the graph using successor nodes' labels and "bubbling up"
///
/// Use [`swh_graph::views::Subgraph`] to select the set of nodes to run this on.
/// For example, to avoid content and directory nodes (which are typically much slower to process),
/// use `Subgraph::with_node_constraint("rev,rel,snp,ori".parse().unwrap())`.
///
/// Built from [`MapReduceBuilder`]
///
/// # Example
///
/// For example, with this graph:
///
/// ```text
///      - 3
///     /
///   <-
/// 1 <--- 4 <--+
///             +--- 6
/// 2 <--- 5 <--+
/// ```
///
/// (We call 1 a successor of 3, consistent with swh-graph's terminology, even though labels
/// are propagated in the other direction.)
///
/// we would:
///
/// * compute label of 1
/// * compute label of 2
/// * compute label of 3 and merge it with 1's
/// * compute label of 4 and merge it with 1's
/// * compute label of 5 and merge it with 2's
/// * compute label of 6 and merge it with 4's and 5's
pub struct MapReduce<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L> {
    /// Graph the labels are propagated on
    graph: G,
    /// Expected number of nodes to visit, reported to the progress logger
    num_nodes: usize,
    /// Whether a label is removed from the store once all of its dependents were handled
    /// (`true`), or kept so it can be returned by `labels()`/`take_labels()` (`false`)
    pop_labels: bool,
    /// User-provided implementation of the map and reduce steps
    pub map_reducer: MR,
    /// Store of the labels computed so far
    labels: L,
    /// For each node, counts its number of direct dependents that still need to be handled.
    ///
    /// Unused if pop_labels is false
    pending_dependents: RapidHashMap<NodeId, usize>,
}
207
208impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
209    MapReduce<G, MR, L>
210{
211    /// Runs the configured [`MapReducer`] sequentially on all nodes in the graph.
212    ///
213    /// `nodes` must be an iterator of all nodes in topological order,
214    /// eg. returned by [`GenerationsReader::iter_nodes`](https://docs.rs/swh_graph_topology/latest/swh_graph_topology/generations/struct.GenerationsReader.html#method.iter_nodes)
215    /// with `.map(|(_depth, node)| node`).
216    pub fn run_in_topological_order(
217        &mut self,
218        nodes: impl Iterator<Item = NodeId>,
219    ) -> Result<(), MR::Error> {
220        if self.pop_labels {
221            self.run_in_topological_order_with_popped_labels(nodes)
222        } else {
223            self.run_in_topological_order_with_kept_labels(nodes)
224        }
225    }
226
227    /// A single step of [`Self::run_in_topological_order`], in case the caller does not have an
228    /// iterator of nodes
229    pub fn push_node(&mut self, node: NodeId) -> Result<(), MR::Error> {
230        if self.pop_labels {
231            self.push_node_with_popped_labels(node)
232        } else {
233            self.push_node_with_kept_labels(node)
234        }
235    }
236
237    /// Returns every node's labels, if `MapReduceBuilder::keep_labels` was set to true.
238    pub fn labels(&self) -> Result<&L> {
239        ensure!(
240            !self.pop_labels,
241            "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
242        );
243        Ok(&self.labels)
244    }
245
246    /// Returns every node's labels, if `MapReduceBuilder::keep_labels` was set to true.
247    pub fn take_labels(self) -> Result<L> {
248        ensure!(
249            !self.pop_labels,
250            "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
251        );
252        Ok(self.labels)
253    }
254
255    /// Implementation of [`run_in_topological_order`] optimized for labels that are cheap to clone
256    /// or move
257    fn run_in_topological_order_with_kept_labels(
258        &mut self,
259        nodes: impl Iterator<Item = NodeId>,
260    ) -> Result<(), MR::Error> {
261        let mut pl = progress_logger!(
262            display_memory = true,
263            item_name = "node",
264            local_speed = true,
265            expected_updates = Some(self.num_nodes),
266        );
267
268        pl.start("Traversing graph in topological order...");
269
270        for node in nodes {
271            pl.light_update();
272
273            self.push_node_with_kept_labels(node)?;
274        }
275
276        pl.done();
277
278        Ok(())
279    }
280
281    #[inline]
282    fn push_node_with_kept_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
283        let mut dependencies = self.graph.successors(node).into_iter();
284
285        // get label of first dependencies
286        let first_dependency_label = (&mut dependencies)
287            .flat_map(|dependency| self.labels.get(dependency).map(ToOwned::to_owned))
288            .next();
289
290        // Merge other dependencies' labels with it
291        let label: Option<<MR::Label as ToOwned>::Owned> = match first_dependency_label {
292            Some(label) => {
293                self.map_reducer.reduce(
294                    label,
295                    dependencies.flat_map(|dep|
296                            // If 'node' is a revision, then 'dep' is its parent revision
297                            self.labels.get(dep)),
298                )?
299            }
300            None => {
301                assert!(
302                    dependencies.next().is_none(),
303                    "first_dependency_label is None, but not all dependencies were consumed"
304                );
305                None
306            }
307        };
308
309        // Merge this node's label with them
310        let label = match label {
311            Some(label) => self.map_reducer.map_reduce(node, label)?,
312            None => self.map_reducer.map(node)?,
313        };
314
315        self.map_reducer
316            .on_node_traversed(node, label.as_ref().map(|l| l.borrow()))?;
317        if let Some(label) = label {
318            let previous_label = self.labels.insert(node, label);
319            assert!(previous_label.is_none(), "{node} was labeled twice");
320        }
321
322        Ok(())
323    }
324
325    /// Implementation of [`run_in_topological_order`] optimized for labels that are expensive to
326    /// clone but cheap to move
327    fn run_in_topological_order_with_popped_labels(
328        &mut self,
329        nodes: impl Iterator<Item = NodeId>,
330    ) -> Result<(), MR::Error> {
331        let mut pl = progress_logger!(
332            display_memory = true,
333            item_name = "node",
334            local_speed = true,
335            expected_updates = Some(self.num_nodes),
336        );
337
338        pl.start("Traversing graph in topological order...");
339
340        for node in nodes {
341            pl.light_update();
342
343            self.push_node_with_popped_labels(node)?;
344        }
345
346        pl.done();
347
348        debug_assert!(
349            self.labels.is_empty(),
350            "run_in_topological_order_with_popped_labels ended without clearing its labels store"
351        );
352
353        Ok(())
354    }
355
356    #[inline]
357    fn push_node_with_popped_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
358        let num_dependents = self.graph.indegree(node);
359
360        if num_dependents > 0 {
361            self.pending_dependents.insert(node, num_dependents);
362        }
363
364        let mut dependencies = self.graph.successors(node).into_iter();
365
366        let mut merged_label: Option<<MR::Label as ToOwned>::Owned> = None;
367
368        while let Some(first_dependency) = dependencies.next() {
369            // Get label of the first dependency that has a label
370            let first_dependency_label = if self.pending_dependents.get(&first_dependency)
371                == Some(&1)
372            {
373                // Reuse the dependency's label.
374                //
375                // This saves a potentially expensive clone in the tight loop.
376                // When working with the revision graph, this branch is almost always taken
377                // because most revisions have a single parent (ie. single dependency)
378                self.pending_dependents.remove(&first_dependency);
379                self.labels.remove(first_dependency)
380            } else {
381                // Dependency is not yet ready to be popped because it has other dependents
382                // to be visited.  Copy its label
383                let pending_dependants = self.pending_dependents.get_mut(&first_dependency).unwrap_or_else(|| panic!("Node {node} depends on node {first_dependency} but the latter's label was not computed (yet?). Check the topological order is complete."));
384                *pending_dependants -= 1;
385                self.labels.get(first_dependency).map(
386                    |l: &MR::Label| -> <MR::Label as std::borrow::ToOwned>::Owned { l.to_owned() },
387                )
388            };
389
390            // Merge it with all the others
391            if let Some(first_dependency_label) = first_dependency_label {
392                let mut dependencies_to_remove = SmallVec::<[_; 1]>::new();
393                merged_label = self.map_reducer.reduce(
394                    first_dependency_label,
395                    dependencies.flat_map(|dep| {
396                        *self.pending_dependents.get_mut(&dep).unwrap() -= 1;
397                        if *self.pending_dependents.get(&dep).unwrap() == 0 {
398                            dependencies_to_remove.push(dep);
399                        }
400                        // If 'node' is a revision, then 'dep' is its parent revision
401                        self.labels.get(dep)
402                    }),
403                )?;
404
405                // Clean up deps that have no more pending dependents
406                for dep in dependencies_to_remove {
407                    self.pending_dependents.remove(&dep);
408                    self.labels.remove(dep);
409                }
410                break;
411            }
412        }
413
414        let label = match merged_label {
415            Some(merged_label) => self.map_reducer.map_reduce(node, merged_label)?,
416            None => self.map_reducer.map(node)?,
417        };
418        self.map_reducer
419            .on_node_traversed(node, label.as_ref().map(Borrow::borrow))?;
420        if num_dependents > 0 {
421            if let Some(label) = label {
422                let previous_label = self.labels.insert(node, label);
423                assert!(previous_label.is_none(), "{node} was labeled twice");
424            }
425        } else {
426            assert!(
427                !self.labels.contains_key(node),
428                "{node} was already labeled"
429            );
430        }
431
432        Ok(())
433    }
434}