1use anyhow::{ensure, Result};
7use dashmap::DashSet;
8use dsi_progress_logger::{concurrent_progress_logger, progress_logger, ProgressLog};
9use epserde::Epserde;
10use rayon::prelude::*;
11use rdst::RadixSort;
12use sux::bits::BitFieldVec;
13use sux::bits::BitVec;
14use sux::dict::elias_fano::{EliasFano, EliasFanoBuilder, EliasFanoConcurrentBuilder};
15use sux::prelude::{SelectAdaptConst, SelectZeroAdaptConst};
16use sux::traits::IndexedSeq;
17use swh_graph::graph::{
18 NodeId, SwhBackwardGraph, SwhForwardGraph, SwhGraph, SwhGraphWithProperties,
19};
20use swh_graph::views::contiguous_subgraph::{
21 ContiguousSubgraph, Contraction, MonotoneContractionBackend,
22};
23use value_traits::slices::SliceByValue;
24use webgraph::traits::labels::SortedIterator;
25
26type EfSeqDict<D> =
27 EliasFano<SelectZeroAdaptConst<SelectAdaptConst<BitVec<D>, D, 12, 3>, D, 12, 3>>;
28type EfSeq<D> = EliasFano<SelectAdaptConst<sux::bits::BitVec<D>, D, 12, 3>>;
29
30#[derive(Epserde)]
32pub struct SubgraphWccs<
33 D: AsRef<[usize]> = Box<[usize]>,
34 V: SliceByValue<Value = usize> = BitFieldVec<usize>,
35> {
36 num_components: usize,
37 contraction: EfSeqDict<D>,
39 ccs: V,
40 component_sizes: Option<EfSeq<D>>,
41}
42
43impl SubgraphWccs<Box<[usize]>, BitFieldVec<usize>> {
44 pub fn build_from_closure<G, I: IntoParallelIterator<Item = NodeId>>(
63 graph: G,
64 nodes: I,
65 sort_by_size: bool,
66 ) -> Result<Self>
67 where
68 G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
70 for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
71 SortedIterator,
72 for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
73 SortedIterator,
74 {
75 let seen = DashSet::new(); let mut pl = concurrent_progress_logger! {
78 item_name = "node",
79 local_speed = true,
80 display_memory = true,
81 };
82 pl.start("Listing nodes in connected closure");
83 nodes
84 .into_par_iter()
85 .for_each_with(pl.clone(), |pl, start_node| {
86 seen.insert(start_node);
87 let mut todo = vec![start_node];
88
89 while let Some(node) = todo.pop() {
90 pl.light_update();
91 for pred in graph.predecessors(node) {
92 let new = seen.insert(pred);
93 if new {
94 todo.push(pred);
95 }
96 }
97 for succ in graph.successors(node) {
98 let new = seen.insert(succ);
99 if new {
100 todo.push(succ);
101 }
102 }
103 }
104 });
105 pl.done();
106
107 let nodes: Vec<_> = seen.into_par_iter().collect();
108 Self::build_from_nodes(graph, nodes, sort_by_size)
109 }
110
111 pub fn build_from_nodes<G>(graph: G, mut nodes: Vec<NodeId>, sort_by_size: bool) -> Result<Self>
130 where
131 G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
133 for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
134 SortedIterator,
135 for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
136 SortedIterator,
137 {
138 log::info!("Sorting reachable nodes");
139 nodes.radix_sort_unstable();
140
141 unsafe { Self::build_from_sorted_nodes(graph, nodes, sort_by_size) }
142 }
143
144 pub unsafe fn build_from_sorted_nodes<G>(
150 graph: G,
151 nodes: Vec<NodeId>,
152 sort_by_size: bool,
153 ) -> Result<Self>
154 where
155 G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
157 for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
158 SortedIterator,
159 for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
160 SortedIterator,
161 {
162 let mut pl = concurrent_progress_logger!(
163 item_name = "node",
164 local_speed = true,
165 display_memory = true,
166 expected_updates = Some(nodes.len()),
167 );
168 ensure!(!nodes.is_empty(), "Empty set of nodes"); let efb = EliasFanoConcurrentBuilder::new(nodes.len(), graph.num_nodes());
170 pl.start("Compressing set of reachable nodes");
171 nodes
172 .into_par_iter()
173 .enumerate()
174 .for_each_with(pl.clone(), |pl, (index, node)| {
176 pl.light_update();
177 unsafe { efb.set(index, node) }
178 });
179 pl.done();
180
181 let contraction = Contraction(efb.build_with_seq_and_dict());
182
183 Self::build_from_contraction(graph, contraction, sort_by_size)
184 }
185
186 pub fn build_from_contraction<G>(
189 graph: G,
190 contraction: Contraction<EfSeqDict<Box<[usize]>>>,
191 sort_by_size: bool,
192 ) -> Result<Self>
193 where
194 G: SwhForwardGraph + SwhBackwardGraph + SwhGraphWithProperties + Sync + Send + 'static,
196 for<'succ> <<G as SwhForwardGraph>::Successors<'succ> as IntoIterator>::IntoIter:
197 SortedIterator,
198 for<'pred> <<G as SwhBackwardGraph>::Predecessors<'pred> as IntoIterator>::IntoIter:
199 SortedIterator,
200 {
201 let contracted_graph = ContiguousSubgraph::new_from_contraction(graph, contraction);
203
204 let mut pl = concurrent_progress_logger!(
205 item_name = "node",
206 local_speed = true,
207 display_memory = true,
208 expected_updates = Some(contracted_graph.num_nodes()),
209 );
210 let symmetrized_graph = swh_graph::views::SymmetricWebgraphAdapter(contracted_graph);
211 let mut sccs = webgraph_algo::sccs::symm_par(&symmetrized_graph, &mut pl);
212 let swh_graph::views::SymmetricWebgraphAdapter(contracted_graph) = symmetrized_graph;
213 pl.done();
214
215 let component_sizes = if sort_by_size {
216 log::info!("Sorting connected components by size...");
217 let sizes_vec = sccs.sort_by_size();
218 let mut pl = progress_logger!(
219 item_name = "node",
220 local_speed = true,
221 display_memory = true,
222 expected_updates = Some(sccs.num_components()),
223 );
224 pl.start("Compacting WCC sizes");
225 let mut efb = EliasFanoBuilder::new(
226 sccs.num_components(),
227 sizes_vec.first().copied().unwrap_or(1),
228 );
229
230 efb.extend(sizes_vec.iter().rev().copied());
232 Some(efb.build_with_seq())
233 } else {
234 None
235 };
236
237 let mut pl = progress_logger!(
238 item_name = "node",
239 local_speed = true,
240 display_memory = true,
241 expected_updates = Some(contracted_graph.num_nodes()),
242 );
243 pl.start("Compacting WCCs array");
244
245 let bit_width = sccs
246 .num_components()
247 .next_power_of_two() .checked_ilog2()
249 .unwrap()
250 .max(1); let bit_width = usize::try_from(bit_width).expect("bit width overflowed usize");
252 let mut ccs = BitFieldVec::with_capacity(bit_width, contracted_graph.num_nodes());
253 for node in contracted_graph.iter_nodes(pl) {
254 ccs.push(
255 sccs.num_components() - sccs.components()[node] - 1,
257 );
258 }
259
260 let (_graph, Contraction(contraction)) = contracted_graph.into_parts();
261
262 Ok(Self {
263 ccs,
264 contraction,
265 component_sizes,
266 num_components: sccs.num_components(),
267 })
268 }
269}
270
271impl<D: AsRef<[usize]>, V: SliceByValue<Value = usize>> SubgraphWccs<D, V> {
272 pub fn num_nodes(&self) -> usize {
274 self.contraction().num_nodes()
275 }
276
277 pub fn contraction(&self) -> Contraction<&impl MonotoneContractionBackend> {
278 Contraction(&self.contraction)
279 }
280
281 pub fn iter_nodes(&self) -> impl Iterator<Item = NodeId> + use<'_, D, V> {
288 (0..self.contraction().num_nodes()).map(|node| self.contraction().underlying_node_id(node))
289 }
290 pub fn par_iter_nodes(&self) -> impl ParallelIterator<Item = NodeId> + use<'_, D, V>
297 where
298 D: Sync + Send,
299 V: Sync + Send,
300 {
301 (0..self.contraction().num_nodes())
302 .into_par_iter()
303 .map(move |node| self.contraction().underlying_node_id(node))
304 }
305
306 pub fn num_components(&self) -> usize {
308 self.num_components
309 }
310
311 #[inline(always)]
313 pub fn component(&self, node: NodeId) -> Option<usize> {
314 self.ccs
315 .get_value(self.contraction().node_id_from_underlying(node)?)
316 }
317
318 pub fn component_size(&self, i: usize) -> Option<usize> {
321 self.component_sizes.as_ref().map(move |sizes| sizes.get(i))
322 }
323
324 pub fn component_sizes(&self) -> Option<impl Iterator<Item = usize> + use<'_, D, V>> {
326 self.component_sizes.as_ref().map(|sizes| sizes.iter())
327 }
328}