webgraph_algo/llp/mod.rs

/*
 * SPDX-FileCopyrightText: 2024 Tommaso Fontana
 * SPDX-FileCopyrightText: 2024 Sebastiano Vigna
 *
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
 */

//! Layered label propagation.
//!
//! An implementation of the _layered label propagation_ algorithm described by
//! Paolo Boldi, Marco Rosa, Massimo Santini, and Sebastiano Vigna in “Layered
//! label propagation: A multiresolution coordinate-free ordering for
//! compressing social networks”, _Proceedings of the 20th International
//! Conference on World Wide Web_, pages 587–596, ACM, 2011.
//!
//! The function [`layered_label_propagation`] returns a labeling of the nodes
//! of the provided symmetric graph; [`labels_to_ranks`] turns the labels into
//! a permutation which will (hopefully) increase locality (see the paper).
//! Usually, the permutation is fed to
//! [`permute`](webgraph::transform::permute) to permute the original graph.
//!
//! Note that the graph provided should be _symmetric_ and _loopless_. If this
//! is not the case, please use [`simplify`](webgraph::transform::simplify) to
//! generate a suitable graph.
//!
//! # Memory requirements
//!
//! LLP requires three `usize` and a boolean per node, plus the memory that is
//! necessary to load the graph.
//!
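//! # Examples
//!
//! A minimal sketch of the typical pipeline (not compiled: the graph, its
//! degree cumulative function, the stopping predicate, the granularity, and
//! the working directory are assumed to be already available):
//!
//! ```ignore
//! let labels = layered_label_propagation(
//!     graph, &deg_cumul, gammas, None, None, arc_granularity, 0, pred, &work_dir,
//! )?;
//! // Turn the combined labels into a permutation, to be fed, e.g., to
//! // webgraph::transform::permute.
//! let perm = labels_to_ranks(&labels);
//! ```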
use anyhow::{Context, Result};
use dsi_progress_logger::prelude::*;
use epserde::prelude::*;
use predicates::Predicate;
use preds::PredParams;

use log::info;
use rand::rngs::SmallRng;
use rand::seq::IndexedRandom;
use rand::seq::SliceRandom;
use rand::SeedableRng;
use rayon::prelude::*;
use std::collections::HashMap;
use std::collections::VecDeque;
use std::path::Path;
use std::sync::atomic::Ordering;
use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
use sux::traits::Succ;
use sync_cell_slice::SyncSlice;
use webgraph::prelude::PermutedGraph;
use webgraph::traits::RandomAccessGraph;
use webgraph::utils::Granularity;

pub(crate) mod gap_cost;
pub(crate) mod label_store;
mod mix64;
pub mod preds;

const RAYON_MIN_LEN: usize = 100000;

/// How the labels and their associated metadata (log-gap cost and ɣ) are
/// stored on disk.
#[derive(Epserde, Debug, Clone)]
pub struct LabelsStore<A> {
    pub gap_cost: f64,
    pub gamma: f64,
    pub labels: A,
}

/// Runs layered label propagation on the provided symmetric graph and returns
/// the resulting labels.
///
/// Note that no symmetry check is performed; if the graph is not symmetric,
/// the algorithm will usually not give satisfactory results.
///
/// # Arguments
///
/// * `sym_graph` - The symmetric graph to run LLP on.
/// * `deg_cumul` - The degree cumulative distribution of the graph, as in
///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
/// * `gammas` - The ɣ values to use in the LLP algorithm.
/// * `num_threads` - The number of threads to use. If `None`, the number of
///   threads is set to [`num_cpus::get`].
/// * `chunk_size` - The chunk size used to randomize the permutation. If
///   `None`, a default of 1,000,000 is used. This is an advanced option: see
///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
/// * `arc_granularity` - The granularity of the parallel processing, expressed
///   as the number of arcs to process at a time. This is an advanced option:
///   see [par_apply](webgraph::traits::SequentialLabeling::par_apply).
/// * `seed` - The seed to use for pseudorandom number generation.
/// * `predicate` - The stopping criterion: at the end of each update it is
///   evaluated on the current [`PredParams`](preds::PredParams), and the
///   iterations for the current ɣ stop when it returns true (or when no label
///   has changed).
/// * `work_dir` - The directory where the labels will be stored (it may be,
///   e.g., a temporary directory).
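///
/// # Example (sketch)
///
/// A hedged sketch of a call; `graph`, `deg_cumul`, `arc_granularity`, and
/// `work_dir` are assumed to be already available, and the stopping predicate
/// is built here with the function predicate of the `predicates` crate (any
/// [`Predicate`] over [`PredParams`](preds::PredParams) works):
///
/// ```ignore
/// // Stop after 100 updates per gamma, or when the average gain improvement
/// // drops below 0.1% (fields as defined in `preds::PredParams`).
/// let pred = predicates::function::function(|p: &preds::PredParams| {
///     p.update >= 100 || p.avg_gain_impr < 0.001
/// });
/// let labels = layered_label_propagation(
///     graph,           // a symmetric, loopless graph
///     &deg_cumul,      // its degree cumulative function
///     vec![0.0, 1.0 / 8.0, 1.0 / 4.0],
///     None,            // number of threads (defaults to the number of CPUs)
///     None,            // chunk size (defaults to 1,000,000)
///     arc_granularity, // a webgraph::utils::Granularity value
///     0,               // seed
///     pred,
///     &work_dir,
/// )?;
/// ```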
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub fn layered_label_propagation<R: RandomAccessGraph + Sync>(
    sym_graph: R,
    deg_cumul: &(impl Succ<Input = usize, Output = usize> + Send + Sync),
    gammas: Vec<f64>,
    num_threads: Option<usize>,
    chunk_size: Option<usize>,
    arc_granularity: Granularity,
    seed: u64,
    predicate: impl Predicate<preds::PredParams>,
    work_dir: impl AsRef<Path>,
) -> Result<Box<[usize]>> {
    // compute the labels
    layered_label_propagation_labels_only(
        sym_graph,
        deg_cumul,
        gammas,
        num_threads,
        chunk_size,
        arc_granularity,
        seed,
        predicate,
        &work_dir,
    )?;
    // merge them
    combine_labels(work_dir)
}

/// Computes the labels for the given gammas and stores them on disk, but does
/// not combine them.
///
/// For the arguments, see [`layered_label_propagation`].
#[allow(clippy::type_complexity)]
#[allow(clippy::too_many_arguments)]
pub fn layered_label_propagation_labels_only<R: RandomAccessGraph + Sync>(
    sym_graph: R,
    deg_cumul: &(impl Succ<Input = usize, Output = usize> + Send + Sync),
    gammas: Vec<f64>,
    num_threads: Option<usize>,
    chunk_size: Option<usize>,
    arc_granularity: Granularity,
    seed: u64,
    predicate: impl Predicate<preds::PredParams>,
    work_dir: impl AsRef<Path>,
) -> Result<()> {
    // Note that the caller-provided work_dir can itself be a TempDir; it just
    // has to outlive this call.
    let work_path = work_dir.as_ref();
    let labels_path = |gamma_index| work_path.join(format!("labels_{gamma_index}.bin"));
    const IMPROV_WINDOW: usize = 10;
    let num_nodes = sym_graph.num_nodes();
    let chunk_size = chunk_size.unwrap_or(1_000_000);
    let num_threads = num_threads.unwrap_or_else(num_cpus::get);

    // init the permutation with the indices
    let mut update_perm = (0..num_nodes).collect::<Vec<_>>();

    let mut can_change = Vec::with_capacity(num_nodes as _);
    can_change.extend((0..num_nodes).map(|_| AtomicBool::new(true)));
    let mut label_store = label_store::LabelStore::new(num_nodes as _);
    // build a thread pool so we avoid having to re-create the threads
    let thread_pool = rayon::ThreadPoolBuilder::new()
        .num_threads(num_threads)
        .build()
        .context("Could not create thread pool")?;

    // init the gamma progress logger
    let mut gamma_pl = progress_logger!(
        display_memory = true,
        item_name = "gamma",
        expected_updates = Some(gammas.len()),
    );

    // init the iteration progress logger
    let mut iter_pl = progress_logger!(item_name = "update");

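    // Initial capacity for the per-node successor-label frequency map: the
    // average degree, clamped from below to 16.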
    let hash_map_init =
        Ord::max(sym_graph.num_arcs() / sym_graph.num_nodes() as u64, 16) as usize;

    // init the update progress logger
    let mut update_pl = concurrent_progress_logger!(item_name = "node", local_speed = true);

    let seed = AtomicU64::new(seed);
    let mut costs = Vec::with_capacity(gammas.len());

    gamma_pl.start(format!("Running {} threads", num_threads));
    info!("Stopping criterion: {predicate}");

    for (gamma_index, gamma) in gammas.iter().enumerate() {
        // Reset mutable state for the next gamma
        iter_pl.start(format!(
            "Starting iterations with gamma={} ({}/{})...",
            gamma,
            gamma_index + 1,
            gammas.len(),
        ));
        label_store.init();
        thread_pool.install(|| {
            can_change
                .par_iter()
                .with_min_len(RAYON_MIN_LEN)
                .for_each(|c| c.store(true, Ordering::Relaxed));
        });

        let mut obj_func = 0.0;
        let mut prev_gain = f64::MAX;
        let mut improv_window: VecDeque<_> = vec![1.0; IMPROV_WINDOW].into();

        for update in 0.. {
            update_pl.expected_updates(Some(num_nodes));
            update_pl.start(format!(
                "Starting update {} (for gamma={}, {}/{})...",
                update,
                gamma,
                gamma_index + 1,
                gammas.len()
            ));

            update_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
            thread_pool.install(|| {
                // parallel shuffle
                update_perm.par_chunks_mut(chunk_size).for_each(|chunk| {
                    let seed = seed.fetch_add(1, Ordering::Relaxed);
                    let mut rand = SmallRng::seed_from_u64(seed);
                    chunk.shuffle(&mut rand);
                });
            });

            // Number of nodes whose label changed in this update (used for early stopping)
            let modified = AtomicUsize::new(0);

            let delta_obj_func = sym_graph.par_apply(
                |range| {
                    let mut rand = SmallRng::seed_from_u64(range.start as u64);
                    let mut local_obj_func = 0.0;
                    for &node in &update_perm[range] {
                        // Note that here we are using a heuristic optimization:
                        // if no neighbor has changed, the label of a node
                        // cannot change. If gamma != 0, this is not necessarily
                        // true, as a node might need to change its value just
                        // because of a change of volume of the adjacent labels.
                        if !can_change[node].load(Ordering::Relaxed) {
                            continue;
                        }
                        // By default, mark the node as unable to change; it
                        // will be marked again if one of its neighbors changes.
                        can_change[node].store(false, Ordering::Relaxed);

                        let successors = sym_graph.successors(node);
                        if sym_graph.outdegree(node) == 0 {
                            continue;
                        }

                        // get the label of this node
                        let curr_label = label_store.label(node);

                        // compute the frequency of successor labels
                        let mut map =
                            HashMap::with_capacity_and_hasher(hash_map_init, mix64::Mix64Builder);
                        for succ in successors {
                            map.entry(label_store.label(succ))
                                .and_modify(|counter| *counter += 1)
                                .or_insert(1_usize);
                        }
                        // add the current label to the map
                        map.entry(curr_label).or_insert(0_usize);

                        let mut max = f64::NEG_INFINITY;
                        let mut old = 0.0;
                        let mut majorities = vec![];
                        // find the label(s) maximizing the LLP objective
                        for (&label, &count) in map.iter() {
                            // For replication of the results of the Java
                            // version, one needs to decrement the volume of
                            // the current label, as the Java version does
                            // (see the commented-out code below).
                            //
                            // Note that this is not exactly equivalent to the
                            // behavior of the Java version: if, during the
                            // execution of this loop, another thread reads the
                            // volume of the current label, it will get a value
                            // larger by one with respect to the Java version.
                            let volume = label_store.volume(label); // - (label == curr_label) as usize;
                            let val = (1.0 + gamma) * count as f64 - gamma * (volume + 1) as f64;

                            if max == val {
                                majorities.push(label);
                            }

                            if val > max {
                                majorities.clear();
                                max = val;
                                majorities.push(label);
                            }

                            if label == curr_label {
                                old = val;
                            }
                        }
                        // randomly break ties
                        let next_label = *majorities.choose(&mut rand).unwrap();
                        // if the label changed we need to update the label store
                        // and signal that this could change the neighbor nodes
                        if next_label != curr_label {
                            modified.fetch_add(1, Ordering::Relaxed);
                            for succ in sym_graph.successors(node) {
                                can_change[succ].store(true, Ordering::Relaxed);
                            }
                            label_store.update(node, next_label);
                        }
                        local_obj_func += max - old;
                    }
                    local_obj_func
                },
                |delta_obj_func_0: f64, delta_obj_func_1| delta_obj_func_0 + delta_obj_func_1,
                arc_granularity,
                deg_cumul,
                &thread_pool,
                &mut update_pl,
            );

            update_pl.done_with_count(num_nodes);
            iter_pl.update_and_display();

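            // Convergence statistics: `gain` is the relative contribution of
            // this update to the objective function, `gain_impr` its relative
            // improvement over the previous update, and `avg_gain_impr` the
            // average improvement over the last IMPROV_WINDOW updates.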
            obj_func += delta_obj_func;
            let gain = delta_obj_func / obj_func;
            let gain_impr = (prev_gain - gain) / prev_gain;
            prev_gain = gain;
            improv_window.pop_front();
            improv_window.push_back(gain_impr);
            let avg_gain_impr = improv_window.iter().sum::<f64>() / IMPROV_WINDOW as f64;

            info!("Gain: {gain}");
            info!("Gain improvement: {gain_impr}");
            info!("Average gain improvement: {avg_gain_impr}");
            info!("Modified: {}", modified.load(Ordering::Relaxed));

            if predicate.eval(&PredParams {
                num_nodes: sym_graph.num_nodes(),
                num_arcs: sym_graph.num_arcs(),
                gain,
                avg_gain_impr,
                modified: modified.load(Ordering::Relaxed),
                update,
            }) || modified.load(Ordering::Relaxed) == 0
            {
                break;
            }
        }

        iter_pl.done();

        // We temporarily use the update permutation to compute the sorting
        // permutation of the labels.
        let perm = &mut update_perm;
        thread_pool.install(|| {
            perm.par_iter_mut()
                .with_min_len(RAYON_MIN_LEN)
                .enumerate()
                .for_each(|(i, x)| *x = i);
            // Sort by label
            perm.par_sort_unstable_by(|&a, &b| {
                label_store
                    .label(a as _)
                    .cmp(&label_store.label(b as _))
                    .then_with(|| a.cmp(&b))
            });
        });

        // Get the labels to save and the volume array (reused as scratch below)
        let (labels, volumes) = label_store.labels_and_volumes();

        // We temporarily use the volume array from the label store to compute
        // the inverse permutation. It will be reinitialized at the next
        // iteration anyway.
        thread_pool.install(|| {
            invert_permutation(perm, volumes);
        });

        update_pl.expected_updates(Some(num_nodes));
        update_pl.start("Computing log-gap cost...");

        let gap_cost = gap_cost::compute_log_gap_cost(
            &PermutedGraph {
                graph: &sym_graph,
                perm: &volumes,
            },
            arc_granularity,
            deg_cumul,
            &thread_pool,
            &mut update_pl,
        );

        update_pl.done();

        info!("Log-gap cost: {}", gap_cost);
        costs.push(gap_cost);

        // store the labels on disk with their cost and gamma
        let labels_store = LabelsStore {
            labels: &*labels,
            gap_cost,
            gamma: *gamma,
        };
        labels_store
            .store(labels_path(gamma_index))
            .context("Could not serialize labels")?;

        gamma_pl.update_and_display();
    }

    gamma_pl.done();

    Ok(())
}

/// Combines the labels computed by LLP into a final labels array.
///
/// * `work_dir`: The directory containing the labels to combine.
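///
/// For example, the labels can be computed and combined in two separate steps
/// (a sketch; the arguments are as in [`layered_label_propagation`]):
///
/// ```ignore
/// layered_label_propagation_labels_only(
///     graph, &deg_cumul, gammas, None, None, arc_granularity, 0, pred, &work_dir,
/// )?;
/// let labels = combine_labels(&work_dir)?;
/// ```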
pub fn combine_labels(work_dir: impl AsRef<Path>) -> Result<Box<[usize]>> {
    let mut gammas = vec![];
    let iter = std::fs::read_dir(work_dir.as_ref())?
        .filter_map(Result::ok)
        .filter(|path| {
            let path_name = path.file_name();
            let path_str = path_name.to_string_lossy();
            path_str.starts_with("labels_")
                && path_str.ends_with(".bin")
                && path.file_type().unwrap().is_file()
        });

    let mut num_nodes = None;
    for path in iter {
        let path = work_dir.as_ref().join(path.file_name());
        // we only need the cost, the gamma, and the number of labels here, so
        // we memory-map the file instead of loading the labels, which will be
        // needed only later
        let res = <LabelsStore<Vec<usize>>>::mmap(&path, Flags::default())
            .with_context(|| format!("Could not load labels from {}", path.to_string_lossy()))?;
        info!(
            "Found labels from {:?} with gamma {}, cost {}, and {} nodes",
            path.to_string_lossy(),
            res.gamma,
            res.gap_cost,
            res.labels.len()
        );

        match &mut num_nodes {
            num_nodes @ None => {
                *num_nodes = Some(res.labels.len());
            }
            Some(num_nodes) => {
                if res.labels.len() != *num_nodes {
                    anyhow::bail!(
                        "Labels '{}' have length {} while we expected {}.",
                        path.to_string_lossy(),
                        res.labels.len(),
                        num_nodes
                    );
                }
            }
        }
        gammas.push((res.gap_cost, res.gamma, path));
    }

    if gammas.is_empty() {
        anyhow::bail!("No labels were found in {}", work_dir.as_ref().display());
    }
    let mut temp_perm = vec![0; num_nodes.unwrap()];

    // sort the gammas by cost in descending order
    gammas.sort_by(|(a, _, _), (b, _, _)| b.total_cmp(a));

    // the best gamma is the last because it has the min cost
    let (best_gamma_cost, best_gamma, best_gamma_path) = gammas.last().unwrap();
    let (worst_gamma_cost, worst_gamma, _worst_gamma_path) = &gammas[0];
    info!(
        "Best gamma: {}\twith log-gap cost {}",
        best_gamma, best_gamma_cost
    );
    info!(
        "Worst gamma: {}\twith log-gap cost {}",
        worst_gamma, worst_gamma_cost
    );

    let mut result_labels = <LabelsStore<Vec<usize>>>::load_mem(best_gamma_path)
        .context("Could not load labels from best gamma")?
        .labels
        .to_vec();

    let mmap_flags = Flags::TRANSPARENT_HUGE_PAGES | Flags::RANDOM_ACCESS;
    for (i, (cost, gamma, gamma_path)) in gammas.iter().enumerate() {
        info!(
            "Starting step {} with gamma {} cost {} and labels {:?}...",
            i, gamma, cost, gamma_path
        );
        let labels = <LabelsStore<Vec<usize>>>::load_mmap(gamma_path, mmap_flags)
            .context("Could not load labels")?;
        combine(&mut result_labels, labels.labels, &mut temp_perm)
            .context("Could not combine labels")?;
        drop(labels); // explicit drop so we free labels before loading best_labels

        // This recombination with the best labels does not appear in the paper, but
        // it is not harmful and fixes a few corner cases in which experimentally
        // LLP does not perform well. It was introduced by Marco Rosa in the Java
        // LAW code.
        info!(
            "Recombining with gamma {} cost {} and labels {:?}...",
            best_gamma, best_gamma_cost, best_gamma_path
        );
        let best_labels = <LabelsStore<Vec<usize>>>::load_mmap(best_gamma_path, mmap_flags)
            .context("Could not load labels from best gamma")?;
        let number_of_labels = combine(&mut result_labels, best_labels.labels, &mut temp_perm)?;
        info!("Number of labels: {}", number_of_labels);
    }

    Ok(result_labels.into_boxed_slice())
}

/// Combines two label assignments into a single one, returning the number of
/// labels in the combined assignment.
fn combine(result: &mut [usize], labels: &[usize], temp_perm: &mut [usize]) -> Result<usize> {
    // re-initialize the identity permutation
    temp_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
    // permute by the devilish function
    temp_perm.par_sort_unstable_by(|&a, &b| {
        (result[labels[a]].cmp(&result[labels[b]]))
            .then_with(|| labels[a].cmp(&labels[b]))
            .then_with(|| result[a].cmp(&result[b]))
            .then_with(|| a.cmp(&b)) // to make it stable
    });
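    // Scan the nodes in the order given by the sort above and assign a new
    // consecutive label to each distinct (result, labels) pair: the combined
    // assignment is the common refinement of the two input assignments.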
    let mut prev_labels = (result[temp_perm[0]], labels[temp_perm[0]]);
    let mut curr_label = 0;
    result[temp_perm[0]] = curr_label;

    for i in 1..temp_perm.len() {
        let curr_labels = (result[temp_perm[i]], labels[temp_perm[i]]);
        if prev_labels != curr_labels {
            curr_label += 1;
            prev_labels = curr_labels;
        }
        result[temp_perm[i]] = curr_label;
    }

    Ok(curr_label + 1)
}

/// Inverts a permutation.
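///
/// A small illustration (shown as a non-compiled sketch):
///
/// ```ignore
/// let perm = [2, 0, 1];
/// let mut inv = [0; 3];
/// invert_permutation(&perm, &mut inv);
/// assert_eq!(inv, [1, 2, 0]);
/// ```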
pub fn invert_permutation(perm: &[usize], inv_perm: &mut [usize]) {
    let sync_slice = inv_perm.as_sync_slice();
    perm.par_iter()
        .with_min_len(RAYON_MIN_LEN)
        .enumerate()
        .for_each(|(i, &x)| {
            unsafe { sync_slice[x].set(i) };
        });
}

/// Computes the ranks of a slice of labels by their natural order.
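///
/// A small illustration (shown as a non-compiled sketch):
///
/// ```ignore
/// let ranks = labels_to_ranks(&[5, 2, 5, 1]);
/// assert_eq!(&ranks[..], &[2, 1, 3, 0][..]);
/// ```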
pub fn labels_to_ranks(labels: &[usize]) -> Box<[usize]> {
    let mut llp_perm = (0..labels.len()).collect::<Vec<_>>().into_boxed_slice();
    llp_perm.par_sort_by(|&a, &b| labels[a].cmp(&labels[b]));
    let mut llp_inv_perm = vec![0; llp_perm.len()].into_boxed_slice();
    invert_permutation(llp_perm.as_ref(), llp_inv_perm.as_mut());
    llp_inv_perm
}