webgraph_cli/run/llp.rs

/*
 * SPDX-FileCopyrightText: 2023 Inria
 * SPDX-FileCopyrightText: 2023 Sebastiano Vigna
 * SPDX-FileCopyrightText: 2023 Tommaso Fontana
 *
 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
 */

use crate::GlobalArgs;
use crate::GranularityArgs;
use crate::NumThreadsArg;
use crate::create_parent_dir;
use crate::get_thread_pool;
use anyhow::{Context, Result, bail};
use clap::Parser;
use dsi_bitstream::dispatch::factory::CodesReaderFactoryHelper;
use dsi_bitstream::prelude::*;
use epserde::prelude::*;
use webgraph::prelude::*;
use webgraph_algo::llp::preds::{MaxUpdates, MinAvgImprov, MinGain, MinModified, PercModified};
use webgraph_algo::{combine_labels, labels_to_ranks};

use predicates::prelude::*;
use std::io::{BufWriter, Write};
use std::path::Path;
use std::path::PathBuf;
use tempfile::tempdir;

#[derive(Parser, Debug)]
#[command(name = "llp", about = "Computes a permutation of a graph using Layered Label Propagation.", long_about = None)]
pub struct CliArgs {
    /// The basename of the graph.
    pub src: PathBuf,

    /// A filename for the LLP permutation in binary big-endian format. If not
    /// provided, the labels will be computed but not combined into the final
    /// permutation. In that case, be sure to set `work_dir` so that the labels
    /// are not deleted at the end of the computation.
    pub perm: Option<PathBuf>,

    /// The directory where the LLP labels are stored. If not specified, a
    /// temporary directory will be used and deleted at the end of the
    /// computation. The parent of the temporary directory can be chosen with
    /// the TMPDIR environment variable.
    /// Setting a work_dir has two purposes: saving the computed labels and
    /// making it possible to resume the computation of the gammas, which on
    /// large graphs might take days.
    /// The labels represent information about communities in the graph:
    /// similar nodes will have the same label.
    /// To resume a computation you can compute the remaining gammas without
    /// passing `perm`, and then finally run `combine`, which will combine all
    /// the labels of the gammas present in the directory into a final
    /// permutation.
    #[arg(short, long)]
    pub work_dir: Option<PathBuf>,

    #[arg(short, long)]
    /// Save the permutation in ε-serde format.
    pub epserde: bool,

    #[arg(short, long, allow_hyphen_values = true, use_value_delimiter = true, value_delimiter = ',', default_values_t = vec!["-0".to_string(), "-1".to_string(), "-2".to_string(), "-3".to_string(), "-4".to_string(), "-5".to_string(), "-6".to_string(), "-7".to_string(), "-8".to_string(), "-9".to_string(), "-10".to_string()])]
    /// The ɣ's to use in LLP, separated by commas. The format is given by an
    /// integer numerator (if missing, assumed to be one), a dash, and then a
    /// power-of-two exponent for the denominator. For example, -2 is 1/4, and
    /// 0-0 is 0.
    pub gammas: Vec<String>,

    #[arg(short = 'u', long, default_value_t = 100)]
    /// The maximum number of updates for a given ɣ.
    pub max_updates: usize,

    #[arg(short = 'M', long)]
    /// If true, updates will be stopped when the number of modified nodes is less
    /// than the square root of the number of nodes of the graph.
    pub modified: bool,

    #[arg(short = 'p', long)]
    /// If specified, updates will be stopped when the number of modified nodes is
    /// less than this percentage of the number of nodes of the graph.
    pub perc_modified: Option<f64>,

    #[arg(short = 't', long, default_value_t = MinGain::DEFAULT_THRESHOLD)]
    /// The gain threshold used to stop the computation (0 to disable).
    pub gain_threshold: f64,

    #[arg(short = 'i', long, default_value_t = MinAvgImprov::DEFAULT_THRESHOLD)]
    /// The threshold on the average (over the last ten updates) gain
    /// improvement used to stop the computation (-Inf to disable).
    pub improv_threshold: f64,

    #[clap(flatten)]
    pub num_threads: NumThreadsArg,

    #[arg(short, long, default_value_t = 0)]
    /// The seed to use for the PRNG.
    pub seed: u64,

    #[clap(flatten)]
    pub granularity: GranularityArgs,

    #[arg(long)]
    /// The chunk size used to localize the random permutation
    /// (advanced option).
    pub chunk_size: Option<usize>,
}

/// Helper function that stores a permutation, either in ε-serde format or as
/// a sequence of big-endian values.
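///
/// A minimal usage sketch (the path is illustrative):
///
/// ```ignore
/// // Store the identity permutation of four elements as big-endian binary.
/// let perm: Vec<usize> = (0..4).collect();
/// store_perm(&perm, "graph.perm", false)?;
/// ```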
pub fn store_perm(data: &[usize], perm: impl AsRef<Path>, epserde: bool) -> Result<()> {
    if epserde {
        unsafe {
            data.store(&perm).with_context(|| {
                format!("Could not write permutation to {}", perm.as_ref().display())
            })
        }
    } else {
        let mut file = std::fs::File::create(&perm).with_context(|| {
            format!(
                "Could not create permutation at {}",
                perm.as_ref().display()
            )
        })?;
        let mut buf = BufWriter::new(&mut file);
        for word in data.iter() {
            buf.write_all(&word.to_be_bytes()).with_context(|| {
                format!("Could not write permutation to {}", perm.as_ref().display())
            })?;
        }
        // Flush explicitly so write errors are reported instead of being
        // silently dropped when the BufWriter goes out of scope.
        buf.flush().with_context(|| {
            format!("Could not write permutation to {}", perm.as_ref().display())
        })?;
        Ok(())
    }
}

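/// Entry point of the `llp` subcommand: dispatches on the endianness of the
/// graph and runs the generic [`llp`] function.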
pub fn main(global_args: GlobalArgs, args: CliArgs) -> Result<()> {
    if args.perm.is_none() && args.work_dir.is_none() {
        log::warn!(concat!(
            "If `perm` is not set, LLP will only compute the labels and not produce the final permutation. ",
            "Since `work_dir` is not set either, the labels would be stored in a temp dir that is deleted at the end of the computation. ",
            "Either set `perm` to compute the permutation, or set `work_dir` to keep the labels and combine them later."
        ));
        return Ok(());
    }

    if let Some(perm) = &args.perm {
        create_parent_dir(perm)?;
    }

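    // Dispatch on the endianness of the graph; the `be_bins`/`le_bins`
    // features control which implementations are compiled in.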
    match get_endianness(&args.src)?.as_str() {
        #[cfg(feature = "be_bins")]
        BE::NAME => llp::<BE>(global_args, args),
        #[cfg(feature = "le_bins")]
        LE::NAME => llp::<LE>(global_args, args),
        e => bail!("Unknown endianness: {}", e),
    }
}

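/// Computes LLP labels (and, if `perm` is set, the final permutation) for the
/// graph with basename `args.src`, reading its bit streams with endianness `E`.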
pub fn llp<E: Endianness + 'static + Send + Sync>(
    _global_args: GlobalArgs,
    args: CliArgs,
) -> Result<()>
where
    MemoryFactory<E, MmapHelper<u32>>: CodesReaderFactoryHelper<E>,
    for<'a> LoadModeCodesReader<'a, E, LoadMmap>: BitSeek,
{
    let start = std::time::Instant::now();
    // Due to ownership problems, we always create the temp dir, but only use
    // it if the user didn't provide a work_dir.
    let temp_dir = tempdir()?;
    let work_dir = args.work_dir.as_deref().unwrap_or(temp_dir.path());
    log::info!("Using workdir: {}", work_dir.display());

    // Load the graph in THP memory
    log::info!(
        "Loading graph {} in THP memory...",
        args.src.to_string_lossy()
    );
    let graph = BvGraph::with_basename(&args.src)
        .mode::<LoadMmap>()
        .flags(MemoryFlags::TRANSPARENT_HUGE_PAGES | MemoryFlags::RANDOM_ACCESS)
        .endianness::<E>()
        .load()?;

    // Load degree cumulative function in THP memory
    log::info!("Loading DCF in THP memory...");
    let deg_cumul = unsafe {
        DCF::load_mmap(
            args.src.with_extension(DEG_CUMUL_EXTENSION),
            Flags::TRANSPARENT_HUGE_PAGES | Flags::RANDOM_ACCESS,
        )
        .with_context(|| {
            format!(
                "Could not load degree cumulative function for basename {}",
                args.src.display()
            )
        })
    }?;

    // Parse the gamma format.
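    // A gamma of the form `n-k` denotes n * 2^-k; an empty numerator (e.g.,
    // `-2`) is read as 1, so `-2` is 1/4 and `0-0` is 0.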
    let mut gammas = vec![];
    for gamma in args.gammas {
        let t: Vec<_> = gamma.split('-').collect();
        if t.len() != 2 {
            bail!("Invalid gamma: {}", gamma);
        }
        gammas.push(
            if t[0].is_empty() {
                1.0
            } else {
                t[0].parse::<usize>()? as f64
            } * (0.5_f64).powf(t[1].parse::<usize>()? as f64),
        );
    }

    gammas.sort_by(|a, b| a.total_cmp(b));

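    // Build the stopping criterion as a disjunction of predicates: the update
    // loop for a given gamma stops as soon as any of them is satisfied.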
    let mut predicate = MinGain::try_from(args.gain_threshold)?.boxed();
    predicate = predicate
        .or(MinAvgImprov::try_from(args.improv_threshold)?)
        .boxed();
    predicate = predicate.or(MaxUpdates::from(args.max_updates)).boxed();

    if args.modified {
        predicate = predicate.or(MinModified::default()).boxed();
    }

    if let Some(perc_modified) = args.perc_modified {
        predicate = predicate.or(PercModified::try_from(perc_modified)?).boxed();
    }

    let granularity = args.granularity.into_granularity();

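    // Only the labels are computed here: one labeling per gamma, stored in
    // `work_dir`, so that the computation can be resumed or combined later.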
    // Compute the LLP.
    webgraph_algo::llp::layered_label_propagation_labels_only(
        graph,
        deg_cumul.uncase(),
        gammas,
        Some(args.num_threads.num_threads),
        args.chunk_size,
        granularity,
        args.seed,
        predicate,
        work_dir,
    )
    .context("Could not compute the LLP")?;

    log::info!("Elapsed: {}s", start.elapsed().as_secs_f64());
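    // If a permutation path was provided, combine the per-gamma labels into a
    // single labeling and turn it into a rank-based permutation.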
    if let Some(perm_path) = args.perm {
        let thread_pool = get_thread_pool(args.num_threads.num_threads);
        thread_pool.install(|| -> Result<()> {
            let labels = combine_labels(work_dir)?;
            log::info!("Combined labels...");
            let rank_perm = labels_to_ranks(&labels);
            log::info!("Saving permutation...");
            store_perm(&rank_perm, perm_path, args.epserde)?;
            Ok(())
        })?;
    }
    Ok(())
}