use crate::prelude::*;
use crossbeam_utils::CachePadded;
use dsi_progress_logger::ConcurrentProgressLog;
use no_break::NoBreak;
use std::{
mem::MaybeUninit,
ops::ControlFlow::Continue,
sync::atomic::{AtomicUsize, Ordering},
};
use sync_cell_slice::SyncSlice;
use webgraph::traits::RandomAccessGraph;
use webgraph::visits::{
Parallel,
breadth_first::{EventNoPred, ParFairNoPred},
};
pub fn symm_par(graph: impl RandomAccessGraph + Sync, pl: &mut impl ConcurrentProgressLog) -> Sccs {
let num_nodes = graph.num_nodes();
pl.item_name("node");
pl.expected_updates(Some(num_nodes));
pl.start("Computing connected components...");
let mut visit = ParFairNoPred::new(&graph);
let mut component = Box::new_uninit_slice(num_nodes);
let number_of_components = CachePadded::new(AtomicUsize::new(0));
let slice = component.as_sync_slice();
for node in 0..num_nodes {
visit
.par_visit_with([node], pl.clone(), |pl, event| {
match event {
EventNoPred::Init { .. } => {}
EventNoPred::Visit { node, .. } => {
pl.light_update();
unsafe {
slice[node].set(MaybeUninit::new(
number_of_components.load(Ordering::Relaxed),
))
};
}
EventNoPred::Done { .. } => {
number_of_components.fetch_add(1, Ordering::Relaxed);
}
_ => (),
}
Continue(())
})
.continue_value_no_break();
}
let component = unsafe { component.assume_init() };
pl.done();
Sccs::new(number_of_components.load(Ordering::Relaxed), component)
}