Skip to main content

swh_graph_stdlib/labeling/
map_reduce.rs

1// Copyright (C) 2026  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6//! Implements labels propagation through a topological sort
7
8use std::borrow::Borrow;
9
10use anyhow::{Result, bail, ensure};
11use dsi_progress_logger::{ProgressLog, progress_logger};
12use rapidhash::RapidHashMap;
13use smallvec::SmallVec;
14use swh_graph::graph::*;
15use swh_graph::graph::{NodeId, SwhForwardGraph};
16
17use super::MapReducer;
18use super::labels::{
19    DenseLabels, Labels, SparseLabels, StridableLabel, StriddenLabels, StriddenLabelsConfig,
20};
21
22/// Builder for [`MapReduce`]
23pub struct MapReduceBuilder<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels> {
24    graph: G,
25    num_nodes: usize,
26    cheap_clones: bool,
27    keep_labels: bool,
28    pub map_reducer: MR,
29    labels_config: L::Config,
30}
31
32impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Sized>>
33    MapReduceBuilder<G, MR, SparseLabels<MR::Label>>
34{
35    /// Stores labels in a HashMap. This is the best when labeling the history-hosting layer
36    ///
37    /// This improves memory usage at the expense of runtime and CPU use.
38    pub fn new_sparse(graph: G, map_reducer: MR) -> Self {
39        MapReduceBuilder {
40            num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
41            graph,
42            cheap_clones: false,
43            keep_labels: false,
44            map_reducer,
45            labels_config: (),
46        }
47    }
48}
49
50impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: Default + Clone + Sized>>
51    MapReduceBuilder<G, MR, DenseLabels<MR::Label>>
52{
53    /// Stores labels in an array instead of a HashMap. This is the best when labeling the
54    /// directory layer or the whole graph.
55    ///
56    /// This improves runtime and CPU use at the expense of memory.
57    ///
58    /// This should probably be used only if:
59    ///
60    /// * labels are small, or
61    /// * computed labels are sparse (wrt. `graph.num_nodes()`)
62    ///   **and** `None::<MR::Label>` is small
63    pub fn new_dense(graph: G, map_reducer: MR) -> Self {
64        MapReduceBuilder {
65            num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
66            graph,
67            cheap_clones: false,
68            keep_labels: false,
69            map_reducer,
70            labels_config: (),
71        }
72    }
73}
74
75impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer<Label: StridableLabel>>
76    MapReduceBuilder<G, MR, StriddenLabels<MR::Label>>
77{
78    /// Specialized variant of [`new_labels`](Self::new_dense) for non-[`Sized`](Sized) labels.
79    ///
80    /// All labels must have the same length, but it can be computed at runtime.
81    ///
82    /// Like [`Self::new_dense`], this improves runtime and CPU use at the expense of memory,
83    /// but less than boxing the values would.
84    ///
85    /// This should only be used if labels are small.
86    ///
87    /// `num_words` is the length of the `[Label::Word]` slice needed to store a label.
88    pub fn new_stridden(graph: G, map_reducer: MR, num_words: usize) -> Self
89    where
90        MR::Label: StridableLabel + ToOwned,
91    {
92        MapReduceBuilder {
93            num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
94            graph,
95            cheap_clones: false,
96            keep_labels: false,
97            map_reducer,
98            labels_config: StriddenLabelsConfig { num_words },
99        }
100    }
101}
102
103impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
104    MapReduceBuilder<G, MR, L>
105{
106    pub fn num_nodes(mut self, num_nodes: usize) -> Self {
107        self.num_nodes = num_nodes;
108        self
109    }
110
111    /// Tunes the algorithm to assume labels are cheap to clone.
112    ///
113    /// This is probably `true` if and only if the labels implement Copy.
114    /// Setting this to `true` does not imply they are cheap to move.
115    ///
116    /// Defaults to `false`.
117    pub fn cheap_clones(mut self, cheap_clones: bool) -> Self {
118        self.cheap_clones = cheap_clones;
119        self
120    }
121
122    /// Whether the algorithm should keep labels in its store in order to return them at the end
123    ///
124    /// This is incompatible with `cheap_clones(false)` (the default).
125    ///
126    /// This consumes extra memory, except when `with_labels_array()` and labels do not contain
127    /// heap-allocated data.
128    ///
129    /// Defaults to `false`.
130    pub fn keep_labels(mut self, keep_labels: bool) -> Self {
131        self.keep_labels = keep_labels;
132        self
133    }
134
135    pub fn build(self) -> Result<MapReduce<G, MR, L>> {
136        let Self {
137            graph,
138            num_nodes,
139            cheap_clones,
140            keep_labels,
141            map_reducer,
142            labels_config,
143        } = self;
144        let pop_labels = match (cheap_clones, keep_labels) {
145            (true, true) => false, // cheap to clone and we need to keep them
146            (true, false) => true, // cheap to clone, but popping saves memory
147            (false, true) => bail!("MapReduce cannot both keep labels and avoid expensive clones"),
148            (false, false) => true, // expensive to clone, so popping saves a clone
149        };
150        log::info!("Allocating labels...");
151        let labels = L::new(graph.num_nodes(), labels_config);
152        Ok(MapReduce {
153            graph,
154            num_nodes,
155            pop_labels,
156            map_reducer,
157            labels,
158            pending_dependents: Default::default(),
159        })
160    }
161}
162
163/// Associates labels to nodes in the graph using successor nodes' labels and "bubbling up"
164///
165/// Use [`swh_graph::views::Subgraph`] to select the set of nodes to run this on.
166/// For example, to avoid content and directory nodes (which are typically much slower to process),
167/// use `Subgraph::with_node_constraint("rev,rel,snp,ori".parse().unwrap())`.
168///
169/// Built from [`MapReduceBuilder`]
170///
171/// # Example
172///
173/// For example, with this graph:
174///
175/// ```text
176///      - 3
177///     /
178///   <-
179/// 1 <--- 4 <--+
180///             +--- 6
181/// 2 <--- 5 <--+
182/// ```
183///
184/// We call 1 a successor of 3, consistent with swh-graph's terminology, even though MapReducer
185/// propagates labels in the other direction.
186///
187/// we would:
188///
189/// * compute label of 1
190/// * compute label of 2
191/// * compute label of 2 and merge it with 1's
192/// * compute label of 4 and merge it with 1's
193/// * compute label of 5 and merge it with 2's
194/// * compute label of 6 and merge it with 4's and 5's
195pub struct MapReduce<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L> {
196    graph: G,
197    num_nodes: usize,
198    pop_labels: bool,
199    pub map_reducer: MR,
200    labels: L,
201    /// For each node, counts its number of direct dependents that still need to be handled.
202    ///
203    /// Unused if pop_labels is false
204    pending_dependents: RapidHashMap<NodeId, usize>,
205}
206
207impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer, L: Labels<Label = MR::Label>>
208    MapReduce<G, MR, L>
209{
210    /// Runs the configured [`MapReducer`] sequentially on all nodes in the graph.
211    ///
212    /// `nodes` must be an iterator of all nodes in topological order,
213    /// eg. returned by [`GenerationsReader::iter_nodes`](https://docs.rs/swh_graph_topology/latest/swh_graph_topology/generations/struct.GenerationsReader.html#method.iter_nodes)
214    /// with `.map(|(_depth, node)| node`).
215    pub fn run_in_topological_order(
216        &mut self,
217        nodes: impl Iterator<Item = NodeId>,
218    ) -> Result<(), MR::Error> {
219        if self.pop_labels {
220            self.run_in_topological_order_with_popped_labels(nodes)
221        } else {
222            self.run_in_topological_order_with_kept_labels(nodes)
223        }
224    }
225
226    /// A single step of [`Self::run_in_topological_order`], in case the caller does not have an
227    /// iterator of nodes
228    pub fn push_node(&mut self, node: NodeId) -> Result<(), MR::Error> {
229        if self.pop_labels {
230            self.push_node_with_popped_labels(node)
231        } else {
232            self.push_node_with_kept_labels(node)
233        }
234    }
235
236    /// Returns every node's labels, if `MapReduceBuilder::keep_labels` was set to true.
237    pub fn labels(&self) -> Result<&L> {
238        ensure!(
239            !self.pop_labels,
240            "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
241        );
242        Ok(&self.labels)
243    }
244
245    /// Returns every node's labels, if `MapReduceBuilder::keep_labels` was set to true.
246    pub fn take_labels(self) -> Result<L> {
247        ensure!(
248            !self.pop_labels,
249            "MapReducer::labels() is not available as MapReduceBuilder::keep_labels() was not set to true"
250        );
251        Ok(self.labels)
252    }
253
254    /// Implementation of [`run_in_topological_order`] optimized for labels that are cheap to clone
255    /// or move
256    fn run_in_topological_order_with_kept_labels(
257        &mut self,
258        nodes: impl Iterator<Item = NodeId>,
259    ) -> Result<(), MR::Error> {
260        let mut pl = progress_logger!(
261            display_memory = true,
262            item_name = "node",
263            local_speed = true,
264            expected_updates = Some(self.num_nodes),
265        );
266
267        pl.start("Traversing graph in topological order...");
268
269        for node in nodes {
270            pl.light_update();
271
272            self.push_node_with_kept_labels(node)?;
273        }
274
275        pl.done();
276
277        Ok(())
278    }
279
280    #[inline]
281    fn push_node_with_kept_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
282        let mut dependencies = self.graph.successors(node).into_iter();
283
284        // get label of first dependencies
285        let first_dependency_label = (&mut dependencies)
286            .flat_map(|dependency| self.labels.get(dependency).map(ToOwned::to_owned))
287            .next();
288
289        // Merge other dependencies' labels with it
290        let label: Option<<MR::Label as ToOwned>::Owned> = match first_dependency_label {
291            Some(label) => {
292                self.map_reducer.reduce(
293                    label,
294                    dependencies.flat_map(|dep|
295                            // If 'node' is a revision, then 'dep' is its parent revision
296                            self.labels.get(dep)),
297                )?
298            }
299            None => {
300                assert!(
301                    dependencies.next().is_none(),
302                    "first_dependency_label is None, but not all dependencies were consumed"
303                );
304                None
305            }
306        };
307
308        // Merge this node's label with them
309        let label = match label {
310            Some(label) => self.map_reducer.map_reduce(node, label)?,
311            None => self.map_reducer.map(node)?,
312        };
313
314        self.map_reducer
315            .on_node_traversed(node, label.as_ref().map(|l| l.borrow()))?;
316        if let Some(label) = label {
317            let previous_label = self.labels.insert(node, label);
318            assert!(previous_label.is_none(), "{node} was labeled twice");
319        }
320
321        Ok(())
322    }
323
324    /// Implementation of [`run_in_topological_order`] optimized for labels that are expensive to
325    /// clone but cheap to move
326    fn run_in_topological_order_with_popped_labels(
327        &mut self,
328        nodes: impl Iterator<Item = NodeId>,
329    ) -> Result<(), MR::Error> {
330        let mut pl = progress_logger!(
331            display_memory = true,
332            item_name = "node",
333            local_speed = true,
334            expected_updates = Some(self.num_nodes),
335        );
336
337        pl.start("Traversing graph in topological order...");
338
339        for node in nodes {
340            pl.light_update();
341
342            self.push_node_with_popped_labels(node)?;
343        }
344
345        pl.done();
346
347        debug_assert!(
348            self.labels.is_empty(),
349            "run_in_topological_order_with_popped_labels ended without clearing its labels store"
350        );
351
352        Ok(())
353    }
354
355    #[inline]
356    fn push_node_with_popped_labels(&mut self, node: NodeId) -> Result<(), MR::Error> {
357        let num_dependents = self.graph.indegree(node);
358
359        if num_dependents > 0 {
360            self.pending_dependents.insert(node, num_dependents);
361        }
362
363        let mut dependencies = self.graph.successors(node).into_iter();
364
365        let mut merged_label: Option<<MR::Label as ToOwned>::Owned> = None;
366
367        while let Some(first_dependency) = dependencies.next() {
368            // Get label of the first dependency that has a label
369            let first_dependency_label = if self.pending_dependents.get(&first_dependency)
370                == Some(&1)
371            {
372                // Reuse the dependency's set of contributors.
373                //
374                // This saves a potentially expensive clone in the tight loop.
375                // When working with the revision graph, this branch is almost always taken
376                // because most revisions have a single parent (ie. single dependency)
377                self.pending_dependents.remove(&first_dependency);
378                self.labels.remove(first_dependency)
379            } else {
380                // Dependency is not yet ready to be popped because it has other dependents
381                // to be visited.  Copy its contributor set
382                let pending_dependants = self.pending_dependents.get_mut(&first_dependency).unwrap_or_else(|| panic!("Node {node} depends on node {first_dependency} but the latter's label was not computed (yet?). Check the topological order is complete."));
383                *pending_dependants -= 1;
384                self.labels.get(first_dependency).map(
385                    |l: &MR::Label| -> <MR::Label as std::borrow::ToOwned>::Owned { l.to_owned() },
386                )
387            };
388
389            // Merge it with all the others
390            if let Some(first_dependency_label) = first_dependency_label {
391                let mut dependencies_to_remove = SmallVec::<[_; 1]>::new();
392                merged_label = self.map_reducer.reduce(
393                    first_dependency_label,
394                    dependencies.flat_map(|dep| {
395                        *self.pending_dependents.get_mut(&dep).unwrap() -= 1;
396                        if *self.pending_dependents.get(&dep).unwrap() == 0 {
397                            dependencies_to_remove.push(dep);
398                        }
399                        // If 'node' is a revision, then 'dep' is its parent revision
400                        self.labels.get(dep)
401                    }),
402                )?;
403
404                // Clean up deps that have no more pending dependents
405                for dep in dependencies_to_remove {
406                    self.pending_dependents.remove(&dep);
407                    self.labels.remove(dep);
408                }
409                break;
410            }
411        }
412
413        let label = match merged_label {
414            Some(merged_label) => self.map_reducer.map_reduce(node, merged_label)?,
415            None => self.map_reducer.map(node)?,
416        };
417        self.map_reducer
418            .on_node_traversed(node, label.as_ref().map(Borrow::borrow))?;
419        if num_dependents > 0 {
420            if let Some(label) = label {
421                let previous_label = self.labels.insert(node, label);
422                assert!(previous_label.is_none(), "{node} was labeled twice");
423            }
424        } else {
425            assert!(
426                !self.labels.contains_key(node),
427                "{node} was already labeled"
428            );
429        }
430
431        Ok(())
432    }
433}