use anyhow::{Context, Result};
use crossbeam_utils::CachePadded;
use dsi_progress_logger::prelude::*;
use epserde::prelude::*;
use predicates::Predicate;
use preds::PredParams;

use log::info;
use rand::SeedableRng;
use rand::rngs::SmallRng;
use rand::seq::IndexedRandom;
use rand::seq::SliceRandom;
use rayon::prelude::*;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
use sux::traits::Succ;
use sync_cell_slice::SyncSlice;
use webgraph::prelude::PermutedGraph;
use webgraph::traits::RandomAccessGraph;
use webgraph::utils::Granularity;

pub(crate) mod gap_cost;
pub(crate) mod label_store;
mod mix64;
pub mod preds;

const RAYON_MIN_LEN: usize = 100000;

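/// Labels computed for a single value of `gamma`, stored together with that
/// `gamma` and the log-gap cost of the node ordering induced by the labels.
///
/// One such record per `gamma` is serialized by
/// [`layered_label_propagation_labels_only`] and read back by
/// [`combine_labels`].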
#[derive(Epserde, Debug, Clone)]
pub struct LabelsStore<A> {
    pub gap_cost: f64,
    pub gamma: f64,
    pub labels: A,
}

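/// Runs layered label propagation on `sym_graph` for each value in `gammas`
/// and combines the per-gamma labelings into a single labeling.
///
/// This is a convenience wrapper that calls
/// [`layered_label_propagation_labels_only`] followed by [`combine_labels`]
/// on the same `work_dir`.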
#[allow(clippy::too_many_arguments)]
pub fn layered_label_propagation<R: RandomAccessGraph + Sync>(
    sym_graph: R,
    deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
    gammas: Vec<f64>,
    num_threads: Option<usize>,
    chunk_size: Option<usize>,
    arc_granularity: Granularity,
    seed: u64,
    predicate: impl Predicate<preds::PredParams>,
    work_dir: impl AsRef<Path>,
) -> Result<Box<[usize]>> {
    layered_label_propagation_labels_only(
        sym_graph,
        deg_cumul,
        gammas,
        num_threads,
        chunk_size,
        arc_granularity,
        seed,
        predicate,
        &work_dir,
    )?;
    combine_labels(work_dir)
}

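/// Runs layered label propagation on `sym_graph` for each value in `gammas`,
/// serializing one [`LabelsStore`] per gamma in `work_dir` (as
/// `labels_<gamma_index>.bin`) without combining the results.
///
/// `sym_graph` is expected to be symmetric, `deg_cumul` is expected to be the
/// cumulative outdegree distribution of the graph (used to balance work across
/// threads), and `predicate` decides when the iterations for a given gamma
/// stop.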
#[allow(clippy::too_many_arguments)]
pub fn layered_label_propagation_labels_only<R: RandomAccessGraph + Sync>(
    sym_graph: R,
    deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
    gammas: Vec<f64>,
    num_threads: Option<usize>,
    chunk_size: Option<usize>,
    arc_granularity: Granularity,
    seed: u64,
    predicate: impl Predicate<preds::PredParams>,
    work_dir: impl AsRef<Path>,
) -> Result<()> {
    let work_path = work_dir.as_ref();
    let labels_path = |gamma_index| work_path.join(format!("labels_{gamma_index}.bin"));
    const IMPROV_WINDOW: usize = 10;
    let num_nodes = sym_graph.num_nodes();
    let chunk_size = chunk_size.unwrap_or(1_000_000);
    let num_threads = num_threads.unwrap_or_else(num_cpus::get);

    let mut update_perm = (0..num_nodes).collect::<Vec<_>>();

    let mut can_change = Vec::with_capacity(num_nodes as _);
    can_change.extend((0..num_nodes).map(|_| AtomicBool::new(true)));
    let mut label_store = label_store::LabelStore::new(num_nodes as _);
    let thread_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .context("Could not create thread pool")?;

    let mut gamma_pl = progress_logger![
        display_memory = true,
        item_name = "gamma",
        expected_updates = Some(gammas.len()),
    ];

    let mut iter_pl = progress_logger![item_name = "update"];

    let hash_map_init = Ord::max(sym_graph.num_arcs() / sym_graph.num_nodes() as u64, 16) as usize;

    let mut update_pl = concurrent_progress_logger![item_name = "node", local_speed = true];

    let seed = CachePadded::new(AtomicU64::new(seed));
    let mut costs = Vec::with_capacity(gammas.len());

    gamma_pl.start(format!("Running {} threads", num_threads));
    info!("Stopping criterion: {predicate}");

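    // One full LLP run per gamma: labels are reset, updates are applied until
    // `predicate` fires or no node changes label, and the resulting labeling
    // is serialized together with its log-gap cost.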
    for (gamma_index, gamma) in gammas.iter().enumerate() {
        iter_pl.start(format!(
            "Starting iterations with gamma={} ({}/{})...",
            gamma,
            gamma_index + 1,
            gammas.len(),
        ));
        label_store.init();
        thread_pool.install(|| {
            can_change
                .par_iter()
                .with_min_len(RAYON_MIN_LEN)
                .for_each(|c| c.store(true, Ordering::Relaxed));
        });

        let mut obj_func = 0.0;
        let mut prev_gain = f64::MAX;
        let mut improv_window: VecDeque<_> = vec![1.0; IMPROV_WINDOW].into();

        for update in 0.. {
            update_pl.expected_updates(Some(num_nodes));
            update_pl.start(format!(
                "Starting update {} (for gamma={}, {}/{})...",
                update,
                gamma,
                gamma_index + 1,
                gammas.len()
            ));

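            // Re-initialize the visit order as the identity and shuffle it
            // independently within chunks of `chunk_size` nodes, each chunk
            // with its own RNG seeded from the shared atomic seed, so the
            // order is reproducible for a given `seed`.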
            update_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
            thread_pool.install(|| {
                update_perm.par_chunks_mut(chunk_size).for_each(|chunk| {
                    let seed = seed.fetch_add(1, Ordering::Relaxed);
                    let mut rand = SmallRng::seed_from_u64(seed);
                    chunk.shuffle(&mut rand);
                });
            });

            let modified = CachePadded::new(AtomicUsize::new(0));

            let delta_obj_func = sym_graph.par_apply(
                |range| {
                    let mut rand = SmallRng::seed_from_u64(range.start as u64);
                    let mut local_obj_func = 0.0;
                    for &node in &update_perm[range] {
                        if !can_change[node].load(Ordering::Relaxed) {
                            continue;
                        }
                        can_change[node].store(false, Ordering::Relaxed);

                        let successors = sym_graph.successors(node);
                        if sym_graph.outdegree(node) == 0 {
                            continue;
                        }

                        let curr_label = label_store.label(node);

                        let mut map =
                            HashMap::with_capacity_and_hasher(hash_map_init, mix64::Mix64Builder);
                        for succ in successors {
                            map.entry(label_store.label(succ))
                                .and_modify(|counter| *counter += 1)
                                .or_insert(1_usize);
                        }
                        map.entry(curr_label).or_insert(0_usize);
                        let mut max = f64::NEG_INFINITY;
                        let mut old = 0.0;
                        let mut majorities = vec![];
                        for (&label, &count) in map.iter() {
                            let volume = label_store.volume(label);
                            let val = (1.0 + gamma) * count as f64 - gamma * (volume + 1) as f64;

                            if max == val {
                                majorities.push(label);
                            }

                            if val > max {
                                majorities.clear();
                                max = val;
                                majorities.push(label);
                            }

                            if label == curr_label {
                                old = val;
                            }
                        }
                        let next_label = *majorities.choose(&mut rand).unwrap();
                        if next_label != curr_label {
                            modified.fetch_add(1, Ordering::Relaxed);
                            for succ in sym_graph.successors(node) {
                                can_change[succ].store(true, Ordering::Relaxed);
                            }
                            label_store.update(node, next_label);
                        }
                        local_obj_func += max - old;
                    }
                    local_obj_func
                },
                |delta_obj_func_0: f64, delta_obj_func_1| delta_obj_func_0 + delta_obj_func_1,
                arc_granularity,
                deg_cumul,
                &thread_pool,
                &mut update_pl,
            );

            update_pl.done_with_count(num_nodes);
            iter_pl.update_and_display();

            obj_func += delta_obj_func;
            let gain = delta_obj_func / obj_func;
            let gain_impr = (prev_gain - gain) / prev_gain;
            prev_gain = gain;
            improv_window.pop_front();
            improv_window.push_back(gain_impr);
            let avg_gain_impr = improv_window.iter().sum::<f64>() / IMPROV_WINDOW as f64;

            info!("Gain: {gain}");
            info!("Gain improvement: {gain_impr}");
            info!("Average gain improvement: {avg_gain_impr}");
            info!("Modified: {}", modified.load(Ordering::Relaxed));

            if predicate.eval(&PredParams {
                num_nodes: sym_graph.num_nodes(),
                num_arcs: sym_graph.num_arcs(),
                gain,
                avg_gain_impr,
                modified: modified.load(Ordering::Relaxed),
                update,
            }) || modified.load(Ordering::Relaxed) == 0
            {
                break;
            }
        }

        iter_pl.done();

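        // Sort nodes by (label, node id), invert that permutation (reusing the
        // volume array of the label store as scratch space), and measure the
        // log-gap cost of the graph under the resulting ordering; the cost is
        // serialized with the labels so that `combine_labels` can rank the
        // gamma values later.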
        let perm = &mut update_perm;
        thread_pool.install(|| {
            perm.par_iter_mut()
                .with_min_len(RAYON_MIN_LEN)
                .enumerate()
                .for_each(|(i, x)| *x = i);
            perm.par_sort_unstable_by(|&a, &b| {
                label_store
                    .label(a as _)
                    .cmp(&label_store.label(b as _))
                    .then_with(|| a.cmp(&b))
            });
        });

        let (labels, volumes) = label_store.labels_and_volumes();

        thread_pool.install(|| {
            invert_permutation(perm, volumes);
        });

        update_pl.expected_updates(Some(num_nodes));
        update_pl.start("Computing log-gap cost...");

        let gap_cost = gap_cost::compute_log_gap_cost(
            &PermutedGraph {
                graph: &sym_graph,
                perm: &volumes,
            },
            arc_granularity,
            deg_cumul,
            &thread_pool,
            &mut update_pl,
        );

        update_pl.done();

        info!("Log-gap cost: {}", gap_cost);
        costs.push(gap_cost);

        let labels_store = LabelsStore {
            labels: &*labels,
            gap_cost,
            gamma: *gamma,
        };
        unsafe {
            labels_store
                .store(labels_path(gamma_index))
                .context("Could not serialize labels")
        }?;

        gamma_pl.update_and_display();
    }

    gamma_pl.done();

    Ok(())
}

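/// Combines the per-gamma labelings found in `work_dir` (files named
/// `labels_*.bin`, as written by [`layered_label_propagation_labels_only`])
/// into a single labeling, returned as one label per node.
///
/// Labelings are processed from highest to lowest log-gap cost, starting from
/// the best (lowest-cost) one: at each step the current result is refined with
/// the next labeling and then refined again with the best one.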
pub fn combine_labels(work_dir: impl AsRef<Path>) -> Result<Box<[usize]>> {
    let mut gammas = vec![];
    let iter = std::fs::read_dir(work_dir.as_ref())?
        .filter_map(Result::ok)
        .filter(|path| {
            let path_name = path.file_name();
            let path_str = path_name.to_string_lossy();
            path_str.starts_with("labels_")
                && path_str.ends_with(".bin")
                && path.file_type().unwrap().is_file()
        });

    let mut num_nodes = None;
    for path in iter {
        let path = work_dir.as_ref().join(path.file_name());
        let res = unsafe {
            <LabelsStore<Vec<usize>>>::mmap(&path, Flags::default())
                .with_context(|| format!("Could not load labels from {}", path.to_string_lossy()))
        }?;

        let res = res.uncase();

        info!(
            "Found labels from {:?} with gamma {}, cost {}, and {} nodes",
            path.to_string_lossy(),
            res.gamma,
            res.gap_cost,
            res.labels.len()
        );

        match &mut num_nodes {
            num_nodes @ None => {
                *num_nodes = Some(res.labels.len());
            }
            Some(num_nodes) => {
                if res.labels.len() != *num_nodes {
                    anyhow::bail!(
                        "Labels '{}' have length {} while we expected {}.",
                        path.to_string_lossy(),
                        res.labels.len(),
                        num_nodes
                    );
                }
            }
        }
        gammas.push((res.gap_cost, res.gamma, path));
    }

    if gammas.is_empty() {
        anyhow::bail!("No labels were found in {}", work_dir.as_ref().display());
    }
    let mut temp_perm = vec![0; num_nodes.unwrap()];

    gammas.sort_by(|(a, _, _), (b, _, _)| b.total_cmp(a));

    let (best_gamma_cost, best_gamma, best_gamma_path) = gammas.last().unwrap();
    let (worst_gamma_cost, worst_gamma, _worst_gamma_path) = &gammas[0];
    info!(
        "Best gamma: {}\twith log-gap cost {}",
        best_gamma, best_gamma_cost
    );
    info!(
        "Worst gamma: {}\twith log-gap cost {}",
        worst_gamma, worst_gamma_cost
    );

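    // Seed the result with the labels of the best (lowest-cost) gamma;
    // `temp_perm` is reused as scratch space by every `combine` call below.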
    let mut result_labels = unsafe {
        <LabelsStore<Vec<usize>>>::load_mem(best_gamma_path)
            .context("Could not load labels from best gamma")
    }?
    .uncase()
    .labels
    .to_vec();

    let mmap_flags = Flags::TRANSPARENT_HUGE_PAGES | Flags::RANDOM_ACCESS;
    for (i, (cost, gamma, gamma_path)) in gammas.iter().enumerate() {
        info!(
            "Starting step {} with gamma {}, cost {}, and labels {:?}...",
            i, gamma, cost, gamma_path
        );
        let labels = unsafe {
            <LabelsStore<Vec<usize>>>::load_mmap(gamma_path, mmap_flags)
                .context("Could not load labels")
        }?;

        combine(&mut result_labels, labels.uncase().labels, &mut temp_perm)
            .context("Could not combine labels")?;
        drop(labels);

        info!(
            "Recombining with gamma {}, cost {}, and labels {:?}...",
            best_gamma, best_gamma_cost, best_gamma_path
        );
        let best_labels = unsafe {
            <LabelsStore<Vec<usize>>>::load_mmap(best_gamma_path, mmap_flags)
                .context("Could not load labels from best gamma")
        }?;
        let number_of_labels = combine(
            &mut result_labels,
            best_labels.uncase().labels,
            &mut temp_perm,
        )?;
        info!("Number of labels: {}", number_of_labels);
    }

    Ok(result_labels.into_boxed_slice())
}

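/// Refines `result` in place with `labels`: after the call, two nodes share a
/// label in `result` if and only if they shared a label both in the old
/// `result` and in `labels` (the intersection of the two partitions), and the
/// new labels are consecutive integers starting from 0. Returns the number of
/// distinct labels in the refined `result`.
///
/// `temp_perm` is scratch space and must have the same length as `result` and
/// `labels`.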
fn combine(result: &mut [usize], labels: &[usize], temp_perm: &mut [usize]) -> Result<usize> {
    temp_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
    temp_perm.par_sort_unstable_by(|&a, &b| {
        (result[labels[a]].cmp(&result[labels[b]]))
            .then_with(|| labels[a].cmp(&labels[b]))
            .then_with(|| result[a].cmp(&result[b]))
            .then_with(|| a.cmp(&b))
    });
    let mut prev_labels = (result[temp_perm[0]], labels[temp_perm[0]]);
    let mut curr_label = 0;
    result[temp_perm[0]] = curr_label;

    for i in 1..temp_perm.len() {
        let curr_labels = (result[temp_perm[i]], labels[temp_perm[i]]);
        if prev_labels != curr_labels {
            curr_label += 1;
            prev_labels = curr_labels;
        }
        result[temp_perm[i]] = curr_label;
    }

    Ok(curr_label + 1)
}

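/// Fills `inv_perm` with the inverse of `perm` in parallel, that is,
/// `inv_perm[perm[i]] = i` for every `i`. Both slices must have the same
/// length and `perm` must be a permutation of `0..perm.len()`.
///
/// A minimal usage sketch (values chosen purely for illustration):
///
/// ```ignore
/// let perm = [2, 0, 1];
/// let mut inv_perm = [0; 3];
/// invert_permutation(&perm, &mut inv_perm);
/// assert_eq!(inv_perm, [1, 2, 0]);
/// ```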
pub fn invert_permutation(perm: &[usize], inv_perm: &mut [usize]) {
    let sync_slice = inv_perm.as_sync_slice();
    perm.par_iter()
        .with_min_len(RAYON_MIN_LEN)
        .enumerate()
        .for_each(|(i, &x)| {
            unsafe { sync_slice[x].set(i) };
        });
}

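/// Turns a labeling into a rank array: nodes are sorted stably by label and
/// each node is assigned its position in that order, so nodes sharing a label
/// receive consecutive ranks.
///
/// A minimal usage sketch (values chosen purely for illustration):
///
/// ```ignore
/// let labels = [5, 3, 5, 1];
/// let ranks = labels_to_ranks(&labels);
/// assert_eq!(*ranks, [2, 1, 3, 0]);
/// ```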
pub fn labels_to_ranks(labels: &[usize]) -> Box<[usize]> {
    let mut llp_perm = (0..labels.len()).collect::<Vec<_>>().into_boxed_slice();
    llp_perm.par_sort_by(|&a, &b| labels[a].cmp(&labels[b]));
    let mut llp_inv_perm = vec![0; llp_perm.len()].into_boxed_slice();
    invert_permutation(llp_perm.as_ref(), llp_inv_perm.as_mut());
    llp_inv_perm
}