// swh_graph/compress/bv.rs

1// Copyright (C) 2023-2025  The Software Heritage developers
2// See the AUTHORS file at the top-level directory of this distribution
3// License: GNU General Public License version 3, or any later version
4// See top-level LICENSE file for more information
5
6use std::fs::File;
7use std::io::BufWriter;
8use std::path::{Path, PathBuf};
9use std::time::{SystemTime, UNIX_EPOCH};
10
11use anyhow::{anyhow, Context, Result};
12use dsi_bitstream::codes::GammaWrite;
13use dsi_bitstream::prelude::{BitRead, BitWrite, BufBitWriter, WordAdapter, BE, NE};
14use dsi_progress_logger::{concurrent_progress_logger, progress_logger, ProgressLog};
15use itertools::Itertools;
16use lender::{for_, Lender};
17use nonmax::NonMaxU64;
18use pthash::Phf;
19use rayon::prelude::*;
20use tempfile;
21use webgraph::graphs::arc_list_graph::ArcListGraph;
22use webgraph::prelude::sort_pairs::{BitReader, BitWriter};
23use webgraph::prelude::*;
24
25use super::iter_arcs::iter_arcs;
26use super::iter_labeled_arcs::iter_labeled_arcs;
27use super::label_names::{LabelNameHasher, LabelNameMphf};
28use super::stats::estimate_edge_count;
29use crate::map::{MappedPermutation, Permutation};
30use crate::mph::LoadableSwhidMphf;
31use crate::utils::sort::par_sort_arcs;
32
33pub fn bv<MPHF: LoadableSwhidMphf + Sync>(
34    sort_batch_size: usize,
35    partitions_per_thread: usize,
36    mph_basepath: PathBuf,
37    num_nodes: usize,
38    dataset_dir: PathBuf,
39    allowed_node_types: &[crate::NodeType],
40    target_dir: PathBuf,
41) -> Result<()> {
42    log::info!("Reading MPH");
43    let mph = MPHF::load(mph_basepath).context("Could not load MPHF")?;
44    log::info!("MPH loaded, sorting arcs");
45
46    let num_threads = num_cpus::get();
47    let num_partitions = num_threads * partitions_per_thread;
48    let nodes_per_partition = num_nodes.div_ceil(num_partitions);
49
50    // Avoid empty partitions at the end when there are very few nodes
51    let num_partitions = num_nodes.div_ceil(nodes_per_partition);
52
53    let mut pl = concurrent_progress_logger!(
54        display_memory = true,
55        item_name = "arc",
56        local_speed = true,
57        expected_updates = Some(
58            estimate_edge_count(&dataset_dir, allowed_node_types)
59                .context("Could not estimate edge count")? as usize,
60        ),
61    );
62    pl.start("Reading arcs");
63
64    // Sort in parallel in a bunch of SortPairs instances
65    let temp_dir = tempfile::tempdir().context("Could not get temporary_directory")?;
66    let sorted_arcs_path = temp_dir.path().join("sorted_arcs");
67    std::fs::create_dir(&sorted_arcs_path)
68        .with_context(|| format!("Could not create {}", sorted_arcs_path.display()))?;
69    let sorted_arcs = par_sort_arcs(
70        &sorted_arcs_path,
71        sort_batch_size,
72        iter_arcs(&dataset_dir, allowed_node_types)
73            .context("Could not open input files to read arcs")?
74            .map_with(pl.clone(), |thread_pl, (src, dst)| {
75                thread_pl.light_update();
76                (src, dst)
77            }),
78        num_partitions,
79        (),
80        (),
81        |buffer, (src, dst)| {
82            let src = mph
83                .hash_str_array(&src)
84                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&src)))?;
85            let dst = mph
86                .hash_str_array(&dst)
87                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&dst)))?;
88            assert!(src < num_nodes, "src node id is greater than {num_nodes}");
89            assert!(dst < num_nodes, "dst node id is greater than {num_nodes}");
90            let partition_id = src / nodes_per_partition;
91            buffer.insert(partition_id, src, dst)?;
92            Ok(())
93        },
94    )?;
95    pl.done();
96
97    let arc_list_graphs =
98        sorted_arcs
99            .into_iter()
100            .enumerate()
101            .map(|(partition_id, sorted_arcs_partition)| {
102                webgraph::prelude::Left(ArcListGraph::new_labeled(
103                    num_nodes,
104                    sorted_arcs_partition.dedup(),
105                ))
106                .iter_from(partition_id * nodes_per_partition)
107                .take(nodes_per_partition)
108            });
109    let comp_flags = Default::default();
110
111    let temp_bv_dir = temp_dir.path().join("bv");
112    std::fs::create_dir(&temp_bv_dir)
113        .with_context(|| format!("Could not create {}", temp_bv_dir.display()))?;
114    BvComp::parallel_iter::<BE, _>(
115        target_dir,
116        arc_list_graphs,
117        num_nodes,
118        comp_flags,
119        &rayon::ThreadPoolBuilder::default()
120            .build()
121            .expect("Could not create BvComp thread pool"),
122        &temp_bv_dir,
123    )
124    .context("Could not build BVGraph from arcs")?;
125
126    drop(temp_dir); // Prevent early deletion
127
128    Ok(())
129}
130
131/// Writes `-labelled.labels`,  `-labelled.labeloffsets`, and returns the label width
132#[allow(clippy::too_many_arguments)]
133pub fn edge_labels<MPHF: LoadableSwhidMphf + Sync>(
134    sort_batch_size: usize,
135    partitions_per_thread: usize,
136    mph_basepath: PathBuf,
137    order: MappedPermutation,
138    label_name_hasher: LabelNameHasher,
139    num_nodes: usize,
140    dataset_dir: PathBuf,
141    allowed_node_types: &[crate::NodeType],
142    transposed: bool,
143    target_dir: &Path,
144) -> Result<usize> {
145    log::info!("Reading MPH");
146    let mph = MPHF::load(mph_basepath).context("Could not load MPHF")?;
147    log::info!("MPH loaded, sorting arcs");
148
149    let num_threads = num_cpus::get();
150    let num_partitions = num_threads * partitions_per_thread;
151    let nodes_per_partition = num_nodes.div_ceil(num_partitions);
152    let label_width = label_width(label_name_hasher.mphf());
153
154    // Avoid empty partitions at the end when there are very few nodes
155    let num_partitions = num_nodes.div_ceil(nodes_per_partition);
156
157    let mut pl = concurrent_progress_logger!(
158        display_memory = true,
159        item_name = "arc",
160        local_speed = true,
161        expected_updates = Some(
162            estimate_edge_count(&dataset_dir, allowed_node_types)
163                .context("Could not estimate edge count")? as usize,
164        ),
165    );
166    pl.start("Reading and sorting arcs");
167
168    // Sort in parallel in a bunch of SortPairs instances
169    let temp_dir = tempfile::tempdir().context("Could not get temporary_directory")?;
170    let sorted_arcs_path = temp_dir.path().join("sorted_arcs");
171    std::fs::create_dir(&sorted_arcs_path)
172        .with_context(|| format!("Could not create {}", sorted_arcs_path.display()))?;
173    let sorted_arcs = par_sort_arcs(
174        &sorted_arcs_path,
175        sort_batch_size,
176        iter_labeled_arcs(&dataset_dir, allowed_node_types, label_name_hasher)
177            .context("Could not open input files to read arcs")?
178            .map_with(pl.clone(), |thread_pl, (src, dst, label)| {
179                thread_pl.light_update();
180                (src, dst, label)
181            }),
182        num_partitions,
183        LabelSerializer { label_width },
184        LabelDeserializer { label_width },
185        |buffer, (src, dst, label)| {
186            let mut src = mph
187                .hash_str_array(&src)
188                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&src)))?;
189            let mut dst = mph
190                .hash_str_array(&dst)
191                .ok_or_else(|| anyhow!("Unknown SWHID {:?}", String::from_utf8_lossy(&dst)))?;
192            if transposed {
193                (src, dst) = (dst, src);
194            }
195            assert!(src < num_nodes, "src node id is greater than {num_nodes}");
196            assert!(dst < num_nodes, "dst node id is greater than {num_nodes}");
197            let src = order.get(src).expect("Could not permute src");
198            let dst = order.get(dst).expect("Could not permute dst");
199            let partition_id = src / nodes_per_partition;
200            buffer.insert_labeled(partition_id, src, dst, label)?;
201            Ok(())
202        },
203    )?;
204    let total_labeled_arcs = pl.count();
205    pl.done();
206
207    let arc_list_graphs =
208        sorted_arcs
209            .into_iter()
210            .enumerate()
211            .map(|(partition_id, sorted_arcs_partition)| {
212                // no sorted_arcs_partition.dedup() on labels
213                ArcListGraph::new_labeled(num_nodes, sorted_arcs_partition)
214                    .iter_from(partition_id * nodes_per_partition)
215                    .take(nodes_per_partition)
216            });
217
218    let mut labels_path = target_dir.to_owned();
219    labels_path.as_mut_os_string().push("-labelled.labels");
220    let mut labels_writer =
221        BufBitWriter::<BE, _, _>::new(WordAdapter::<u8, _>::new(BufWriter::new(
222            File::create(&labels_path)
223                .with_context(|| format!("Could not create {}", labels_path.display()))?,
224        )));
225
226    let mut offsets_path = target_dir.to_owned();
227    offsets_path
228        .as_mut_os_string()
229        .push("-labelled.labeloffsets");
230    let mut offsets_writer =
231        BufBitWriter::<BE, _, _>::new(WordAdapter::<u8, _>::new(BufWriter::new(
232            File::create(&offsets_path)
233                .with_context(|| format!("Could not create {}", offsets_path.display()))?,
234        )));
235
236    let mut pl = progress_logger!(
237        display_memory = true,
238        item_name = "arc",
239        local_speed = true,
240        expected_updates = Some(total_labeled_arcs),
241    );
242    pl.start("Writing arc labels");
243
244    // Write offset (in *bits*) of the adjacency list of the first node
245    offsets_writer
246        .write_gamma(0)
247        .context("Could not write initial offset")?;
248
249    for partition_graph in arc_list_graphs {
250        for_!( (_src, successors) in partition_graph {
251            let mut offset_bits = 0u64;
252            for (_dst, labels) in &successors.group_by(|(dst, _label)| *dst) {
253                let mut labels: Vec<u64> = labels
254                    .flat_map(|(_dst, label)| label)
255                    .map(|label: NonMaxU64| u64::from(label))
256                    .collect();
257                labels.par_sort_unstable();
258                pl.update_with_count(labels.len());
259
260                // Write length-prefixed list of labels
261                offset_bits = offset_bits
262                    .checked_add(
263                        labels_writer
264                            .write_gamma(labels.len() as u64)
265                            .context("Could not write number of labels")?
266                            as u64,
267                    )
268                    .context("offset overflowed u64")?;
269                for label in labels {
270                    offset_bits = offset_bits
271                        .checked_add(
272                            labels_writer
273                                .write_bits(label, label_width)
274                                .context("Could not write label")?
275                                as u64,
276                        )
277                        .context("offset overflowed u64")?;
278                }
279            }
280
281            // Write offset of the end of this edge's label list (and start of the next one)
282            offsets_writer
283                .write_gamma(offset_bits)
284                .context("Could not write offset")?;
285        });
286    }
287
288    drop(
289        labels_writer
290            .into_inner()
291            .context("Could not flush labels writer")?
292            .into_inner()
293            .into_inner()
294            .context("Could not flush labels bufwriter")?,
295    );
296    drop(
297        offsets_writer
298            .into_inner()
299            .context("Could not close label offsets writer")?
300            .into_inner()
301            .into_inner()
302            .context("Could not flush label offsets bufwriter"),
303    );
304
305    pl.done();
306
307    drop(temp_dir); // Prevent early deletion
308
309    Ok(label_width)
310}
311
312fn label_width(mphf: &LabelNameMphf) -> usize {
313    use crate::labels::{
314        Branch, DirEntry, EdgeLabel, LabelNameId, Permission, UntypedEdgeLabel, Visit, VisitStatus,
315    };
316    let num_label_names = mphf.num_keys();
317
318    // Visit timestamps cannot be larger than the current timestamp
319    let max_visit_timestamp = SystemTime::now()
320        .duration_since(UNIX_EPOCH)
321        .expect("Could not get current time")
322        .as_secs();
323
324    let max_label = [
325        EdgeLabel::Branch(Branch::new(LabelNameId(num_label_names)).unwrap()),
326        EdgeLabel::DirEntry(DirEntry::new(Permission::None, LabelNameId(num_label_names)).unwrap()),
327        EdgeLabel::Visit(Visit::new(VisitStatus::Full, max_visit_timestamp).unwrap()),
328    ]
329    .into_iter()
330    .map(|label| UntypedEdgeLabel::from(label).0) // Convert to on-disk representation
331    .max()
332    .unwrap();
333    width_for_max_label_value(max_label)
334}
335
/// Given the maximum label, returns the number of bits needed to represent labels
fn width_for_max_label_value(max_label: u64) -> usize {
    // We must distinguish `max_label + 2` values: every label in
    // 0..=max_label, plus one sentinel representing the lack of value
    // (ie. None).
    let num_values = max_label + 2;
    // Round up to a power of two first, because checked_ilog2() rounds down
    let width = num_values.next_power_of_two().checked_ilog2().unwrap();
    width as usize
}
345
#[test]
fn test_width_for_max_label_value() {
    // Each width must cover all labels from 0 to max_label, plus None
    let expected_widths = [
        (0u64..=0, 1),  // values are 0 and None
        (1..=1, 2),     // values are 0, 1, and None
        (2..=2, 2),     // values are 0, 1, 2, and None
        (3..=6, 3),
        (7..=14, 4),
        (15..=15, 5),
    ];
    for (range, width) in expected_widths {
        for max_label in range {
            assert_eq!(width_for_max_label_value(max_label), width);
        }
    }
}
359
/// Reads fixed-width label values back from a bitstream; the all-ones bit
/// pattern of `label_width` bits decodes to `None`.
#[derive(Clone, Copy)]
struct LabelDeserializer {
    // Number of bits used to encode each label
    label_width: usize,
}
/// Writes fixed-width label values to a bitstream; `None` is encoded as the
/// all-ones bit pattern of `label_width` bits.
#[derive(Clone, Copy)]
struct LabelSerializer {
    // Number of bits used to encode each label
    label_width: usize,
}
368
369impl BitDeserializer<NE, BitReader> for LabelDeserializer {
370    type DeserType = Option<NonMaxU64>;
371    fn deserialize(
372        &self,
373        bitstream: &mut BitReader,
374    ) -> Result<Self::DeserType, <BitReader as BitRead<NE>>::Error> {
375        assert_ne!(self.label_width, 64, "label_width = 64 is not implemented");
376        let max = (1u64 << self.label_width) - 1; // Largest value that fits in the given width
377        let value = bitstream.read_bits(self.label_width)?;
378        assert!(value <= max, "Read unexpectedly large value");
379        if value == max {
380            Ok(None)
381        } else {
382            Ok(Some(NonMaxU64::try_from(value).unwrap()))
383        }
384    }
385}
386
387impl BitSerializer<NE, BitWriter> for LabelSerializer {
388    type SerType = Option<NonMaxU64>;
389    fn serialize(
390        &self,
391        value: &Self::SerType,
392        bitstream: &mut BitWriter,
393    ) -> Result<usize, <BitWriter as BitWrite<NE>>::Error> {
394        assert_ne!(self.label_width, 64, "label_width = 64 is not implemented");
395        let max = (1u64 << self.label_width) - 1;
396        match *value {
397            Some(value) => {
398                assert!(u64::from(value) < max, "value does not fit in label width");
399                bitstream.write_bits(u64::from(value), self.label_width)
400            }
401            None => bitstream.write_bits(max, self.label_width),
402        }
403    }
404}