1use anyhow::{Context, Result};
31use dsi_progress_logger::prelude::*;
32use epserde::prelude::*;
33use predicates::Predicate;
34use preds::PredParams;
35
36use log::info;
37use rand::rngs::SmallRng;
38use rand::seq::IndexedRandom;
39use rand::seq::SliceRandom;
40use rand::SeedableRng;
41use rayon::prelude::*;
42use std::collections::HashMap;
43use std::collections::VecDeque;
44use std::path::Path;
45use std::sync::atomic::Ordering;
46use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
47use sux::traits::Succ;
48use sync_cell_slice::SyncSlice;
49use webgraph::prelude::PermutedGraph;
50use webgraph::traits::RandomAccessGraph;
51use webgraph::utils::Granularity;
52
53pub(crate) mod gap_cost;
54pub(crate) mod label_store;
55mod mix64;
56pub mod preds;
57
58const RAYON_MIN_LEN: usize = 100000;
59
60#[derive(Epserde, Debug, Clone)]
61pub struct LabelsStore<A> {
63 pub gap_cost: f64,
64 pub gamma: f64,
65 pub labels: A,
66}
67
68#[allow(clippy::type_complexity)]
93#[allow(clippy::too_many_arguments)]
94pub fn layered_label_propagation<R: RandomAccessGraph + Sync>(
95 sym_graph: R,
96 deg_cumul: &(impl Succ<Input = usize, Output = usize> + Send + Sync),
97 gammas: Vec<f64>,
98 num_threads: Option<usize>,
99 chunk_size: Option<usize>,
100 arc_granularity: Granularity,
101 seed: u64,
102 predicate: impl Predicate<preds::PredParams>,
103 work_dir: impl AsRef<Path>,
104) -> Result<Box<[usize]>> {
105 layered_label_propagation_labels_only(
107 sym_graph,
108 deg_cumul,
109 gammas,
110 num_threads,
111 chunk_size,
112 arc_granularity,
113 seed,
114 predicate,
115 &work_dir,
116 )?;
117 combine_labels(work_dir)
119}
120
121#[allow(clippy::type_complexity)]
122#[allow(clippy::too_many_arguments)]
123pub fn layered_label_propagation_labels_only<R: RandomAccessGraph + Sync>(
126 sym_graph: R,
127 deg_cumul: &(impl Succ<Input = usize, Output = usize> + Send + Sync),
128 gammas: Vec<f64>,
129 num_threads: Option<usize>,
130 chunk_size: Option<usize>,
131 arc_granularity: Granularity,
132 seed: u64,
133 predicate: impl Predicate<preds::PredParams>,
134 work_dir: impl AsRef<Path>,
135) -> Result<()> {
136 let work_path = work_dir.as_ref();
139 let labels_path = |gamma_index| work_path.join(format!("labels_{gamma_index}.bin"));
140 const IMPROV_WINDOW: usize = 10;
141 let num_nodes = sym_graph.num_nodes();
142 let chunk_size = chunk_size.unwrap_or(1_000_000);
143 let num_threads = num_threads.unwrap_or_else(num_cpus::get);
144
145 let mut update_perm = (0..num_nodes).collect::<Vec<_>>();
147
148 let mut can_change = Vec::with_capacity(num_nodes as _);
149 can_change.extend((0..num_nodes).map(|_| AtomicBool::new(true)));
150 let mut label_store = label_store::LabelStore::new(num_nodes as _);
151 let thread_pool = rayon::ThreadPoolBuilder::new()
153 .num_threads(num_threads)
154 .build()
155 .context("Could not create thread pool")?;
156
157 let mut gamma_pl = progress_logger!(
159 display_memory = true,
160 item_name = "gamma",
161 expected_updates = Some(gammas.len()),
162 );
163
164 let mut iter_pl = progress_logger!(item_name = "update");
166
167 let hash_map_init = Ord::max(sym_graph.num_arcs() / sym_graph.num_nodes() as u64, 16) as usize;
168
169 let mut update_pl = concurrent_progress_logger!(item_name = "node", local_speed = true);
171
172 let seed = AtomicU64::new(seed);
173 let mut costs = Vec::with_capacity(gammas.len());
174
175 gamma_pl.start(format!("Running {} threads", num_threads));
176 info!("Stopping criterion: {predicate}");
177
178 for (gamma_index, gamma) in gammas.iter().enumerate() {
179 iter_pl.start(format!(
181 "Starting iterations with gamma={} ({}/{})...",
182 gamma,
183 gamma_index + 1,
184 gammas.len(),
185 ));
186 label_store.init();
187 thread_pool.install(|| {
188 can_change
189 .par_iter()
190 .with_min_len(RAYON_MIN_LEN)
191 .for_each(|c| c.store(true, Ordering::Relaxed));
192 });
193
194 let mut obj_func = 0.0;
195 let mut prev_gain = f64::MAX;
196 let mut improv_window: VecDeque<_> = vec![1.0; IMPROV_WINDOW].into();
197
198 for update in 0.. {
199 update_pl.expected_updates(Some(num_nodes));
200 update_pl.start(format!(
201 "Starting update {} (for gamma={}, {}/{})...",
202 update,
203 gamma,
204 gamma_index + 1,
205 gammas.len()
206 ));
207
208 update_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
209 thread_pool.install(|| {
210 update_perm.par_chunks_mut(chunk_size).for_each(|chunk| {
212 let seed = seed.fetch_add(1, Ordering::Relaxed);
213 let mut rand = SmallRng::seed_from_u64(seed);
214 chunk.shuffle(&mut rand);
215 });
216 });
217
218 let modified = AtomicUsize::new(0);
220
221 let delta_obj_func = sym_graph.par_apply(
222 |range| {
223 let mut rand = SmallRng::seed_from_u64(range.start as u64);
224 let mut local_obj_func = 0.0;
225 for &node in &update_perm[range] {
226 if !can_change[node].load(Ordering::Relaxed) {
232 continue;
233 }
234 can_change[node].store(false, Ordering::Relaxed);
236
237 let successors = sym_graph.successors(node);
238 if sym_graph.outdegree(node) == 0 {
239 continue;
240 }
241
242 let curr_label = label_store.label(node);
244
245 let mut map =
247 HashMap::with_capacity_and_hasher(hash_map_init, mix64::Mix64Builder);
248 for succ in successors {
249 map.entry(label_store.label(succ))
250 .and_modify(|counter| *counter += 1)
251 .or_insert(1_usize);
252 }
253 map.entry(curr_label).or_insert(0_usize);
255
256 let mut max = f64::NEG_INFINITY;
257 let mut old = 0.0;
258 let mut majorities = vec![];
259 for (&label, &count) in map.iter() {
261 let volume = label_store.volume(label); let val = (1.0 + gamma) * count as f64 - gamma * (volume + 1) as f64;
273
274 if max == val {
275 majorities.push(label);
276 }
277
278 if val > max {
279 majorities.clear();
280 max = val;
281 majorities.push(label);
282 }
283
284 if label == curr_label {
285 old = val;
286 }
287 }
288 let next_label = *majorities.choose(&mut rand).unwrap();
290 if next_label != curr_label {
293 modified.fetch_add(1, Ordering::Relaxed);
294 for succ in sym_graph.successors(node) {
295 can_change[succ].store(true, Ordering::Relaxed);
296 }
297 label_store.update(node, next_label);
298 }
299 local_obj_func += max - old;
300 }
301 local_obj_func
302 },
303 |delta_obj_func_0: f64, delta_obj_func_1| delta_obj_func_0 + delta_obj_func_1,
304 arc_granularity,
305 deg_cumul,
306 &thread_pool,
307 &mut update_pl,
308 );
309
310 update_pl.done_with_count(num_nodes);
311 iter_pl.update_and_display();
312
313 obj_func += delta_obj_func;
314 let gain = delta_obj_func / obj_func;
315 let gain_impr = (prev_gain - gain) / prev_gain;
316 prev_gain = gain;
317 improv_window.pop_front();
318 improv_window.push_back(gain_impr);
319 let avg_gain_impr = improv_window.iter().sum::<f64>() / IMPROV_WINDOW as f64;
320
321 info!("Gain: {gain}");
322 info!("Gain improvement: {gain_impr}");
323 info!("Average gain improvement: {avg_gain_impr}");
324 info!("Modified: {}", modified.load(Ordering::Relaxed),);
325
326 if predicate.eval(&PredParams {
327 num_nodes: sym_graph.num_nodes(),
328 num_arcs: sym_graph.num_arcs(),
329 gain,
330 avg_gain_impr,
331 modified: modified.load(Ordering::Relaxed),
332 update,
333 }) || modified.load(Ordering::Relaxed) == 0
334 {
335 break;
336 }
337 }
338
339 iter_pl.done();
340
341 let perm = &mut update_perm;
344 thread_pool.install(|| {
345 perm.par_iter_mut()
346 .with_min_len(RAYON_MIN_LEN)
347 .enumerate()
348 .for_each(|(i, x)| *x = i);
349 perm.par_sort_unstable_by(|&a, &b| {
351 label_store
352 .label(a as _)
353 .cmp(&label_store.label(b as _))
354 .then_with(|| a.cmp(&b))
355 });
356 });
357
358 let (labels, volumes) = label_store.labels_and_volumes();
360
361 thread_pool.install(|| {
365 invert_permutation(perm, volumes);
366 });
367
368 update_pl.expected_updates(Some(num_nodes));
369 update_pl.start("Computing log-gap cost...");
370
371 let gap_cost = gap_cost::compute_log_gap_cost(
372 &PermutedGraph {
373 graph: &sym_graph,
374 perm: &volumes,
375 },
376 arc_granularity,
377 deg_cumul,
378 &thread_pool,
379 &mut update_pl,
380 );
381
382 update_pl.done();
383
384 info!("Log-gap cost: {}", gap_cost);
385 costs.push(gap_cost);
386
387 let labels_store = LabelsStore {
389 labels: &*labels,
390 gap_cost,
391 gamma: *gamma,
392 };
393 labels_store
394 .store(labels_path(gamma_index))
395 .context("Could not serialize labels")?;
396
397 gamma_pl.update_and_display();
398 }
399
400 gamma_pl.done();
401
402 Ok(())
403}
404
405pub fn combine_labels(work_dir: impl AsRef<Path>) -> Result<Box<[usize]>> {
409 let mut gammas = vec![];
410 let iter = std::fs::read_dir(work_dir.as_ref())?
411 .filter_map(Result::ok)
412 .filter(|path| {
413 let path_name = path.file_name();
414 let path_str = path_name.to_string_lossy();
415 path_str.starts_with("labels_")
416 && path_str.ends_with(".bin")
417 && path.file_type().unwrap().is_file()
418 });
419
420 let mut num_nodes = None;
421 for path in iter {
422 let path = work_dir.as_ref().join(path.file_name());
423 let res = <LabelsStore<Vec<usize>>>::mmap(&path, Flags::default())
426 .with_context(|| format!("Could not load labels from {}", path.to_string_lossy(),))?;
427 info!(
428 "Found labels from {:?} with gamma {} and cost {} and num_nodes {}",
429 path.to_string_lossy(),
430 res.gamma,
431 res.gap_cost,
432 res.labels.len()
433 );
434
435 match &mut num_nodes {
436 num_nodes @ None => {
437 *num_nodes = Some(res.labels.len());
438 }
439 Some(num_nodes) => {
440 if res.labels.len() != *num_nodes {
441 anyhow::bail!(
442 "Labels '{}' have length {} while we expected {}.",
443 path.to_string_lossy(),
444 res.labels.len(),
445 num_nodes
446 );
447 }
448 }
449 }
450 gammas.push((res.gap_cost, res.gamma, path));
451 }
452
453 if gammas.is_empty() {
454 anyhow::bail!("No labels were found in {}", work_dir.as_ref().display());
455 }
456 let mut temp_perm = vec![0; num_nodes.unwrap()];
457
458 gammas.sort_by(|(a, _, _), (b, _, _)| b.total_cmp(a));
461
462 let (best_gamma_cost, best_gamma, best_gamma_path) = gammas.last().unwrap();
464 let (worst_gamma_cost, worst_gamma, _worst_gamma_path) = &gammas[0];
465 info!(
466 "Best gamma: {}\twith log-gap cost {}",
467 best_gamma, best_gamma_cost
468 );
469 info!(
470 "Worst gamma: {}\twith log-gap cost {}",
471 worst_gamma, worst_gamma_cost
472 );
473
474 let mut result_labels = <LabelsStore<Vec<usize>>>::load_mem(best_gamma_path)
475 .context("Could not load labels from best gamma")?
476 .labels
477 .to_vec();
478
479 let mmap_flags = Flags::TRANSPARENT_HUGE_PAGES | Flags::RANDOM_ACCESS;
480 for (i, (cost, gamma, gamma_path)) in gammas.iter().enumerate() {
481 info!(
482 "Starting step {} with gamma {} cost {} and labels {:?}...",
483 i, gamma, cost, gamma_path
484 );
485 let labels = <LabelsStore<Vec<usize>>>::load_mmap(gamma_path, mmap_flags)
486 .context("Could not load labels")?;
487 combine(&mut result_labels, labels.labels, &mut temp_perm)
488 .context("Could not combine labels")?;
489 drop(labels); info!(
496 "Recombining with gamma {} cost {} and labels {:?}...",
497 best_gamma, best_gamma_cost, best_gamma_path
498 );
499 let best_labels = <LabelsStore<Vec<usize>>>::load_mmap(best_gamma_path, mmap_flags)
500 .context("Could not load labels from best gamma")?;
501 let number_of_labels = combine(&mut result_labels, best_labels.labels, &mut temp_perm)?;
502 info!("Number of labels: {}", number_of_labels);
503 }
504
505 Ok(result_labels.into_boxed_slice())
506}
507
508fn combine(result: &mut [usize], labels: &[usize], temp_perm: &mut [usize]) -> Result<usize> {
510 temp_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
512 temp_perm.par_sort_unstable_by(|&a, &b| {
514 (result[labels[a]].cmp(&result[labels[b]]))
515 .then_with(|| labels[a].cmp(&labels[b]))
516 .then_with(|| result[a].cmp(&result[b]))
517 .then_with(|| a.cmp(&b)) });
519 let mut prev_labels = (result[temp_perm[0]], labels[temp_perm[0]]);
520 let mut curr_label = 0;
521 result[temp_perm[0]] = curr_label;
522
523 for i in 1..temp_perm.len() {
524 let curr_labels = (result[temp_perm[i]], labels[temp_perm[i]]);
525 if prev_labels != curr_labels {
526 curr_label += 1;
527 prev_labels = curr_labels
528 }
529 result[temp_perm[i]] = curr_label;
530 }
531
532 Ok(curr_label + 1)
533}
534
535pub fn invert_permutation(perm: &[usize], inv_perm: &mut [usize]) {
537 let sync_slice = inv_perm.as_sync_slice();
538 perm.par_iter()
539 .with_min_len(RAYON_MIN_LEN)
540 .enumerate()
541 .for_each(|(i, &x)| {
542 unsafe { sync_slice[x].set(i) };
543 });
544}
545
546pub fn labels_to_ranks(labels: &[usize]) -> Box<[usize]> {
548 let mut llp_perm = (0..labels.len()).collect::<Vec<_>>().into_boxed_slice();
549 llp_perm.par_sort_by(|&a, &b| labels[a].cmp(&labels[b]));
550 let mut llp_inv_perm = vec![0; llp_perm.len()].into_boxed_slice();
551 invert_permutation(llp_perm.as_ref(), llp_inv_perm.as_mut());
552 llp_inv_perm
553}