webgraph_algo/llp/
mod.rs

1/*
2 * SPDX-FileCopyrightText: 2024 Tommaso Fontana
3 * SPDX-FileCopyrightText: 2024 Sebastiano Vigna
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8//! Layered label propagation.
9//!
10//! An implementation of the _layered label propagation_ algorithm described by
11//! Paolo Boldi, Sebastiano Vigna, Marco Rosa, Massimo Santini, and Sebastiano
12//! Vigna in “Layered label propagation: A multiresolution coordinate-free
13//! ordering for compressing social networks”, _Proceedings of the 20th
14//! international conference on World Wide Web_, pages 587–596, ACM, 2011.
15//!
16//! The function [`layered_label_propagation`] returns a permutation of the
17//! provided symmetric graph which will (hopefully) increase locality (see the
18//! paper). Usually, the permutation is fed to [`permute`](webgraph::transform::permute) to permute the
19//! original graph.
20//!
21//! Note that the graph provided should be _symmetric_ and _loopless_. If this
22//! is not the case, please use [`simplify`](webgraph::transform::simplify) to generate a
23//! suitable graph.
24//!
25//! # Memory requirements
26//!
27//! LLP requires three `usize` and a boolean per node, plus the memory that is
28//! necessary to load the graph.
29//!
30use anyhow::{Context, Result};
31use crossbeam_utils::CachePadded;
32use dsi_progress_logger::prelude::*;
33use epserde::prelude::*;
34use predicates::Predicate;
35use preds::PredParams;
36
37use log::info;
38use rand::SeedableRng;
39use rand::rngs::SmallRng;
40use rand::seq::IndexedRandom;
41use rand::seq::SliceRandom;
42use rayon::prelude::*;
43use std::collections::HashMap;
44use std::collections::VecDeque;
45use std::path::Path;
46use std::sync::atomic::Ordering;
47use std::sync::atomic::{AtomicBool, AtomicU64, AtomicUsize};
48use sux::traits::Succ;
49use sync_cell_slice::SyncSlice;
50use webgraph::prelude::PermutedGraph;
51use webgraph::traits::RandomAccessGraph;
52use webgraph::utils::Granularity;
53
54pub(crate) mod gap_cost;
55pub(crate) mod label_store;
56mod mix64;
57pub mod preds;
58
59const RAYON_MIN_LEN: usize = 100000;
60
61#[derive(Epserde, Debug, Clone)]
62/// This struct is how the labels and their metadata are stored on disk.
63pub struct LabelsStore<A> {
64    pub gap_cost: f64,
65    pub gamma: f64,
66    pub labels: A,
67}
68
69/// Runs layered label propagation on the provided symmetric graph and returns
70/// the resulting labels.
71///
72/// Note that no symmetry check is performed, but in that case the algorithm
73/// usually will not give satisfactory results.
74///
75/// # Arguments
76///
77/// * `sym_graph` - The symmetric graph to run LLP on.
78/// * `deg_cumul` - The degree cumulative distribution of the graph, as in
79///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
80/// * `gammas` - The ɣ values to use in the LLP algorithm.
81/// * `num_threads` - The number of threads to use. If `None`, the number of
82///   threads is set to [`num_cpus::get`].
83/// * `chunk_size` - The chunk size used to randomize the permutation. This is
84///   an advanced option: see
85///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
86/// * `granularity` - The granularity of the parallel processing expressed as
87///   the number of arcs to process at a time. If `None`, the granularity is
88///   computed adaptively. This is an advanced option: see
89///   [par_apply](webgraph::traits::SequentialLabeling::par_apply).
90/// * `seed` - The seed to use for pseudorandom number generation.
91/// * `work_dir` - The directory where the labels will be stored, if `None`, a
92///   temporary directory will be created.
93#[allow(clippy::too_many_arguments)]
94pub fn layered_label_propagation<R: RandomAccessGraph + Sync>(
95    sym_graph: R,
96    deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
97    gammas: Vec<f64>,
98    num_threads: Option<usize>,
99    chunk_size: Option<usize>,
100    arc_granularity: Granularity,
101    seed: u64,
102    predicate: impl Predicate<preds::PredParams>,
103    work_dir: impl AsRef<Path>,
104) -> Result<Box<[usize]>> {
105    // compute the labels
106    layered_label_propagation_labels_only(
107        sym_graph,
108        deg_cumul,
109        gammas,
110        num_threads,
111        chunk_size,
112        arc_granularity,
113        seed,
114        predicate,
115        &work_dir,
116    )?;
117    // merge them
118    combine_labels(work_dir)
119}
120
121#[allow(clippy::too_many_arguments)]
122/// Computes and store on disk the labels for the given gammas, but does not combine them.
123/// For the arguments look at [`layered_label_propagation`].
124pub fn layered_label_propagation_labels_only<R: RandomAccessGraph + Sync>(
125    sym_graph: R,
126    deg_cumul: &(impl for<'a> Succ<Input = usize, Output<'a> = usize> + Send + Sync),
127    gammas: Vec<f64>,
128    num_threads: Option<usize>,
129    chunk_size: Option<usize>,
130    arc_granularity: Granularity,
131    seed: u64,
132    predicate: impl Predicate<preds::PredParams>,
133    work_dir: impl AsRef<Path>,
134) -> Result<()> {
135    // work-around to make TempDir Live as much as needed but only create it if
136    // the user didn't provide a work_dir, which can also be a TempDir.
137    let work_path = work_dir.as_ref();
138    let labels_path = |gamma_index| work_path.join(format!("labels_{gamma_index}.bin"));
139    const IMPROV_WINDOW: usize = 10;
140    let num_nodes = sym_graph.num_nodes();
141    let chunk_size = chunk_size.unwrap_or(1_000_000);
142    let num_threads = num_threads.unwrap_or_else(num_cpus::get);
143
144    // init the permutation with the indices
145    let mut update_perm = (0..num_nodes).collect::<Vec<_>>();
146
147    let mut can_change = Vec::with_capacity(num_nodes as _);
148    can_change.extend((0..num_nodes).map(|_| AtomicBool::new(true)));
149    let mut label_store = label_store::LabelStore::new(num_nodes as _);
150    // build a thread_pool so we avoid having to re-create the threads
151    let thread_pool = rayon::ThreadPoolBuilder::new()
152        .num_threads(num_threads)
153        .build()
154        .context("Could not create thread pool")?;
155
156    // init the gamma progress logger
157    let mut gamma_pl = progress_logger![
158        display_memory = true,
159        item_name = "gamma",
160        expected_updates = Some(gammas.len()),
161    ];
162
163    // init the iteration progress logger
164    let mut iter_pl = progress_logger![item_name = "update"];
165
166    let hash_map_init = Ord::max(sym_graph.num_arcs() / sym_graph.num_nodes() as u64, 16) as usize;
167
168    // init the update progress logger
169    let mut update_pl = concurrent_progress_logger![item_name = "node", local_speed = true];
170
171    let seed = CachePadded::new(AtomicU64::new(seed));
172    let mut costs = Vec::with_capacity(gammas.len());
173
174    gamma_pl.start(format!("Running {} threads", num_threads));
175    info!("Stopping criterion: {predicate}");
176
177    for (gamma_index, gamma) in gammas.iter().enumerate() {
178        // Reset mutable state for the next gamma
179        iter_pl.start(format!(
180            "Starting iterations with gamma={} ({}/{})...",
181            gamma,
182            gamma_index + 1,
183            gammas.len(),
184        ));
185        label_store.init();
186        thread_pool.install(|| {
187            can_change
188                .par_iter()
189                .with_min_len(RAYON_MIN_LEN)
190                .for_each(|c| c.store(true, Ordering::Relaxed));
191        });
192
193        let mut obj_func = 0.0;
194        let mut prev_gain = f64::MAX;
195        let mut improv_window: VecDeque<_> = vec![1.0; IMPROV_WINDOW].into();
196
197        for update in 0.. {
198            update_pl.expected_updates(Some(num_nodes));
199            update_pl.start(format!(
200                "Starting update {} (for gamma={}, {}/{})...",
201                update,
202                gamma,
203                gamma_index + 1,
204                gammas.len()
205            ));
206
207            update_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
208            thread_pool.install(|| {
209                // parallel shuffle
210                update_perm.par_chunks_mut(chunk_size).for_each(|chunk| {
211                    let seed = seed.fetch_add(1, Ordering::Relaxed);
212                    let mut rand = SmallRng::seed_from_u64(seed);
213                    chunk.shuffle(&mut rand);
214                });
215            });
216
217            // If this iteration modified anything (early stop)
218            let modified = CachePadded::new(AtomicUsize::new(0));
219
220            let delta_obj_func = sym_graph.par_apply(
221                |range| {
222                    let mut rand = SmallRng::seed_from_u64(range.start as u64);
223                    let mut local_obj_func = 0.0;
224                    for &node in &update_perm[range] {
225                        // Note that here we are using a heuristic optimization:
226                        // if no neighbor has changed, the label of a node
227                        // cannot change. If gamma != 0, this is not necessarily
228                        // true, as a node might need to change its value just
229                        // because of a change of volume of the adjacent labels.
230                        if !can_change[node].load(Ordering::Relaxed) {
231                            continue;
232                        }
233                        // set that the node can't change by default and we'll unset later it if it can
234                        can_change[node].store(false, Ordering::Relaxed);
235
236                        let successors = sym_graph.successors(node);
237                        if sym_graph.outdegree(node) == 0 {
238                            continue;
239                        }
240
241                        // get the label of this node
242                        let curr_label = label_store.label(node);
243
244                        // compute the frequency of successor labels
245                        let mut map =
246                            HashMap::with_capacity_and_hasher(hash_map_init, mix64::Mix64Builder);
247                        for succ in successors {
248                            map.entry(label_store.label(succ))
249                                .and_modify(|counter| *counter += 1)
250                                .or_insert(1_usize);
251                        }
252                        // add the current label to the map
253                        map.entry(curr_label).or_insert(0_usize);
254
255                        let mut max = f64::NEG_INFINITY;
256                        let mut old = 0.0;
257                        let mut majorities = vec![];
258                        // compute the most entropic label
259                        for (&label, &count) in map.iter() {
260                            // For replication of the results of the Java
261                            // version, one needs to decrement the volume of
262                            // the current value the Java version does
263                            // (see the commented code below).
264                            //
265                            // Note that this is not exactly equivalent to the
266                            // behavior of the Java version, as during the
267                            // execution of this loop if another thread reads
268                            // the volume of the current label it will get a
269                            // value larger by one WRT the Java version.
270                            let volume = label_store.volume(label); // - (label == curr_label) as usize;
271                            let val = (1.0 + gamma) * count as f64 - gamma * (volume + 1) as f64;
272
273                            if max == val {
274                                majorities.push(label);
275                            }
276
277                            if val > max {
278                                majorities.clear();
279                                max = val;
280                                majorities.push(label);
281                            }
282
283                            if label == curr_label {
284                                old = val;
285                            }
286                        }
287                        // randomly break ties
288                        let next_label = *majorities.choose(&mut rand).unwrap();
289                        // if the label changed we need to update the label store
290                        // and signal that this could change the neighbor nodes
291                        if next_label != curr_label {
292                            modified.fetch_add(1, Ordering::Relaxed);
293                            for succ in sym_graph.successors(node) {
294                                can_change[succ].store(true, Ordering::Relaxed);
295                            }
296                            label_store.update(node, next_label);
297                        }
298                        local_obj_func += max - old;
299                    }
300                    local_obj_func
301                },
302                |delta_obj_func_0: f64, delta_obj_func_1| delta_obj_func_0 + delta_obj_func_1,
303                arc_granularity,
304                deg_cumul,
305                &thread_pool,
306                &mut update_pl,
307            );
308
309            update_pl.done_with_count(num_nodes);
310            iter_pl.update_and_display();
311
312            obj_func += delta_obj_func;
313            let gain = delta_obj_func / obj_func;
314            let gain_impr = (prev_gain - gain) / prev_gain;
315            prev_gain = gain;
316            improv_window.pop_front();
317            improv_window.push_back(gain_impr);
318            let avg_gain_impr = improv_window.iter().sum::<f64>() / IMPROV_WINDOW as f64;
319
320            info!("Gain: {gain}");
321            info!("Gain improvement: {gain_impr}");
322            info!("Average gain improvement: {avg_gain_impr}");
323            info!("Modified: {}", modified.load(Ordering::Relaxed),);
324
325            if predicate.eval(&PredParams {
326                num_nodes: sym_graph.num_nodes(),
327                num_arcs: sym_graph.num_arcs(),
328                gain,
329                avg_gain_impr,
330                modified: modified.load(Ordering::Relaxed),
331                update,
332            }) || modified.load(Ordering::Relaxed) == 0
333            {
334                break;
335            }
336        }
337
338        iter_pl.done();
339
340        // We temporarily use the update permutation to compute the sorting
341        // permutation of the labels.
342        let perm = &mut update_perm;
343        thread_pool.install(|| {
344            perm.par_iter_mut()
345                .with_min_len(RAYON_MIN_LEN)
346                .enumerate()
347                .for_each(|(i, x)| *x = i);
348            // Sort by label
349            perm.par_sort_unstable_by(|&a, &b| {
350                label_store
351                    .label(a as _)
352                    .cmp(&label_store.label(b as _))
353                    .then_with(|| a.cmp(&b))
354            });
355        });
356
357        // Save labels
358        let (labels, volumes) = label_store.labels_and_volumes();
359
360        // We temporarily use the label array from the label store to compute
361        // the inverse permutation. It will be reinitialized at the next
362        // iteration anyway.
363        thread_pool.install(|| {
364            invert_permutation(perm, volumes);
365        });
366
367        update_pl.expected_updates(Some(num_nodes));
368        update_pl.start("Computing log-gap cost...");
369
370        let gap_cost = gap_cost::compute_log_gap_cost(
371            &PermutedGraph {
372                graph: &sym_graph,
373                perm: &volumes,
374            },
375            arc_granularity,
376            deg_cumul,
377            &thread_pool,
378            &mut update_pl,
379        );
380
381        update_pl.done();
382
383        info!("Log-gap cost: {}", gap_cost);
384        costs.push(gap_cost);
385
386        // store the labels on disk with their cost and gamma
387        let labels_store = LabelsStore {
388            labels: &*labels,
389            gap_cost,
390            gamma: *gamma,
391        };
392        unsafe {
393            labels_store
394                .store(labels_path(gamma_index))
395                .context("Could not serialize labels")
396        }?;
397
398        gamma_pl.update_and_display();
399    }
400
401    gamma_pl.done();
402
403    Ok(())
404}
405
406/// Combines the labels computed by LLP into a final labels array.
407///
408/// * `work_dir`: The folder where the labels to combine are.
409pub fn combine_labels(work_dir: impl AsRef<Path>) -> Result<Box<[usize]>> {
410    let mut gammas = vec![];
411    let iter = std::fs::read_dir(work_dir.as_ref())?
412        .filter_map(Result::ok)
413        .filter(|path| {
414            let path_name = path.file_name();
415            let path_str = path_name.to_string_lossy();
416            path_str.starts_with("labels_")
417                && path_str.ends_with(".bin")
418                && path.file_type().unwrap().is_file()
419        });
420
421    let mut num_nodes = None;
422    for path in iter {
423        let path = work_dir.as_ref().join(path.file_name());
424        // we only need the cost and gamma here, so we mmap it to ignore the
425        // actual labels which will be needed only later
426        let res = unsafe {
427            <LabelsStore<Vec<usize>>>::mmap(&path, Flags::default())
428                .with_context(|| format!("Could not load labels from {}", path.to_string_lossy(),))
429        }?;
430
431        let res = res.uncase();
432
433        info!(
434            "Found labels from {:?} with gamma {} and cost {} and num_nodes {}",
435            path.to_string_lossy(),
436            res.gamma,
437            res.gap_cost,
438            res.labels.len()
439        );
440
441        match &mut num_nodes {
442            num_nodes @ None => {
443                *num_nodes = Some(res.labels.len());
444            }
445            Some(num_nodes) => {
446                if res.labels.len() != *num_nodes {
447                    anyhow::bail!(
448                        "Labels '{}' have length {} while we expected {}.",
449                        path.to_string_lossy(),
450                        res.labels.len(),
451                        num_nodes
452                    );
453                }
454            }
455        }
456        gammas.push((res.gap_cost, res.gamma, path));
457    }
458
459    if gammas.is_empty() {
460        anyhow::bail!("No labels were found in {}", work_dir.as_ref().display());
461    }
462    let mut temp_perm = vec![0; num_nodes.unwrap()];
463
464    // compute the indices that sorts the gammas by cost
465    // sort in descending order
466    gammas.sort_by(|(a, _, _), (b, _, _)| b.total_cmp(a));
467
468    // the best gamma is the last because it has the min cost
469    let (best_gamma_cost, best_gamma, best_gamma_path) = gammas.last().unwrap();
470    let (worst_gamma_cost, worst_gamma, _worst_gamma_path) = &gammas[0];
471    info!(
472        "Best gamma: {}\twith log-gap cost {}",
473        best_gamma, best_gamma_cost
474    );
475    info!(
476        "Worst gamma: {}\twith log-gap cost {}",
477        worst_gamma, worst_gamma_cost
478    );
479
480    let mut result_labels = unsafe {
481        <LabelsStore<Vec<usize>>>::load_mem(best_gamma_path)
482            .context("Could not load labels from best gamma")
483    }?
484    .uncase()
485    .labels
486    .to_vec();
487
488    let mmap_flags = Flags::TRANSPARENT_HUGE_PAGES | Flags::RANDOM_ACCESS;
489    for (i, (cost, gamma, gamma_path)) in gammas.iter().enumerate() {
490        info!(
491            "Starting step {} with gamma {} cost {} and labels {:?}...",
492            i, gamma, cost, gamma_path
493        );
494        let labels = unsafe {
495            <LabelsStore<Vec<usize>>>::load_mmap(gamma_path, mmap_flags)
496                .context("Could not load labels")
497        }?;
498
499        combine(&mut result_labels, labels.uncase().labels, &mut temp_perm)
500            .context("Could not combine labels")?;
501        drop(labels); // explicit drop so we free labels before loading best_labels
502
503        // This recombination with the best labels does not appear in the paper, but
504        // it is not harmful and fixes a few corner cases in which experimentally
505        // LLP does not perform well. It was introduced by Marco Rosa in the Java
506        // LAW code.
507        info!(
508            "Recombining with gamma {} cost {} and labels {:?}...",
509            best_gamma, best_gamma_cost, best_gamma_path
510        );
511        let best_labels = unsafe {
512            <LabelsStore<Vec<usize>>>::load_mmap(best_gamma_path, mmap_flags)
513                .context("Could not load labels from best gamma")
514        }?;
515        let number_of_labels = combine(
516            &mut result_labels,
517            best_labels.uncase().labels,
518            &mut temp_perm,
519        )?;
520        info!("Number of labels: {}", number_of_labels);
521    }
522
523    Ok(result_labels.into_boxed_slice())
524}
525
526/// combine the labels from two permutations into a single one
527fn combine(result: &mut [usize], labels: &[usize], temp_perm: &mut [usize]) -> Result<usize> {
528    // re-init the permutation
529    temp_perm.iter_mut().enumerate().for_each(|(i, x)| *x = i);
530    // permute by the devilish function
531    temp_perm.par_sort_unstable_by(|&a, &b| {
532        (result[labels[a]].cmp(&result[labels[b]]))
533            .then_with(|| labels[a].cmp(&labels[b]))
534            .then_with(|| result[a].cmp(&result[b]))
535            .then_with(|| a.cmp(&b)) // to make it stable
536    });
537    let mut prev_labels = (result[temp_perm[0]], labels[temp_perm[0]]);
538    let mut curr_label = 0;
539    result[temp_perm[0]] = curr_label;
540
541    for i in 1..temp_perm.len() {
542        let curr_labels = (result[temp_perm[i]], labels[temp_perm[i]]);
543        if prev_labels != curr_labels {
544            curr_label += 1;
545            prev_labels = curr_labels
546        }
547        result[temp_perm[i]] = curr_label;
548    }
549
550    Ok(curr_label + 1)
551}
552
553/// Inverts a permutation.
554pub fn invert_permutation(perm: &[usize], inv_perm: &mut [usize]) {
555    let sync_slice = inv_perm.as_sync_slice();
556    perm.par_iter()
557        .with_min_len(RAYON_MIN_LEN)
558        .enumerate()
559        .for_each(|(i, &x)| {
560            unsafe { sync_slice[x].set(i) };
561        });
562}
563
564/// Computes the ranks of a slice of labels by their natural order.
565pub fn labels_to_ranks(labels: &[usize]) -> Box<[usize]> {
566    let mut llp_perm = (0..labels.len()).collect::<Vec<_>>().into_boxed_slice();
567    llp_perm.par_sort_by(|&a, &b| labels[a].cmp(&labels[b]));
568    let mut llp_inv_perm = vec![0; llp_perm.len()].into_boxed_slice();
569    invert_permutation(llp_perm.as_ref(), llp_inv_perm.as_mut());
570    llp_inv_perm
571}