use anyhow::{Result, ensure};
use dashmap::DashSet;
use dsi_progress_logger::{ProgressLog, concurrent_progress_logger, progress_logger};
use epserde::Epserde;
use rayon::prelude::*;
use rdst::RadixSort;
use sux::bits::BitFieldVec;
use sux::bits::BitVec;
use sux::dict::elias_fano::{EliasFano, EliasFanoBuilder, EliasFanoConcurrentBuilder};
use sux::prelude::{SelectAdaptConst, SelectZeroAdaptConst};
use sux::traits::IndexedSeq;
use swh_graph::graph::{
NodeId, SwhBackwardGraph, SwhForwardGraph, SwhGraph, SwhGraphWithProperties,
};
use swh_graph::views::contiguous_subgraph::{
ContiguousSubgraph, Contraction, MonotoneContractionBackend,
};
use value_traits::slices::SliceByValue;
use webgraph::traits::labels::SortedIterator;
/// Elias–Fano sequence over bit vectors backed by `D`, wrapped in both a
/// `SelectAdaptConst` and a `SelectZeroAdaptConst` structure.
///
/// This is the type produced by `build_with_seq_and_dict()` in this file and is
/// used to store the contraction.
type EfSeqDict<D> =
EliasFano<SelectZeroAdaptConst<SelectAdaptConst<BitVec<D>, D, 12, 3>, D, 12, 3>>;
/// Elias–Fano sequence over bit vectors backed by `D`, wrapped only in a
/// `SelectAdaptConst` structure.
///
/// This is the type produced by `build_with_seq()` in this file and is used to
/// store the component sizes.
type EfSeq<D> = EliasFano<SelectAdaptConst<sux::bits::BitVec<D>, D, 12, 3>>;
/// Connected components of a subset of a graph's nodes.
///
/// Nodes of the underlying graph are mapped to contiguous ids through
/// `contraction`; the component of each selected node is stored in `ccs`.
///
/// `D` is the backing storage of the Elias–Fano structures and `V` the storage
/// of the per-node component ids.
#[derive(Epserde)]
pub struct SubgraphWccs<
D: AsRef<[usize]> = Box<[usize]>,
V: SliceByValue<Value = usize> = BitFieldVec<usize>,
> {
/// Number of components found when the structure was built.
num_components: usize,
/// Mapping between node ids of the underlying graph and the contiguous ids
/// used to index `ccs`.
contraction: EfSeqDict<D>,
/// Component id of each selected node, indexed by contracted node id.
ccs: V,
/// Size of each component, indexed by component id; `None` unless the
/// structure was built with `sort_by_size = true`.
component_sizes: Option<EfSeq<D>>,
}
impl SubgraphWccs<Box<[usize]>, BitFieldVec<usize>> {
/// Builds the components of everything connected to `nodes`.
///
/// Starting from every item of `nodes` (in parallel), both predecessors and
/// successors are followed until no new node is discovered. The resulting set
/// of nodes is then handed to [`Self::build_from_nodes`], together with
/// `sort_by_size`.
///
/// # Errors
///
/// Propagates any error returned by [`Self::build_from_nodes`].
pub fn build_from_closure<G, I: IntoParallelIterator<Item = NodeId>>(
    graph: G,
    nodes: I,
    sort_by_size: bool,
) -> Result<Self>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
    for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
        SortedIterator,
    for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
        SortedIterator,
{
    // Concurrent set shared by all workers: a node is expanded only by the
    // worker whose `insert` returned `true` for it (or that started from it).
    let reached = DashSet::new();

    let mut progress = concurrent_progress_logger! {
        item_name = "node",
        local_speed = true,
        display_memory = true,
    };
    progress.start("Listing nodes in connected closure");

    nodes
        .into_par_iter()
        .for_each_with(progress.clone(), |progress, root| {
            reached.insert(root);
            let mut stack = vec![root];
            while let Some(current) = stack.pop() {
                progress.light_update();
                // `insert` returns `true` only for nodes not reached before, so
                // the filters keep exactly the newly discovered neighbors.
                stack.extend(
                    graph
                        .predecessors(current)
                        .into_iter()
                        .filter(|&pred| reached.insert(pred)),
                );
                stack.extend(
                    graph
                        .successors(current)
                        .into_iter()
                        .filter(|&succ| reached.insert(succ)),
                );
            }
        });
    progress.done();

    let closure: Vec<_> = reached.into_par_iter().collect();
    Self::build_from_nodes(graph, closure, sort_by_size)
}
pub fn build_from_nodes<G>(graph: G, mut nodes: Vec<NodeId>, sort_by_size: bool) -> Result<Self>
where
G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
SortedIterator,
for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
SortedIterator,
{
log::info!("Sorting reachable nodes");
nodes.radix_sort_unstable();
unsafe { Self::build_from_sorted_nodes(graph, nodes, sort_by_size) }
}
/// Same as [`Self::build_from_nodes`], but without the sorting step.
///
/// The ids are written, in parallel and in the order given, into an Elias–Fano
/// builder whose universe is `graph.num_nodes()`; the result is then passed to
/// [`Self::build_from_contraction`].
///
/// # Errors
///
/// Returns an error if `nodes` is empty, and propagates any error returned by
/// [`Self::build_from_contraction`].
///
/// # Safety
///
/// NOTE(review): the contract is not spelled out in this file. Callers
/// presumably must pass ids in ascending order that are valid for `graph`, as
/// required by `EliasFanoConcurrentBuilder::set` — confirm against its
/// documentation.
pub unsafe fn build_from_sorted_nodes<G>(
    graph: G,
    nodes: Vec<NodeId>,
    sort_by_size: bool,
) -> Result<Self>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
    for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
        SortedIterator,
    for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
        SortedIterator,
{
    let num_selected = nodes.len();
    let mut progress = concurrent_progress_logger!(
        item_name = "node",
        local_speed = true,
        display_memory = true,
        expected_updates = Some(num_selected),
    );

    ensure!(!nodes.is_empty(), "Empty set of nodes");
    let builder = EliasFanoConcurrentBuilder::new(num_selected, graph.num_nodes());

    progress.start("Compressing set of reachable nodes");
    nodes
        .into_par_iter()
        .enumerate()
        .for_each_with(progress.clone(), |progress, (rank, node_id)| {
            progress.light_update();
            // SAFETY: `enumerate()` hands out each rank in `0..num_selected`
            // exactly once; ordering and range of `node_id` are the caller's
            // responsibility under this function's own safety contract.
            unsafe { builder.set(rank, node_id) }
        });
    progress.done();

    Self::build_from_contraction(
        graph,
        Contraction(builder.build_with_seq_and_dict()),
        sort_by_size,
    )
}
/// Builds the components of the subgraph of `graph` selected by `contraction`.
///
/// The graph is restricted to the contracted nodes, viewed as a symmetric
/// graph, and decomposed with `webgraph_algo::sccs::symm_par`. When
/// `sort_by_size` is `true`, component sizes are also stored and can later be
/// read through `component_size()` / `component_sizes()`.
///
/// Component ids are stored as `num_components - id - 1`, where `id` is the id
/// assigned by the decomposition. This mirrors the reversed order in which the
/// sizes are stored, so a stored id indexes directly into the stored sizes.
pub fn build_from_contraction<G>(
    graph: G,
    contraction: Contraction<EfSeqDict<Box<[usize]>>>,
    sort_by_size: bool,
) -> Result<Self>
where
    G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
    for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
        SortedIterator,
    for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
        SortedIterator,
{
    // Restrict the graph to the selected nodes, renumbered contiguously.
    let contracted_graph = ContiguousSubgraph::new_from_contraction(graph, contraction);

    let mut pl = concurrent_progress_logger!(
        item_name = "node",
        local_speed = true,
        display_memory = true,
        expected_updates = Some(contracted_graph.num_nodes()),
    );
    // Wrap the graph for the duration of the decomposition only, then take it
    // back out of the adapter so it can be consumed below.
    let symmetrized_graph = swh_graph::views::SymmetricWebgraphAdapter(contracted_graph);
    let mut sccs = webgraph_algo::sccs::symm_par(&symmetrized_graph, &mut pl);
    let swh_graph::views::SymmetricWebgraphAdapter(contracted_graph) = symmetrized_graph;
    pl.done();

    let num_components = sccs.num_components();

    let component_sizes = if sort_by_size {
        log::info!("Sorting connected components by size...");
        let sizes_vec = sccs.sort_by_size();
        let mut pl = progress_logger!(
            // This logger counts components, not nodes.
            item_name = "component",
            local_speed = true,
            display_memory = true,
            expected_updates = Some(num_components),
        );
        pl.start("Compacting WCC sizes");
        // NOTE(review): `sort_by_size()` is documented in webgraph-algo as
        // renumbering by decreasing size, which is why the first size is used
        // as the upper bound and the sizes are fed in reverse.
        let mut efb = EliasFanoBuilder::new(
            num_components,
            sizes_vec.first().copied().unwrap_or(1),
        );
        efb.extend(
            sizes_vec
                .iter()
                .rev()
                .copied()
                .inspect(|_| pl.light_update()),
        );
        pl.done();
        Some(efb.build_with_seq())
    } else {
        None
    };

    let mut pl = progress_logger!(
        item_name = "node",
        local_speed = true,
        display_memory = true,
        expected_updates = Some(contracted_graph.num_nodes()),
    );
    pl.start("Compacting WCCs array");

    // Bits needed to store ids in `0..num_components`: rounding up to a power
    // of two first turns the floor of `ilog2` into a ceiling. The width is
    // never allowed to be zero.
    let bit_width = num_components
        .next_power_of_two()
        .checked_ilog2()
        .unwrap()
        .max(1);
    let bit_width = usize::try_from(bit_width).expect("bit width overflowed usize");

    let mut ccs = BitFieldVec::with_capacity(bit_width, contracted_graph.num_nodes());
    let components = sccs.components();
    for node in contracted_graph.iter_nodes(pl) {
        // Reverse the numbering to match the reversed order of the sizes.
        ccs.push(num_components - components[node] - 1);
    }

    let (_graph, Contraction(contraction)) = contracted_graph.into_parts();
    Ok(Self {
        ccs,
        contraction,
        component_sizes,
        num_components,
    })
}
}
impl<D: AsRef<[usize]>, V: SliceByValue<Value = usize>> SubgraphWccs<D, V> {
    /// Returns the number of nodes covered by this structure.
    pub fn num_nodes(&self) -> usize {
        let contraction = self.contraction();
        contraction.num_nodes()
    }

