swh_graph_stdlib/labeling.rs
1// Copyright (C) 2026 The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6//! Associate information to a subset of nodes
7
8use std::collections::HashMap;
9
10use dsi_progress_logger::{ProgressLog, progress_logger};
11use rapidhash::RapidHashMap;
12use swh_graph::graph::*;
13use swh_graph::graph::{NodeId, SwhForwardGraph};
14
15/// Callbacks for [`MapReduce`]
16pub trait MapReducer {
17 type Label: Clone;
18 type Error;
19
20 /// Returns the label to assign to the given node, independently of its successors, if any
21 fn map(&mut self, node: NodeId) -> Result<Option<Self::Label>, Self::Error>;
22
23 /// Given the labels of the children of a node, merge them into a single label.
24 ///
25 /// Not guaranteed to be called for every node.
26 fn reduce<'a, I: Iterator<Item = &'a Self::Label>>(
27 &mut self,
28 first_label: Self::Label,
29 other_labels: I,
30 ) -> Result<Option<Self::Label>, Self::Error>
31 where
32 Self::Label: 'a;
33
34 /// Special-case of [`Self::reduce`] for merging a node's label with its successors' label
35 ///
36 /// Defaults to calling [`Self::reduce`].
37 ///
38 /// Not guaranteed to be called for every node.
39 fn map_reduce(
40 &mut self,
41 node: NodeId,
42 successors_label: Self::Label,
43 ) -> Result<Option<Self::Label>, Self::Error> {
44 match self.map(node)? {
45 Some(own_label) => self.reduce(own_label, [&successors_label].into_iter()),
46 None => Ok(Some(successors_label)),
47 }
48 }
49
50 /// Called once a node's initial label was computed and merged with its successors'
51 ///
52 /// Defaults to no-op.
53 fn on_node_traversed(
54 &mut self,
55 _node: NodeId,
56 _label: Option<&Self::Label>,
57 ) -> Result<(), Self::Error> {
58 Ok(())
59 }
60}
61
/// Associates labels to nodes in the graph using successor nodes' labels and "bubbling up"
///
/// Use [`swh_graph::views::Subgraph`] to select the set of nodes to run this on.
/// For example, to avoid content and directory nodes (which are typically much slower to process),
/// use `Subgraph::with_node_constraint("rev,rel,snp,ori".parse().unwrap())`.
///
/// # Example
///
/// For example, with this graph:
///
/// ```text
///       - 3
///      /
///    <-
/// 1 <--- 4 <--+
///             +--- 6
/// 2 <--- 5 <--+
/// ```
///
/// We call 1 a successor of 3, consistent with swh-graph's terminology, even though MapReducer
/// propagates labels in the other direction.
///
/// We would:
///
/// * compute label of 1
/// * compute label of 2
/// * compute label of 3 and merge it with 1's
/// * compute label of 4 and merge it with 1's
/// * compute label of 5 and merge it with 2's
/// * compute label of 6 and merge it with 4's and 5's
pub struct MapReduce<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer> {
    /// Graph queried for the successors and indegree of each traversed node
    graph: G,
    /// Expected number of nodes to traverse; only used to configure the progress logger
    num_nodes: usize,
    /// Callbacks invoked during the traversal
    pub map_reducer: MR,
}
97
98impl<G: SwhForwardGraph + SwhBackwardGraph, MR: MapReducer> MapReduce<G, MR> {
99 pub fn new(graph: G, map_reducer: MR) -> Self {
100 MapReduce {
101 num_nodes: graph.actual_num_nodes().unwrap_or(graph.num_nodes()),
102 graph,
103 map_reducer,
104 }
105 }
106
107 pub fn with_num_nodes(mut self, num_nodes: usize) -> Self {
108 self.num_nodes = num_nodes;
109 self
110 }
111
112 pub fn run_in_topological_order(
113 &mut self,
114 nodes: impl Iterator<Item = NodeId>,
115 ) -> Result<(), MR::Error> {
116 let mut labels: RapidHashMap<NodeId, MR::Label> = RapidHashMap::default();
117
118 // For each node it, counts its number of direct dependents that still need to be handled
119 let mut pending_dependents = HashMap::<NodeId, usize>::new();
120
121 let mut pl = progress_logger!(
122 display_memory = true,
123 item_name = "node",
124 local_speed = true,
125 expected_updates = Some(self.num_nodes),
126 );
127
128 pl.start("Traversing graph");
129
130 for node in nodes {
131 pl.light_update();
132 let num_dependents = self.graph.indegree(node);
133
134 if num_dependents > 0 {
135 pending_dependents.insert(node, num_dependents);
136 }
137
138 let mut dependencies = self.graph.successors(node).into_iter();
139
140 let mut merged_label: Option<MR::Label> = None;
141
142 while let Some(first_dependency) = dependencies.next() {
143 // Get label of the first dependency that has a label
144 let first_dependency_label =
145 if pending_dependents.get(&first_dependency) == Some(&1) {
146 // Reuse the dependency's set of contributors.
147 //
148 // This saves a potentially expensive clone in the tight loop.
149 // When working with the revision graph, this branch is almost always taken
150 // because most revisions have a single parent (ie. single dependency)
151 pending_dependents.remove(&first_dependency);
152 labels.remove(&first_dependency)
153 } else {
154 // Dependency is not yet ready to be popped because it has other dependents
155 // to be visited. Copy its contributor set
156 *pending_dependents.get_mut(&first_dependency).unwrap() -= 1;
157 labels.get(&first_dependency).cloned()
158 };
159
160 // Merge it with all the others
161 if let Some(first_dependency_label) = first_dependency_label {
162 merged_label = self.map_reducer.reduce(
163 first_dependency_label,
164 dependencies.flat_map(|dep|
165 // If 'node' is a revision, then 'dep' is its parent revision
166 labels.get(&dep)),
167 )?;
168 break;
169 }
170 }
171
172 let label = match merged_label {
173 Some(merged_label) => self.map_reducer.map_reduce(node, merged_label)?,
174 None => self.map_reducer.map(node)?,
175 };
176 self.map_reducer.on_node_traversed(node, label.as_ref())?;
177 if num_dependents > 0 {
178 if let Some(label) = label {
179 let previous_label = labels.insert(node, label);
180 assert!(previous_label.is_none(), "{node} was labeled twice");
181 }
182 } else {
183 assert!(!labels.contains_key(&node), "{node} was already labeled");
184 }
185 }
186
187 pl.done();
188
189 Ok(())
190 }
191}