Skip to main content

webgraph_algo/llp/
mod.rs

1/*
2 * SPDX-FileCopyrightText: 2024 Tommaso Fontana
3 * SPDX-FileCopyrightText: 2024 Sebastiano Vigna
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8//! Layered Label Propagation.
9//!
10//! An implementation of the _layered label propagation_ algorithm described by
11//! Paolo Boldi, Sebastiano Vigna, Marco Rosa, Massimo Santini, and Sebastiano
12//! Vigna in "[Layered Label Propagation: A MultiResolution Coordinate-Free
13//! Ordering for Compressing Social Networks][LLP paper]", _Proceedings of the
14//! 20th international conference on World Wide Web_, pages 587–596, ACM, 2011.
15//!
16//! # Requirements
17//!
18//! The graph provided should be _symmetric_ and _loopless_. If this is not the
19//! case, please use [`simplify`](webgraph::transform::simplify) to generate a
20//! suitable graph.
21//!
22//! # Memory Requirements
23//!
24//! LLP requires three `usize` and a boolean per node, plus the memory that is
25//! necessary to load the graph.
26//!
27//! # Algorithm
28//!
29//! Label propagation assigns a _label_ to each node and then iteratively
30//! updates every label to the one that maximizes an objective function based on
31//! the frequency of labels among the node's neighbors and on a resolution
32//! parameter ɣ. Low ɣ values produce many small communities, while high ɣ
33//! values produce few large ones. _Layered_ label propagation runs label
34//! propagation for several values of ɣ and combines the resulting labelings
35//! into a single one that captures community structure at multiple resolutions.
36//!
37//! Nodes of the resulting labeling that share the same label are likely
38//! co-located in the graph, so [permuting the
39//! graph](webgraph::transform::permute) in label order will increase locality,
40//! yielding better compression.
41//!
42//! # Functions
43//!
44//! - [`layered_label_propagation`]: runs LLP and returns the final combined
45//!   labels;
46//! - [`layered_label_propagation_labels_only`]: runs LLP and stores
47//!   per-ɣ labels to disk, but does not combine them; this is useful when
48//!   you want to combine labels in a separate step;
49//! - [`combine_labels`]: combines the per-ɣ labels stored on disk by a
50//!   previous call to [`layered_label_propagation_labels_only`];
51//! - [`labels_to_ranks`]: converts labels to ranks by their natural order,
52//!   yielding a permutation that can be passed to
53//!   [`permute`](webgraph::transform::permute).
54//!
55//! # Choosing ɣ Values
56//!
57//! More values improve the resulting combined labeling, but each value needs a
58//! full run of the label propagation algorithm, so there is a trade-off between
59//! quality and running time. A common choice is a set exponentially-spaced
60//! values, for example ɣ ∈ {1, 1/2, 1/4, …} or ɣ ∈ {1, 1/4, 1/16, …}.
61//!
62//! [LLP paper]: <https://vigna.di.unimi.it/papers.php#BRSLLP>
63//!
64use anyhow::{Context, Result};
65use crossbeam_utils::CachePadded;
66use dsi_progress_logger::prelude::*;
67use epserde::prelude::*;
68use predicates::Predicate;
69use preds::PredParams;
70
71use log::info;
72use rand::SeedableRng;
73use rand::rngs::SmallRng;
74use rand::seq::IndexedRandom;
75use rand::seq::SliceRandom;
76use rayon::prelude::*;
77use std::collections::HashMap;
78use std::collections::VecDeque;
79use std::path::Path;
80use std::sync::atomic::Ordering;
81use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
82use sux::traits::Succ;
83use sync_cell_slice::SyncSlice;
84use webgraph::prelude::PermutedGraph;
85use webgraph::traits::RandomAccessGraph;
86use webgraph::utils::Granularity;
87
88pub(crate) mod gap_cost;
89pub(crate) mod label_store;
90mod mix64;
91pub mod preds;
92
93const RAYON_MIN_LEN: usize = 100000;
94
95#[derive(Epserde, Debug, Clone)]
96/// This struct is how the labels and their metadata are stored on disk.
97pub struct LabelsStore<A> {
98    pub gap_cost: f64,
99    pub gamma: f64,
100    pub labels: A,
101}
102
103/// Runs layered label propagation on the provided symmetric graph and returns
104/// the resulting labels.
105///
106/// Note that no symmetry check is performed, but in that case the algorithm
107/// usually will not give satisfactory results.
108///
109/// # Arguments
110///
111/// * `sym_graph` - The symmetric graph to run LLP on.
112///
113/// * `deg_cumul` - The degree cumulative distribution of the graph, as in
114///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
115///
116/// * `gammas` - The ɣ values to use in the LLP algorithm.
117///
118/// * `chunk_size` - The chunk size used to randomize the permutation. This is
119///   an advanced option: see
120///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
121///
122/// * `granularity` - The granularity of the parallel processing.
123///   This is an advanced option: see
124///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
125///
126/// * `seed` - The seed to use for pseudorandom number generation.
127///
128/// * `predicate` - The stopping criterion for the iterations of the algorithm.
129///
130/// * `work_dir` - The directory where the labels will be stored.
131#[allow(clippy::too_many_arguments)]
132pub fn layered_label_propagation<R: RandomAccessGraph + Sync>(
133    sym_graph: R,
134    deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
135    gammas: Vec<f64>,
136    chunk_size: Option<usize>,
137    granularity: Granularity,
138    seed: u64,
139    predicate: impl Predicate<preds::PredParams>,
140    work_dir: impl AsRef<Path>,
141) -> Result<Box<[usize]>> {
142    // compute the labels
143    layered_label_propagation_labels_only(
144        sym_graph,
145        deg_cumul,
146        gammas,
147        chunk_size,
148        granularity,
149        seed,
150        predicate,
151        &work_dir,
152    )?;
153    // merge them
154    combine_labels(work_dir)
155}
156
157/// Computes and stores on disk the labels for the given gammas, but
158/// does not combine them. For the arguments look at
159/// [`layered_label_propagation`].
160#[allow(clippy::too_many_arguments)]
161pub fn layered_label_propagation_labels_only<R: RandomAccessGraph + Sync>(
162    sym_graph: R,
163    deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
164    gammas: Vec<f64>,
165    chunk_size: Option<usize>,
166    granularity: Granularity,
167    seed: u64,
168    predicate: impl Predicate<preds::PredParams>,
169    work_dir: impl AsRef<Path>,
170) -> Result<()> {
171    // work-around to make TempDir Live as much as needed but only create it if
172    // the user didn't provide a work_dir, which can also be a TempDir.
173    let work_path = work_dir.as_ref();
174    let labels_path = |gamma_index| work_path.join(format!("labels_{gamma_index}.bin"));
175    const IMPROV_WINDOW: usize = 10;
176    let num_nodes = sym_graph.num_nodes();
177    let chunk_size = chunk_size.unwrap_or(1_000_000);
178    let num_threads = rayon::current_num_threads();
179
180    // init the permutation with the indices
181    let mut update_perm = (0..num_nodes).collect::<Vec<_>>();
182
183    let mut can_change = Vec::with_capacity(num_nodes as _);
184    can_change.extend((0..num_nodes).map(|_| AtomicBool::new(true)));
185    let mut label_store = label_store::LabelStore::new(num_nodes as _);
186    // build a thread_pool so we avoid having to re-create the threads
187    let thread_pool = rayon::ThreadPoolBuilder::new()
188        .num_threads(num_threads)
189        .build()
190        .context("Could not create thread pool")?;
191
192    // init the gamma progress logger
193    let mut gamma_pl = progress_logger![
194        display_memory = true,
195        item_name = "gamma",
196        expected_updates = Some(gammas.len()),
197    ];
198
199    // init the iteration progress logger
200    let mut iter_pl = progress_logger![item_name = "update"];
201
202    let hash_map_init = Ord::max(sym_graph.num_arcs() / sym_graph.num_nodes() as u64, 16) as usize;
203
204    // init the update progress logger
205    let mut update_pl = concurrent_progress_logger![item_name = "node", local_speed = true];
206
207    let seed = CachePadded::new(AtomicU64::new(seed));
208    let mut costs = Vec::with_capacity(gammas.len());
209
210    gamma_pl.start(format!("Running {} threads", num_threads));
211    info!("Stopping criterion: {predicate}");
212
213    for (gamma_index, gamma) in gammas.iter().enumerate() {
214        // Reset mutable state for the next gamma
215        iter_pl.start(format!(
216            "Starting iterations with gamma={} ({}/{})...",
217            gamma,
218            gamma_index + 1,
219            gammas.len(),
220        ));
221        label_store.init();
222        thread_pool.install(|| {
223            can_change
224                .par_iter()
225                .with_min_len(RAYON_MIN_LEN)
226                .for_each(|c| c.store(true, Ordering::Relaxed));
227        });
228
229        let mut obj_func = 0.0;
230        let mut prev_gain = f64::MAX;
231        let mut improv_window: VecDeque<_> = vec![1.0; IMPROV_WINDOW].into();
232
233        for update in 0.. {
234            update_pl.expected_updates(Some(num_nodes));
235            update_pl.start(format!(
236                "Starting update {} (for gamma={}, {}/{})...",
237                update,
238                gamma,
239                gamma_index + 1,
240                gammas.len()
241            ));
242
243            update_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
244            thread_pool.install(|| {
245                // parallel shuffle
246                update_perm.par_chunks_mut(chunk_size).for_each(|chunk| {
247                    let seed = seed.fetch_add(1, Ordering::Relaxed);
248                    let mut rand = SmallRng::seed_from_u64(seed);
249                    chunk.shuffle(&mut rand);
250                });
251            });
252
253            // If this iteration modified anything (early stop)
254            let modified = CachePadded::new(AtomicUsize::new(0));
255
256            let delta_obj_func = sym_graph.par_apply(
257                |range| {
258                    let mut rand = SmallRng::seed_from_u64(range.start as u64);
259                    let mut local_obj_func = 0.0;
260                    for &node in &update_perm[range] {
261                        // Note that here we are using a heuristic optimization:
262                        // if no neighbor has changed, the label of a node
263                        // cannot change. If gamma != 0, this is not necessarily
264                        // true, as a node might need to change its value just
265                        // because of a change of volume of the adjacent labels.
266                        if !can_change[node].load(Ordering::Relaxed) {
267                            continue;
268                        }
269                        // set that the node can't change by default and we'll unset later it if it can
270                        can_change[node].store(false, Ordering::Relaxed);
271
272                        let successors = sym_graph.successors(node);
273                        if sym_graph.outdegree(node) == 0 {
274                            continue;
275                        }
276
277                        // get the label of this node
278                        let curr_label = label_store.label(node);
279
280                        // compute the frequency of successor labels
281                        let mut map =
282                            HashMap::with_capacity_and_hasher(hash_map_init, mix64::Mix64Builder);
283                        for succ in successors {
284                            map.entry(label_store.label(succ))
285                                .and_modify(|counter| *counter += 1)
286                                .or_insert(1_usize);
287                        }
288                        // add the current label to the map
289                        map.entry(curr_label).or_insert(0_usize);
290
291                        let mut max = f64::NEG_INFINITY;
292                        let mut old = 0.0;
293                        let mut majorities = vec![];
294                        // compute the most entropic label
295                        for (&label, &count) in map.iter() {
296                            // For replication of the results of the Java
297                            // version, one needs to decrement the volume of
298                            // the current value the Java version does
299                            // (see the commented code below).
300                            //
301                            // Note that this is not exactly equivalent to the
302                            // behavior of the Java version, as during the
303                            // execution of this loop if another thread reads
304                            // the volume of the current label it will get a
305                            // value larger by one WRT the Java version.
306                            let volume = label_store.volume(label); // - (label == curr_label) as usize;
307                            let val = (1.0 + gamma) * count as f64 - gamma * (volume + 1) as f64;
308
309                            if max == val {
310                                majorities.push(label);
311                            }
312
313                            if val > max {
314                                majorities.clear();
315                                max = val;
316                                majorities.push(label);
317                            }
318
319                            if label == curr_label {
320                                old = val;
321                            }
322                        }
323                        // randomly break ties
324                        let next_label = *majorities.choose(&mut rand).unwrap();
325                        // if the label changed we need to update the label store
326                        // and signal that this could change the neighbor nodes
327                        if next_label != curr_label {
328                            modified.fetch_add(1, Ordering::Relaxed);
329                            for succ in sym_graph.successors(node) {
330                                can_change[succ].store(true, Ordering::Relaxed);
331                            }
332                            label_store.update(node, next_label);
333                        }
334                        local_obj_func += max - old;
335                    }
336                    local_obj_func
337                },
338                |delta_obj_func_0: f64, delta_obj_func_1| delta_obj_func_0 + delta_obj_func_1,
339                granularity,
340                deg_cumul,
341                &mut update_pl,
342            );
343
344            update_pl.done_with_count(num_nodes);
345            iter_pl.update_and_display();
346
347            obj_func += delta_obj_func;
348            let gain = delta_obj_func / obj_func;
349            let gain_impr = (prev_gain - gain) / prev_gain;
350            prev_gain = gain;
351            improv_window.pop_front();
352            improv_window.push_back(gain_impr);
353            let avg_gain_impr = improv_window.iter().sum::<f64>() / IMPROV_WINDOW as f64;
354
355            info!("Gain: {gain}");
356            info!("Gain improvement: {gain_impr}");
357            info!("Average gain improvement: {avg_gain_impr}");
358            info!("Modified: {}", modified.load(Ordering::Relaxed),);
359
360            if predicate.eval(&PredParams {
361                num_nodes: sym_graph.num_nodes(),
362                num_arcs: sym_graph.num_arcs(),
363                gain,
364                avg_gain_impr,
365                modified: modified.load(Ordering::Relaxed),
366                update,
367            }) || modified.load(Ordering::Relaxed) == 0
368            {
369                break;
370            }
371        }
372
373        iter_pl.done();
374
375        // We temporarily use the update permutation to compute the sorting
376        // permutation of the labels.
377        let perm = &mut update_perm;
378        thread_pool.install(|| {
379            perm.par_iter_mut()
380                .with_min_len(RAYON_MIN_LEN)
381                .enumerate()
382                .for_each(|(i, x)| *x = i);
383            // Sort by label
384            perm.par_sort_unstable_by(|&a, &b| {
385                label_store
386                    .label(a as _)
387                    .cmp(&label_store.label(b as _))
388                    .then_with(|| a.cmp(&b))
389            });
390        });
391
392        // Save labels
393        let (labels, volumes) = label_store.labels_and_volumes();
394
395        // We temporarily use the label array from the label store to compute
396        // the inverse permutation. It will be reinitialized at the next
397        // iteration anyway.
398        thread_pool.install(|| {
399            invert_permutation(perm, volumes);
400        });
401
402        update_pl.expected_updates(Some(num_nodes));
403        update_pl.start("Computing log-gap cost...");
404
405        let gap_cost = gap_cost::compute_log_gap_cost(
406            &PermutedGraph {
407                graph: &sym_graph,
408                perm: &volumes,
409            },
410            granularity,
411            deg_cumul,
412            &mut update_pl,
413        );
414
415        update_pl.done();
416
417        info!("Log-gap cost: {}", gap_cost);
418        costs.push(gap_cost);
419
420        // store the labels on disk with their cost and gamma
421        let labels_store = LabelsStore {
422            labels: &*labels,
423            gap_cost,
424            gamma: *gamma,
425        };
426        unsafe {
427            labels_store
428                .store(labels_path(gamma_index))
429                .context("Could not serialize labels")
430        }?;
431
432        gamma_pl.update_and_display();
433    }
434
435    gamma_pl.done();
436
437    Ok(())
438}
439
440/// Combines the labels computed by LLP into a final labels array.
441///
442/// * `work_dir`: The folder where the labels to combine are.
443pub fn combine_labels(work_dir: impl AsRef<Path>) -> Result<Box<[usize]>> {
444    let mut gammas = vec![];
445    let iter = std::fs::read_dir(work_dir.as_ref())?
446        .filter_map(Result::ok)
447        .filter(|path| {
448            let path_name = path.file_name();
449            let path_str = path_name.to_string_lossy();
450            path_str.starts_with("labels_")
451                && path_str.ends_with(".bin")
452                && path.file_type().is_ok_and(|ft| ft.is_file())
453        });
454
455    let mut num_nodes = None;
456    for path in iter {
457        let path = work_dir.as_ref().join(path.file_name());
458        // we only need the cost and gamma here, so we mmap it to ignore the
459        // actual labels which will be needed only later
460        let res = unsafe {
461            <LabelsStore<Vec<usize>>>::mmap(&path, Flags::default())
462                .with_context(|| format!("Could not load labels from {}", path.to_string_lossy(),))
463        }?;
464
465        let res = res.uncase();
466
467        info!(
468            "Found labels from {:?} with gamma {} and cost {} and num_nodes {}",
469            path.to_string_lossy(),
470            res.gamma,
471            res.gap_cost,
472            res.labels.len()
473        );
474
475        match &mut num_nodes {
476            num_nodes @ None => {
477                *num_nodes = Some(res.labels.len());
478            }
479            Some(num_nodes) => {
480                if res.labels.len() != *num_nodes {
481                    anyhow::bail!(
482                        "Labels '{}' have length {} while we expected {}.",
483                        path.to_string_lossy(),
484                        res.labels.len(),
485                        num_nodes
486                    );
487                }
488            }
489        }
490        gammas.push((res.gap_cost, res.gamma, path));
491    }
492
493    if gammas.is_empty() {
494        anyhow::bail!("No labels were found in {}", work_dir.as_ref().display());
495    }
496    let mut temp_perm = vec![0; num_nodes.unwrap()];
497
498    // compute the indices that sorts the gammas by cost
499    // sort in descending order
500    gammas.sort_by(|(a, _, _), (b, _, _)| b.total_cmp(a));
501
502    // the best gamma is the last because it has the min cost
503    let (best_gamma_cost, best_gamma, best_gamma_path) = gammas.last().unwrap();
504    let (worst_gamma_cost, worst_gamma, _worst_gamma_path) = &gammas[0];
505    info!(
506        "Best gamma: {}\twith log-gap cost {}",
507        best_gamma, best_gamma_cost
508    );
509    info!(
510        "Worst gamma: {}\twith log-gap cost {}",
511        worst_gamma, worst_gamma_cost
512    );
513
514    let mut result_labels = unsafe {
515        <LabelsStore<Vec<usize>>>::load_mem(best_gamma_path)
516            .context("Could not load labels from best gamma")
517    }?
518    .uncase()
519    .labels
520    .to_vec();
521
522    let mmap_flags = Flags::TRANSPARENT_HUGE_PAGES | Flags::RANDOM_ACCESS;
523    for (i, (cost, gamma, gamma_path)) in gammas.iter().enumerate() {
524        info!(
525            "Starting step {} with gamma {} cost {} and labels {:?}...",
526            i, gamma, cost, gamma_path
527        );
528        let labels = unsafe {
529            <LabelsStore<Vec<usize>>>::load_mmap(gamma_path, mmap_flags)
530                .context("Could not load labels")
531        }?;
532
533        combine(&mut result_labels, labels.uncase().labels, &mut temp_perm)
534            .context("Could not combine labels")?;
535        drop(labels); // explicit drop so we free labels before loading best_labels
536
537        // This recombination with the best labels does not appear in the paper, but
538        // it is not harmful and fixes a few corner cases in which experimentally
539        // LLP does not perform well. It was introduced by Marco Rosa in the Java
540        // LAW code.
541        info!(
542            "Recombining with gamma {} cost {} and labels {:?}...",
543            best_gamma, best_gamma_cost, best_gamma_path
544        );
545        let best_labels = unsafe {
546            <LabelsStore<Vec<usize>>>::load_mmap(best_gamma_path, mmap_flags)
547                .context("Could not load labels from best gamma")
548        }?;
549        let number_of_labels = combine(
550            &mut result_labels,
551            best_labels.uncase().labels,
552            &mut temp_perm,
553        )?;
554        info!("Number of labels: {}", number_of_labels);
555    }
556
557    Ok(result_labels.into_boxed_slice())
558}
559
560/// combine the labels from two permutations into a single one
561fn combine(result: &mut [usize], labels: &[usize], temp_perm: &mut [usize]) -> Result<usize> {
562    // re-init the permutation
563    temp_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
564    // permute by the devilish function
565    temp_perm.par_sort_unstable_by(|&a, &b| {
566        (result[labels[a]].cmp(&result[labels[b]]))
567            .then_with(|| labels[a].cmp(&labels[b]))
568            .then_with(|| result[a].cmp(&result[b]))
569            .then_with(|| a.cmp(&b)) // to make it stable
570    });
571    let mut prev_labels = (result[temp_perm[0]], labels[temp_perm[0]]);
572    let mut curr_label = 0;
573    result[temp_perm[0]] = curr_label;
574
575    for i in 1..temp_perm.len() {
576        let curr_labels = (result[temp_perm[i]], labels[temp_perm[i]]);
577        if prev_labels != curr_labels {
578            curr_label += 1;
579            prev_labels = curr_labels
580        }
581        result[temp_perm[i]] = curr_label;
582    }
583
584    Ok(curr_label + 1)
585}
586
587/// Inverts a permutation.
588pub fn invert_permutation(perm: &[usize], inv_perm: &mut [usize]) {
589    let sync_slice = inv_perm.as_sync_slice();
590    perm.par_iter()
591        .with_min_len(RAYON_MIN_LEN)
592        .enumerate()
593        .for_each(|(i, &x)| {
594            unsafe { sync_slice[x].set(i) };
595        });
596}
597
598/// Computes the ranks of a slice of labels by their natural order.
599pub fn labels_to_ranks(labels: &[usize]) -> Box<[usize]> {
600    let mut llp_perm = (0..labels.len()).collect::<Vec<_>>().into_boxed_slice();
601    llp_perm.par_sort_by(|&a, &b| labels[a].cmp(&labels[b]));
602    let mut llp_inv_perm = vec![0; llp_perm.len()].into_boxed_slice();
603    invert_permutation(llp_perm.as_ref(), llp_inv_perm.as_mut());
604    llp_inv_perm
605}