    /// Returns a borrowed view of the mapping between node ids of the
    /// underlying graph and the contiguous ids used internally.
    pub fn contraction(&self) -> Contraction<&impl MonotoneContractionBackend> {
        Contraction(&self.contraction)
    }

    /// Iterates over the covered nodes, as node ids of the underlying graph,
    /// in increasing order of their internal (contracted) id.
    pub fn iter_nodes(&self) -> impl Iterator<Item = NodeId> + use<'_, D, V> {
        let contraction = self.contraction();
        (0..contraction.num_nodes())
            .map(move |contracted_id| contraction.underlying_node_id(contracted_id))
    }

    /// Parallel counterpart of [`Self::iter_nodes`].
    pub fn par_iter_nodes(&self) -> impl ParallelIterator<Item = NodeId> + use<'_, D, V>
    where
        D: Sync + Send,
        V: Sync + Send,
    {
        let num_nodes = self.contraction().num_nodes();
        (0..num_nodes)
            .into_par_iter()
            .map(move |contracted_id| self.contraction().underlying_node_id(contracted_id))
    }

    /// Returns the number of components.
    pub fn num_components(&self) -> usize {
        self.num_components
    }

    /// Returns the component id of `node` (a node id of the underlying graph),
    /// or `None` if the contraction has no entry for it.
    #[inline(always)]
    pub fn component(&self, node: NodeId) -> Option<usize> {
        let contracted_id = self.contraction().node_id_from_underlying(node)?;
        self.ccs.get_value(contracted_id)
    }

    /// Returns the stored size at index `i`, or `None` if component sizes were
    /// not stored when the structure was built.
    pub fn component_size(&self, i: usize) -> Option<usize> {
        let sizes = self.component_sizes.as_ref()?;
        Some(sizes.get(i))
    }

    /// Returns an iterator over the stored component sizes, or `None` if they
    /// were not stored when the structure was built.
    pub fn component_sizes(&self) -> Option<impl Iterator<Item = usize> + use<'_, D, V>> {
        let sizes = self.component_sizes.as_ref()?;
        Some(sizes.iter())
    }
}