Skip to main content

swh_graph_stdlib/
labeling.rs

1// Copyright (C) 2026  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6//! Associate information to a subset of nodes
7
8use std::collections::HashMap;
9
10use dsi_progress_logger::{ProgressLog, progress_logger};
11use rapidhash::RapidHashMap;
12use swh_graph::graph::*;
13use swh_graph::graph::{NodeId, SwhForwardGraph};
14
15/// Callbacks for [`MapReduce`]
16pub trait MapReducer {
17    type Label: Clone;
18    type Error;
19
20    /// Returns the label to assign to the given node, independently of its successors, if any
21    fn map(&mut self, node: NodeId) -> Result<Option<Self::Label>, Self::Error>;
22
23    /// Given the labels of the children of a node, merge them into a single label.
24    ///
25    /// Not guaranteed to be called for every node.
26    fn reduce<'a, I: Iterator<Item = &'a Self::Label>>(
27        &mut self,
28        first_label: Self::Label,
29        other_labels: I,
30    ) -> Result<Option<Self::Label>, Self::Error>
31    where
32        Self::Label: 'a;
33
34    /// Special-case of [`Self::reduce`] for merging a node's label with its successors' label
35    ///
36    /// Defaults to calling [`Self::reduce`].
37    ///
38    /// Not guaranteed to be called for every node.
39    fn map_reduce(
40        &mut self,
41        node: NodeId,
42        successors_label: Self::Label,
43    ) -> Result<Option<Self::Label>, Self::Error> {
44        match self.map(node)? {
45            Some(own_label) => self.reduce(own_label, [&successors_label].into_iter()),
46            None => Ok(Some(successors_label)),
47        }
48    }
49
50    /// Called once a node's initial label was computed and merged with its successors'
51    ///
52    /// Defaults to no-op.
53    fn on_node_traversed(
54        &mut self,
55        _node: NodeId,
56        _label: Option<&Self::Label>,
57    ) -> Result<(), Self::Error> {
58        Ok(())
59    }
60}
61
62/// Associates labels to nodes in the graph using successor nodes' labels and "bubbling up"
63///
64/// Use [`swh_graph::views::Subgraph`] to select the set of nodes to run this on.
65/// For example, to avoid content and directory nodes (which are typically much slower to process),
66/// use `Subgraph::with_node_constraint("rev,rel,snp,ori".parse().unwrap())`.
67///
68/// # Example
69///
70/// For example, with this graph:
71///
72/// ```text
73///      - 3
74///     /
75///   <-
76/// 1 <--- 4 <--+
77///             +--- 6
78/// 2 <--- 5 <--+
79/// ```
80///
81/// We call 1 a successor of 3, consistent with swh-graph's terminology, even though MapReducer
82/// propagates labels in the other direction.
83///
84/// we would:
85///
86/// * compute label of 1
87/// * compute label of 2
88/// * compute label of 2 and merge it with 1's
89/// * compute label of 4 and merge it with 1's
90/// * compute label of 5 and merge it with 2's
91/// * compute label of 6 and merge it with 4's and 5's
92pub struct MapReduce<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer> {
93    graph: G,
94    num_nodes: usize,
95    pub map_reducer: MR,
96}
97
98impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer> MapReduce<G, MR> {
99    pub fn new(graph: G, map_reducer: MR) -> Self {
100        MapReduce {
101            num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
102            graph,
103            map_reducer,
104        }
105    }
106
107    pub fn with_num_nodes(mut self, num_nodes: usize) -> Self {
108        self.num_nodes = num_nodes;
109        self
110    }
111
112    pub fn run_in_topological_order(
113        &mut self,
114        nodes: impl Iterator<Item = NodeId>,
115    ) -> Result<(), MR::Error> {
116        let mut labels: RapidHashMap<NodeId, MR::Label> = RapidHashMap::default();
117
118        // For each node it, counts its number of direct dependents that still need to be handled
119        let mut pending_dependents = HashMap::<NodeId, usize>::new();
120
121        let mut pl = progress_logger!(
122            display_memory = true,
123            item_name = "node",
124            local_speed = true,
125            expected_updates = Some(self.num_nodes),
126        );
127
128        pl.start("Traversing graph");
129
130        for node in nodes {
131            pl.light_update();
132            let num_dependents = self.graph.indegree(node);
133
134            if num_dependents > 0 {
135                pending_dependents.insert(node, num_dependents);
136            }
137
138            let mut dependencies = self.graph.successors(node).into_iter();
139
140            let mut merged_label: Option<MR::Label> = None;
141
142            while let Some(first_dependency) = dependencies.next() {
143                // Get label of the first dependency that has a label
144                let first_dependency_label =
145                    if pending_dependents.get(&first_dependency) == Some(&1) {
146                        // Reuse the dependency's set of contributors.
147                        //
148                        // This saves a potentially expensive clone in the tight loop.
149                        // When working with the revision graph, this branch is almost always taken
150                        // because most revisions have a single parent (ie. single dependency)
151                        pending_dependents.remove(&first_dependency);
152                        labels.remove(&first_dependency)
153                    } else {
154                        // Dependency is not yet ready to be popped because it has other dependents
155                        // to be visited.  Copy its contributor set
156                        *pending_dependents.get_mut(&first_dependency).unwrap() -= 1;
157                        labels.get(&first_dependency).cloned()
158                    };
159
160                // Merge it with all the others
161                if let Some(first_dependency_label) = first_dependency_label {
162                    merged_label = self.map_reducer.reduce(
163                        first_dependency_label,
164                        dependencies.flat_map(|dep|
165                            // If 'node' is a revision, then 'dep' is its parent revision
166                            labels.get(&dep)),
167                    )?;
168                    break;
169                }
170            }
171
172            let label = match merged_label {
173                Some(merged_label) => self.map_reducer.map_reduce(node, merged_label)?,
174                None => self.map_reducer.map(node)?,
175            };
176            self.map_reducer.on_node_traversed(node, label.as_ref())?;
177            if num_dependents > 0 {
178                if let Some(label) = label {
179                    let previous_label = labels.insert(node, label);
180                    assert!(previous_label.is_none(), "{node} was labeled twice");
181                }
182            } else {
183                assert!(!labels.contains_key(&node), "{node} was already labeled");
184            }
185        }
186
187        pl.done();
188
189        Ok(())
190    }
191}