Skip to main content

swh_graph/compress/
transform.rs

1// Copyright (C) 2023-2026  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use std::num::NonZeroUsize;
7use std::path::PathBuf;
8
9use anyhow::{Context, Result};
10use dsi_bitstream::prelude::BE;
11use dsi_progress_logger::{concurrent_progress_logger, ProgressLog};
12use itertools::Itertools;
13use lender::{IntoIteratorExt, IntoLender, Lender};
14use rayon::prelude::*;
15use webgraph::graphs::arc_list_graph::ArcListGraph;
16use webgraph::prelude::*;
17use webgraph::utils::ParSortPairs;
18
19/// Writes a new graph on disk, obtained by applying the function to all arcs
20/// on the source graph.
21pub fn transform<F, G, Iter>(
22    partitions_per_thread: usize,
23    graph: G,
24    transformation: F,
25    target_path: PathBuf,
26) -> Result<()>
27where
28    F: Fn(usize, usize) -> Iter + Send + Sync,
29    Iter: IntoIterator<Item = (usize, usize), IntoIter: Send + Sync>,
30    G: SplitLabeling<Label=usize>,
31    for<'a> <<G as SplitLabeling>::IntoIterator<'a> as IntoIterator>::IntoIter: Send + Sync,
32    for<'a, 'b> <<<G as SplitLabeling>::SplitLender<'a> as NodeLabelsLender<'b>>::IntoIterator as IntoIterator>::IntoIter: Send + Sync,
33{
34    // Adapted from https://github.com/vigna/webgraph-rs/blob/08969fb1ac4ea59aafdbae976af8e026a99c9ac5/src/bin/perm.rs
35    let num_nodes = graph.num_nodes();
36
37    let temp_dir = tempfile::tempdir().context("Could not create temporary directory")?;
38
39    let num_threads = num_cpus::get();
40    let num_partitions = num_threads * partitions_per_thread;
41    let nodes_per_partition = num_nodes.div_ceil(num_partitions);
42
43    // Avoid empty partitions at the end when there are very few nodes
44    let num_partitions = num_nodes.div_ceil(nodes_per_partition);
45
46    log::info!(
47        "Transforming {} nodes with {} threads, {} partitions, {} nodes per partition",
48        num_nodes,
49        num_threads,
50        num_partitions,
51        nodes_per_partition,
52    );
53
54    let mut pl = concurrent_progress_logger!(
55        display_memory = true,
56        item_name = "node",
57        expected_updates = Some(num_nodes),
58        local_speed = true,
59    );
60    pl.start("Reading and sorting...");
61
62    // Merge sorted arc lists into a single sorted arc list
63    let pair_sorter =
64        ParSortPairs::new(num_nodes)?.num_partitions(NonZeroUsize::new(num_partitions).unwrap());
65    let transformation = &transformation;
66    let sorted_arcs = {
67        let pl = pl.clone();
68        pair_sorter
69            .sort(
70                graph
71                    .split_iter(num_partitions)
72                    .into_iter()
73                    .collect::<Vec<_>>()
74                    .into_par_iter()
75                    .flat_map_iter(move |partition| {
76                        let mut pl = pl.clone();
77                        partition
78                            .flat_map(lender::covar_mut!(
79                                #![with<'g, G: SplitLabeling<Label=usize>>]
80                                for<'lend>
81                                move |(src, succ): (usize, <<G as SplitLabeling>::SplitLender<'g> as NodeLabelsLender<'lend>>::IntoIterator)|
82                                -> lender::FromIter<std::vec::IntoIter<(usize, usize)>> {
83                                    let transformed_succ: Vec<_> = succ
84                                        .into_iter()
85                                        .flat_map(move |dst: usize| transformation(src, dst).into_iter())
86                                        .collect();
87                                    pl.light_update();
88                                    transformed_succ.into_into_lender().into_lender()
89                                }
90                            ))
91                            .iter()
92                    }),
93            )
94            .context("Could not sort arcs")?
95    };
96    pl.done();
97
98    let arc_list_graphs = Vec::from(sorted_arcs.iters).into_iter().enumerate().map(
99        |(partition_id, sorted_arcs_partition)| {
100            ArcListGraph::new(num_nodes, sorted_arcs_partition.into_iter().dedup())
101                .iter_from(sorted_arcs.boundaries[partition_id])
102                .take(
103                    sorted_arcs.boundaries[partition_id + 1]
104                        .checked_sub(sorted_arcs.boundaries[partition_id])
105                        .expect("sorted_arcs.boundaries is not sorted"),
106                )
107        },
108    );
109
110    BvComp::with_basename(target_path)
111        .par_comp_lenders::<BE, _>(arc_list_graphs, num_nodes)
112        .context("Could not build BVGraph from arcs")?;
113
114    drop(temp_dir); // Prevent early deletion
115
116    Ok(())
117}