1use anyhow::{Context, Result};
65use crossbeam_utils::CachePadded;
66use dsi_progress_logger::prelude::*;
67use epserde::prelude::*;
68use predicates::Predicate;
69use preds::PredParams;
70
71use log::info;
72use rand::SeedableRng;
73use rand::rngs::SmallRng;
74use rand::seq::IndexedRandom;
75use rand::seq::SliceRandom;
76use rayon::prelude::*;
77use std::collections::HashMap;
78use std::collections::VecDeque;
79use std::path::Path;
80use std::sync::atomic::Ordering;
81use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
82use sux::traits::Succ;
83use sync_cell_slice::SyncSlice;
84use webgraph::prelude::PermutedGraph;
85use webgraph::traits::RandomAccessGraph;
86use webgraph::utils::Granularity;
87
88pub(crate) mod gap_cost;
89pub(crate) mod label_store;
90mod mix64;
91pub mod preds;
92
93const RAYON_MIN_LEN: usize = 100000;
94
95#[derive(Epserde, Debug, Clone)]
96pub struct LabelsStore<A> {
98 pub gap_cost: f64,
99 pub gamma: f64,
100 pub labels: A,
101}
102
103#[allow(clippy::too_many_arguments)]
132pub fn layered_label_propagation<R: RandomAccessGraph + Sync>(
133 sym_graph: R,
134 deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
135 gammas: Vec<f64>,
136 chunk_size: Option<usize>,
137 granularity: Granularity,
138 seed: u64,
139 predicate: impl Predicate<preds::PredParams>,
140 work_dir: impl AsRef<Path>,
141) -> Result<Box<[usize]>> {
142 layered_label_propagation_labels_only(
144 sym_graph,
145 deg_cumul,
146 gammas,
147 chunk_size,
148 granularity,
149 seed,
150 predicate,
151 &work_dir,
152 )?;
153 combine_labels(work_dir)
155}
156
157#[allow(clippy::too_many_arguments)]
161pub fn layered_label_propagation_labels_only<R: RandomAccessGraph + Sync>(
162 sym_graph: R,
163 deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
164 gammas: Vec<f64>,
165 chunk_size: Option<usize>,
166 granularity: Granularity,
167 seed: u64,
168 predicate: impl Predicate<preds::PredParams>,
169 work_dir: impl AsRef<Path>,
170) -> Result<()> {
171 let work_path = work_dir.as_ref();
174 let labels_path = |gamma_index| work_path.join(format!("labels_{gamma_index}.bin"));
175 const IMPROV_WINDOW: usize = 10;
176 let num_nodes = sym_graph.num_nodes();
177 let chunk_size = chunk_size.unwrap_or(1_000_000);
178 let num_threads = rayon::current_num_threads();
179
180 let mut update_perm = (0..num_nodes).collect::<Vec<_>>();
182
183 let mut can_change = Vec::with_capacity(num_nodes as _);
184 can_change.extend((0..num_nodes).map(|_| AtomicBool::new(true)));
185 let mut label_store = label_store::LabelStore::new(num_nodes as _);
186 let thread_pool = rayon::ThreadPoolBuilder::new()
188 .num_threads(num_threads)
189 .build()
190 .context("Could not create thread pool")?;
191
192 let mut gamma_pl = progress_logger![
194 display_memory = true,
195 item_name = "gamma",
196 expected_updates = Some(gammas.len()),
197 ];
198
199 let mut iter_pl = progress_logger![item_name = "update"];
201
202 let hash_map_init = Ord::max(sym_graph.num_arcs() / sym_graph.num_nodes() as u64, 16) as usize;
203
204 let mut update_pl = concurrent_progress_logger![item_name = "node", local_speed = true];
206
207 let seed = CachePadded::new(AtomicU64::new(seed));
208 let mut costs = Vec::with_capacity(gammas.len());
209
210 gamma_pl.start(format!("Running {} threads", num_threads));
211 info!("Stopping criterion: {predicate}");
212
213 for (gamma_index, gamma) in gammas.iter().enumerate() {
214 iter_pl.start(format!(
216 "Starting iterations with gamma={} ({}/{})...",
217 gamma,
218 gamma_index + 1,
219 gammas.len(),
220 ));
221 label_store.init();
222 thread_pool.install(|| {
223 can_change
224 .par_iter()
225 .with_min_len(RAYON_MIN_LEN)
226 .for_each(|c| c.store(true, Ordering::Relaxed));
227 });
228
229 let mut obj_func = 0.0;
230 let mut prev_gain = f64::MAX;
231 let mut improv_window: VecDeque<_> = vec![1.0; IMPROV_WINDOW].into();
232
233 for update in 0.. {
234 update_pl.expected_updates(Some(num_nodes));
235 update_pl.start(format!(
236 "Starting update {} (for gamma={}, {}/{})...",
237 update,
238 gamma,
239 gamma_index + 1,
240 gammas.len()
241 ));
242
243 update_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
244 thread_pool.install(|| {
245 update_perm.par_chunks_mut(chunk_size).for_each(|chunk| {
247 let seed = seed.fetch_add(1, Ordering::Relaxed);
248 let mut rand = SmallRng::seed_from_u64(seed);
249 chunk.shuffle(&mut rand);
250 });
251 });
252
253 let modified = CachePadded::new(AtomicUsize::new(0));
255
256 let delta_obj_func = sym_graph.par_apply(
257 |range| {
258 let mut rand = SmallRng::seed_from_u64(range.start as u64);
259 let mut local_obj_func = 0.0;
260 for &node in &update_perm[range] {
261 if !can_change[node].load(Ordering::Relaxed) {
267 continue;
268 }
269 can_change[node].store(false, Ordering::Relaxed);
271
272 let successors = sym_graph.successors(node);
273 if sym_graph.outdegree(node) == 0 {
274 continue;
275 }
276
277 let curr_label = label_store.label(node);
279
280 let mut map =
282 HashMap::with_capacity_and_hasher(hash_map_init, mix64::Mix64Builder);
283 for succ in successors {
284 map.entry(label_store.label(succ))
285 .and_modify(|counter| *counter += 1)
286 .or_insert(1_usize);
287 }
288 map.entry(curr_label).or_insert(0_usize);
290
291 let mut max = f64::NEG_INFINITY;
292 let mut old = 0.0;
293 let mut majorities = vec![];
294 for (&label, &count) in map.iter() {
296 let volume = label_store.volume(label); let val = (1.0 + gamma) * count as f64 - gamma * (volume + 1) as f64;
308
309 if max == val {
310 majorities.push(label);
311 }
312
313 if val > max {
314 majorities.clear();
315 max = val;
316 majorities.push(label);
317 }
318
319 if label == curr_label {
320 old = val;
321 }
322 }
323 let next_label = *majorities.choose(&mut rand).unwrap();
325 if next_label != curr_label {
328 modified.fetch_add(1, Ordering::Relaxed);
329 for succ in sym_graph.successors(node) {
330 can_change[succ].store(true, Ordering::Relaxed);
331 }
332 label_store.update(node, next_label);
333 }
334 local_obj_func += max - old;
335 }
336 local_obj_func
337 },
338 |delta_obj_func_0: f64, delta_obj_func_1| delta_obj_func_0 + delta_obj_func_1,
339 granularity,
340 deg_cumul,
341 &mut update_pl,
342 );
343
344 update_pl.done_with_count(num_nodes);
345 iter_pl.update_and_display();
346
347 obj_func += delta_obj_func;
348 let gain = delta_obj_func / obj_func;
349 let gain_impr = (prev_gain - gain) / prev_gain;
350 prev_gain = gain;
351 improv_window.pop_front();
352 improv_window.push_back(gain_impr);
353 let avg_gain_impr = improv_window.iter().sum::<f64>() / IMPROV_WINDOW as f64;
354
355 info!("Gain: {gain}");
356 info!("Gain improvement: {gain_impr}");
357 info!("Average gain improvement: {avg_gain_impr}");
358 info!("Modified: {}", modified.load(Ordering::Relaxed),);
359
360 if predicate.eval(&PredParams {
361 num_nodes: sym_graph.num_nodes(),
362 num_arcs: sym_graph.num_arcs(),
363 gain,
364 avg_gain_impr,
365 modified: modified.load(Ordering::Relaxed),
366 update,
367 }) || modified.load(Ordering::Relaxed) == 0
368 {
369 break;
370 }
371 }
372
373 iter_pl.done();
374
375 let perm = &mut update_perm;
378 thread_pool.install(|| {
379 perm.par_iter_mut()
380 .with_min_len(RAYON_MIN_LEN)
381 .enumerate()
382 .for_each(|(i, x)| *x = i);
383 perm.par_sort_unstable_by(|&a, &b| {
385 label_store
386 .label(a as _)
387 .cmp(&label_store.label(b as _))
388 .then_with(|| a.cmp(&b))
389 });
390 });
391
392 let (labels, volumes) = label_store.labels_and_volumes();
394
395 thread_pool.install(|| {
399 invert_permutation(perm, volumes);
400 });
401
402 update_pl.expected_updates(Some(num_nodes));
403 update_pl.start("Computing log-gap cost...");
404
405 let gap_cost = gap_cost::compute_log_gap_cost(
406 &PermutedGraph {
407 graph: &sym_graph,
408 perm: &volumes,
409 },
410 granularity,
411 deg_cumul,
412 &mut update_pl,
413 );
414
415 update_pl.done();
416
417 info!("Log-gap cost: {}", gap_cost);
418 costs.push(gap_cost);
419
420 let labels_store = LabelsStore {
422 labels: &*labels,
423 gap_cost,
424 gamma: *gamma,
425 };
426 unsafe {
427 labels_store
428 .store(labels_path(gamma_index))
429 .context("Could not serialize labels")
430 }?;
431
432 gamma_pl.update_and_display();
433 }
434
435 gamma_pl.done();
436
437 Ok(())
438}
439
440pub fn combine_labels(work_dir: impl AsRef<Path>) -> Result<Box<[usize]>> {
444 let mut gammas = vec![];
445 let iter = std::fs::read_dir(work_dir.as_ref())?
446 .filter_map(Result::ok)
447 .filter(|path| {
448 let path_name = path.file_name();
449 let path_str = path_name.to_string_lossy();
450 path_str.starts_with("labels_")
451 && path_str.ends_with(".bin")
452 && path.file_type().is_ok_and(|ft| ft.is_file())
453 });
454
455 let mut num_nodes = None;
456 for path in iter {
457 let path = work_dir.as_ref().join(path.file_name());
458 let res = unsafe {
461 <LabelsStore<Vec<usize>>>::mmap(&path, Flags::default())
462 .with_context(|| format!("Could not load labels from {}", path.to_string_lossy(),))
463 }?;
464
465 let res = res.uncase();
466
467 info!(
468 "Found labels from {:?} with gamma {} and cost {} and num_nodes {}",
469 path.to_string_lossy(),
470 res.gamma,
471 res.gap_cost,
472 res.labels.len()
473 );
474
475 match &mut num_nodes {
476 num_nodes @ None => {
477 *num_nodes = Some(res.labels.len());
478 }
479 Some(num_nodes) => {
480 if res.labels.len() != *num_nodes {
481 anyhow::bail!(
482 "Labels '{}' have length {} while we expected {}.",
483 path.to_string_lossy(),
484 res.labels.len(),
485 num_nodes
486 );
487 }
488 }
489 }
490 gammas.push((res.gap_cost, res.gamma, path));
491 }
492
493 if gammas.is_empty() {
494 anyhow::bail!("No labels were found in {}", work_dir.as_ref().display());
495 }
496 let mut temp_perm = vec![0; num_nodes.unwrap()];
497
498 gammas.sort_by(|(a, _, _), (b, _, _)| b.total_cmp(a));
501
502 let (best_gamma_cost, best_gamma, best_gamma_path) = gammas.last().unwrap();
504 let (worst_gamma_cost, worst_gamma, _worst_gamma_path) = &gammas[0];
505 info!(
506 "Best gamma: {}\twith log-gap cost {}",
507 best_gamma, best_gamma_cost
508 );
509 info!(
510 "Worst gamma: {}\twith log-gap cost {}",
511 worst_gamma, worst_gamma_cost
512 );
513
514 let mut result_labels = unsafe {
515 <LabelsStore<Vec<usize>>>::load_mem(best_gamma_path)
516 .context("Could not load labels from best gamma")
517 }?
518 .uncase()
519 .labels
520 .to_vec();
521
522 let mmap_flags = Flags::TRANSPARENT_HUGE_PAGES | Flags::RANDOM_ACCESS;
523 for (i, (cost, gamma, gamma_path)) in gammas.iter().enumerate() {
524 info!(
525 "Starting step {} with gamma {} cost {} and labels {:?}...",
526 i, gamma, cost, gamma_path
527 );
528 let labels = unsafe {
529 <LabelsStore<Vec<usize>>>::load_mmap(gamma_path, mmap_flags)
530 .context("Could not load labels")
531 }?;
532
533 combine(&mut result_labels, labels.uncase().labels, &mut temp_perm)
534 .context("Could not combine labels")?;
535 drop(labels); info!(
542 "Recombining with gamma {} cost {} and labels {:?}...",
543 best_gamma, best_gamma_cost, best_gamma_path
544 );
545 let best_labels = unsafe {
546 <LabelsStore<Vec<usize>>>::load_mmap(best_gamma_path, mmap_flags)
547 .context("Could not load labels from best gamma")
548 }?;
549 let number_of_labels = combine(
550 &mut result_labels,
551 best_labels.uncase().labels,
552 &mut temp_perm,
553 )?;
554 info!("Number of labels: {}", number_of_labels);
555 }
556
557 Ok(result_labels.into_boxed_slice())
558}
559
560fn combine(result: &mut [usize], labels: &[usize], temp_perm: &mut [usize]) -> Result<usize> {
562 temp_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
564 temp_perm.par_sort_unstable_by(|&a, &b| {
566 (result[labels[a]].cmp(&result[labels[b]]))
567 .then_with(|| labels[a].cmp(&labels[b]))
568 .then_with(|| result[a].cmp(&result[b]))
569 .then_with(|| a.cmp(&b)) });
571 let mut prev_labels = (result[temp_perm[0]], labels[temp_perm[0]]);
572 let mut curr_label = 0;
573 result[temp_perm[0]] = curr_label;
574
575 for i in 1..temp_perm.len() {
576 let curr_labels = (result[temp_perm[i]], labels[temp_perm[i]]);
577 if prev_labels != curr_labels {
578 curr_label += 1;
579 prev_labels = curr_labels
580 }
581 result[temp_perm[i]] = curr_label;
582 }
583
584 Ok(curr_label + 1)
585}
586
587pub fn invert_permutation(perm: &[usize], inv_perm: &mut [usize]) {
589 let sync_slice = inv_perm.as_sync_slice();
590 perm.par_iter()
591 .with_min_len(RAYON_MIN_LEN)
592 .enumerate()
593 .for_each(|(i, &x)| {
594 unsafe { sync_slice[x].set(i) };
595 });
596}
597
598pub fn labels_to_ranks(labels: &[usize]) -> Box<[usize]> {
600 let mut llp_perm = (0..labels.len()).collect::<Vec<_>>().into_boxed_slice();
601 llp_perm.par_sort_by(|&a, &b| labels[a].cmp(&labels[b]));
602 let mut llp_inv_perm = vec![0; llp_perm.len()].into_boxed_slice();
603 invert_permutation(llp_perm.as_ref(), llp_inv_perm.as_mut());
604 llp_inv_perm
605}