webgraph_algo/sccs/
symm_par.rs

1/*
2 * SPDX-FileCopyrightText: 2024 Matteo Dell'Acqua
3 * SPDX-FileCopyrightText: 2025 Sebastiano Vigna
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8use crate::{
9    prelude::*,
10    visits::breadth_first::{EventNoPred, ParFairNoPred},
11};
12use dsi_progress_logger::ConcurrentProgressLog;
13use no_break::NoBreak;
14use rayon::ThreadPool;
15use std::{
16    mem::MaybeUninit,
17    ops::ControlFlow::Continue,
18    sync::atomic::{AtomicUsize, Ordering},
19};
20use sync_cell_slice::SyncSlice;
21use webgraph::traits::RandomAccessGraph;
22
23/// Connected components of symmetric graphs by parallel visits.
24pub fn symm_par(
25    graph: impl RandomAccessGraph + Sync,
26    thread_pool: &ThreadPool,
27    pl: &mut impl ConcurrentProgressLog,
28) -> Sccs {
29    // TODO debug_assert!(check_symmetric(&graph));
30
31    let num_nodes = graph.num_nodes();
32    pl.item_name("node");
33    pl.expected_updates(Some(num_nodes));
34    pl.start("Computing strongly connected components...");
35
36    let mut visit = ParFairNoPred::new(&graph);
37    let mut component = Box::new_uninit_slice(num_nodes);
38
39    let number_of_components = AtomicUsize::new(0);
40    let slice = component.as_sync_slice();
41
42    for node in 0..num_nodes {
43        visit
44            .par_visit_with(
45                [node],
46                pl.clone(),
47                |pl, event| {
48                    match event {
49                        EventNoPred::Init { .. } => {}
50                        EventNoPred::Unknown { node, .. } => {
51                            pl.light_update();
52                            unsafe {
53                                slice[node].set(MaybeUninit::new(
54                                    number_of_components.load(Ordering::Relaxed),
55                                ))
56                            };
57                        }
58                        EventNoPred::Done { .. } => {
59                            number_of_components.fetch_add(1, Ordering::Relaxed);
60                        }
61                        _ => (),
62                    }
63                    Continue(())
64                },
65                thread_pool,
66            )
67            .continue_value_no_break();
68    }
69
70    let component = unsafe { component.assume_init() };
71
72    pl.done();
73
74    Sccs::new(number_of_components.load(Ordering::Relaxed), component)
75}