swh_graph_stdlib/
connectivity.rs

1// Copyright (C) 2025  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use anyhow::{ensure, Result};
7use dashmap::DashSet;
8use dsi_progress_logger::{concurrent_progress_logger, progress_logger, ProgressLog};
9use epserde::Epserde;
10use rayon::prelude::*;
11use rdst::RadixSort;
12use sux::bits::BitFieldVec;
13use sux::bits::BitVec;
14use sux::dict::elias_fano::{EliasFano, EliasFanoBuilder, EliasFanoConcurrentBuilder};
15use sux::prelude::{SelectAdaptConst, SelectZeroAdaptConst};
16use sux::traits::IndexedSeq;
17use swh_graph::graph::{
18    NodeId, SwhBackwardGraph, SwhForwardGraph, SwhGraph, SwhGraphWithProperties,
19};
20use swh_graph::views::contiguous_subgraph::{
21    ContiguousSubgraph, Contraction, MonotoneContractionBackend,
22};
23use value_traits::slices::SliceByValue;
24use webgraph::traits::labels::SortedIterator;
25
26type EfSeqDict<D> =
27    EliasFano<SelectZeroAdaptConst<SelectAdaptConst<BitVec<D>, D, 12, 3>, D, 12, 3>>;
28type EfSeq<D> = EliasFano<SelectAdaptConst<sux::bits::BitVec<D>, D, 12, 3>>;
29
30/// A structure that gives access to connected components of a subgraph.
31#[derive(Epserde)]
32pub struct SubgraphWccs<
33    D: AsRef<[usize]> = Box<[usize]>,
34    V: SliceByValue<Value = usize> = BitFieldVec<usize>,
35> {
36    num_components: usize,
37    /// To be interpreted as wrapped in [`Contraction`]
38    contraction: EfSeqDict<D>,
39    ccs: V,
40    component_sizes: Option<EfSeq<D>>,
41}
42
43impl SubgraphWccs<Box<[usize]>, BitFieldVec<usize>> {
44    /// Given a set of nodes, computes the connected components in the whole graph
45    /// that contain these nodes.
46    ///
47    /// For example, if a graph is:
48    ///
49    /// ```ignore
50    /// A -> B -> C
51    ///      ^
52    ///     /
53    /// D --
54    /// E -> F -> G
55    /// ```
56    ///
57    /// then:
58    ///
59    /// * `build_from_closure([A, D, F])` and `build_from_closure([A, B, D, F])` compute
60    ///   `[[A, B, C, D], [E, F, G]]`
61    /// * `build_from_closure([A])` computes `[[A, B, C, D]]`
62    pub fn build_from_closure<G, I: IntoParallelIterator<Item = NodeId>>(
63        graph: G,
64        nodes: I,
65        sort_by_size: bool,
66    ) -> Result<Self>
67    where
68        // FIXME: G should not need to be 'static
69        G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
70        for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
71            SortedIterator,
72        for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
73            SortedIterator,
74    {
75        let seen = DashSet::new(); // shared between DFSs to avoid duplicate work
76
77        let mut pl = concurrent_progress_logger! {
78            item_name = "node",
79            local_speed = true,
80            display_memory = true,
81        };
82        pl.start("Listing nodes in connected closure");
83        nodes
84            .into_par_iter()
85            .for_each_with(pl.clone(), |pl, start_node| {
86                seen.insert(start_node);
87                let mut todo = vec![start_node];
88
89                while let Some(node) = todo.pop() {
90                    pl.light_update();
91                    for pred in graph.predecessors(node) {
92                        let new = seen.insert(pred);
93                        if new {
94                            todo.push(pred);
95                        }
96                    }
97                    for succ in graph.successors(node) {
98                        let new = seen.insert(succ);
99                        if new {
100                            todo.push(succ);
101                        }
102                    }
103                }
104            });
105        pl.done();
106
107        let nodes: Vec<_> = seen.into_par_iter().collect();
108        Self::build_from_nodes(graph, nodes, sort_by_size)
109    }
110
111    /// Given a set of nodes, computes the connected components in the subgraph made
112    /// of only these nodes.
113    ///
114    /// For example, if a graph is:
115    ///
116    /// ```ignore
117    /// A -> B -> C
118    ///      ^
119    ///     /
120    /// D --
121    /// E -> F -> G
122    /// ```
123    ///
124    /// then:
125    ///
126    /// * `build_from_nodes([A, D, F])` computes `[[A], [D], [F]]`
127    /// * `build_from_nodes([A, B, D, F])` compute `[[A, B, D], [F]]`
128    /// * `build_from_nodes([A])` computes `[[A]]`
129    pub fn build_from_nodes<G>(graph: G, mut nodes: Vec<NodeId>, sort_by_size: bool) -> Result<Self>
130    where
131        // FIXME: G should not need to be 'static
132        G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
133        for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
134            SortedIterator,
135        for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
136            SortedIterator,
137    {
138        log::info!("Sorting reachable nodes");
139        nodes.radix_sort_unstable();
140
141        unsafe { Self::build_from_sorted_nodes(graph, nodes, sort_by_size) }
142    }
143
144    /// Same as [`Self::build_from_nodes`] but assumes the vector of nodes is sorted.
145    ///
146    /// # Safety
147    ///
148    /// Undefined behavior if the vector is not sorted
149    pub unsafe fn build_from_sorted_nodes<G>(
150        graph: G,
151        nodes: Vec<NodeId>,
152        sort_by_size: bool,
153    ) -> Result<Self>
154    where
155        // FIXME: G should not need to be 'static
156        G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
157        for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
158            SortedIterator,
159        for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
160            SortedIterator,
161    {
162        let mut pl = concurrent_progress_logger!(
163            item_name = "node",
164            local_speed = true,
165            display_memory = true,
166            expected_updates = Some(nodes.len()),
167        );
168        ensure!(!nodes.is_empty(), "Empty set of nodes"); // Makes EliasFanoConcurrentBuilder panic
169        let efb = EliasFanoConcurrentBuilder::new(nodes.len(), graph.num_nodes());
170        pl.start("Compressing set of reachable nodes");
171        nodes
172            .into_par_iter()
173            .enumerate()
174            // SAFETY: 'index' is unique, and the vector is sorted
175            .for_each_with(pl.clone(), |pl, (index, node)| {
176                pl.light_update();
177                unsafe { efb.set(index, node) }
178            });
179        pl.done();
180
181        let contraction = Contraction(efb.build_with_seq_and_dict());
182
183        Self::build_from_contraction(graph, contraction, sort_by_size)
184    }
185
186    /// Same as [`Self::build_from_nodes`] but takes a [`Contraction`] as input
187    /// instead of a `Vec<usize>`
188    pub fn build_from_contraction<G>(
189        graph: G,
190        contraction: Contraction<EfSeqDict<Box<[usize]>>>,
191        sort_by_size: bool,
192    ) -> Result<Self>
193    where
194        // FIXME: G should not need to be 'static
195        G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
196        for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
197            SortedIterator,
198        for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
199            SortedIterator,
200    {
201        // only keep selected nodes
202        let contracted_graph = ContiguousSubgraph::new_from_contraction(graph, contraction);
203
204        let mut pl = concurrent_progress_logger!(
205            item_name = "node",
206            local_speed = true,
207            display_memory = true,
208            expected_updates = Some(contracted_graph.num_nodes()),
209        );
210        let symmetrized_graph = swh_graph::views::SymmetricWebgraphAdapter(contracted_graph);
211        let mut sccs = webgraph_algo::sccs::symm_par(&symmetrized_graph, &mut pl);
212        let swh_graph::views::SymmetricWebgraphAdapter(contracted_graph) = symmetrized_graph;
213        pl.done();
214
215        let component_sizes = if sort_by_size {
216            log::info!("Sorting connected components by size...");
217            let sizes_vec = sccs.sort_by_size();
218            let mut pl = progress_logger!(
219                item_name = "node",
220                local_speed = true,
221                display_memory = true,
222                expected_updates = Some(sccs.num_components()),
223            );
224            pl.start("Compacting WCC sizes");
225            let mut efb = EliasFanoBuilder::new(
226                sccs.num_components(),
227                sizes_vec.first().copied().unwrap_or(1),
228            );
229
230            // Reversed because Elias-Fano needs to be in ascending order
231            efb.extend(sizes_vec.iter().rev().copied());
232            Some(efb.build_with_seq())
233        } else {
234            None
235        };
236
237        let mut pl = progress_logger!(
238            item_name = "node",
239            local_speed = true,
240            display_memory = true,
241            expected_updates = Some(contracted_graph.num_nodes()),
242        );
243        pl.start("Compacting WCCs array");
244
245        let bit_width = sccs
246            .num_components()
247            .next_power_of_two() // because checked_ilog2() rounds down
248            .checked_ilog2()
249            .unwrap()
250            .max(1); // UB in BitFieldVec when bit_width = 0
251        let bit_width = usize::try_from(bit_width).expect("bit width overflowed usize");
252        let mut ccs = BitFieldVec::with_capacity(bit_width, contracted_graph.num_nodes());
253        for node in contracted_graph.iter_nodes(pl) {
254            ccs.push(
255                // Reverse component ids to be consistent with component_sizes
256                sccs.num_components() - sccs.components()[node] - 1,
257            );
258        }
259
260        let (_graph, Contraction(contraction)) = contracted_graph.into_parts();
261
262        Ok(Self {
263            ccs,
264            contraction,
265            component_sizes,
266            num_components: sccs.num_components(),
267        })
268    }
269}
270
271impl<D: AsRef<[usize]>, V: SliceByValue<Value = usize>> SubgraphWccs<D, V> {
272    /// Returns the total number in all connected components
273    pub fn num_nodes(&self) -> usize {
274        self.contraction().num_nodes()
275    }
276
277    pub fn contraction(&self) -> Contraction<&impl MonotoneContractionBackend> {
278        Contraction(&self.contraction)
279    }
280
281    /// Returns an iterator on all the nodes in any connected component
282    ///
283    /// Order is not guaranteed.
284    ///
285    /// Updates the progress logger on every node id from 0 to `self.num_nodes()`,
286    /// even those that are filtered out by an underlying subgraph.
287    pub fn iter_nodes(&self) -> impl Iterator<Item = NodeId> + use<'_, D, V> {
288        (0..self.contraction().num_nodes()).map(|node| self.contraction().underlying_node_id(node))
289    }
290    /// Returns a parallel iterator on all the nodes in any connected component
291    ///
292    /// Order is not guaranteed.
293    ///
294    /// Updates the progress logger on every node id from 0 to `self.num_nodes()`,
295    /// even those that are filtered out by an underlying subgraph.
296    pub fn par_iter_nodes(&self) -> impl ParallelIterator<Item = NodeId> + use<'_, D, V>
297    where
298        D: Sync + Send,
299        V: Sync + Send,
300    {
301        (0..self.contraction().num_nodes())
302            .into_par_iter()
303            .map(move |node| self.contraction().underlying_node_id(node))
304    }
305
306    /// Returns the number of strongly connected components.
307    pub fn num_components(&self) -> usize {
308        self.num_components
309    }
310
311    /// Given a node, returns which component it belongs to, if any
312    #[inline(always)]
313    pub fn component(&self, node: NodeId) -> Option<usize> {
314        self.ccs
315            .get_value(self.contraction().node_id_from_underlying(node)?)
316    }
317
318    /// Returns `None` if component sizes were not computed, or the size of the `i`th component
319    /// otherwise
320    pub fn component_size(&self, i: usize) -> Option<usize> {
321        self.component_sizes.as_ref().map(move |sizes| sizes.get(i))
322    }
323
324    /// Returns `None` if component sizes were not computed, `Some(size)` otherwise
325    pub fn component_sizes(&self) -> Option<impl Iterator<Item = usize> + use<'_, D, V>> {
326        self.component_sizes.as_ref().map(|sizes| sizes.iter())
327    }
328}