swh_graph/utils/sort/
swhids.rs

/*
 * Copyright (C) 2024  The Software Heritage developers
 * See the AUTHORS file at the top-level directory of this distribution
 * License: GNU General Public License version 3, or any later version
 * See top-level LICENSE file for more information
 */

//! Parallel sorting and deduplication for lists of SWHIDs that don't fit in RAM

10use std::fs::File;
11use std::io::{BufWriter, Write};
12use std::path::PathBuf;
13
14use anyhow::{ensure, Context, Result};
15use dsi_progress_logger::ProgressLog;
16use mmap_rs::MmapFlags;
17use rayon::prelude::*;
18use rdst::{RadixKey, RadixSort};
19
20use super::ParallelDeduplicatingExternalSorter;
21use crate::SWHID;
22
/// External sorter specialised for [`SWHID`]s.
///
/// Chunks are sorted in memory with a radix sort and spilled to disk in
/// SWHIDs' fixed-size binary representation.
#[derive(Clone, Copy)]
struct SwhidExternalSorter {
    /// Size hint from which `buffer_capacity` derives the in-memory buffer
    /// capacity. NOTE(review): presumably a byte budget — TODO confirm.
    buffer_size: usize,
}

28impl ParallelDeduplicatingExternalSorter<SWHID> for SwhidExternalSorter {
29    fn buffer_capacity(&self) -> usize {
30        self.buffer_size.div_ceil(SWHID::LEVELS).next_power_of_two()
31    }
32
33    fn sort_vec(&self, vec: &mut Vec<SWHID>) -> Result<()> {
34        vec.radix_sort_unstable();
35        Ok(())
36    }
37
38    fn serialize(path: PathBuf, swhids: impl Iterator<Item = SWHID>) -> Result<()> {
39        let file = File::create_new(path)
40            .context("Could not create sorted file in temporary directory")?;
41        let mut writer = BufWriter::new(file);
42        for swhid in swhids {
43            writer
44                .write_all(&<[u8; SWHID::BYTES_SIZE]>::from(swhid))
45                .context("Could not write SWHID")?;
46        }
47        writer.flush().context("Could not flush sorted file")?;
48        Ok(())
49    }
50
51    fn deserialize(path: PathBuf) -> Result<impl Iterator<Item = SWHID>> {
52        let file_len = path
53            .metadata()
54            .with_context(|| format!("Could not stat {}", path.display()))?
55            .len();
56        ensure!(
57            file_len % (SWHID::BYTES_SIZE as u64) == 0,
58            "File size is not a multiple of a SWHID's binary size"
59        );
60        log::debug!("Reading {} bytes from {}", file_len, path.display());
61        let num_swhids = (file_len / (SWHID::BYTES_SIZE as u64)) as usize;
62        let file = std::fs::File::open(&path)
63            .with_context(|| format!("Could not open {}", path.display()))?;
64        let data = unsafe {
65            mmap_rs::MmapOptions::new(file_len as _)
66                .context("Could not initialize mmap")?
67                .with_flags(MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL)
68                .with_file(&file, 0)
69                .map()
70                .with_context(|| format!("Could not mmap {}", path.display()))?
71        };
72        Ok((0..num_swhids).map(move |i| {
73            let buf = &data[i * SWHID::BYTES_SIZE..(i + 1) * SWHID::BYTES_SIZE];
74            let buf = <[u8; SWHID::BYTES_SIZE]>::try_from(buf).unwrap();
75            SWHID::try_from(buf).unwrap()
76        }))
77    }
78}
79
80pub fn par_sort_swhids<Iter: ParallelIterator<Item = SWHID>>(
81    iter: Iter,
82    pl: impl ProgressLog + Send,
83    buffer_size: usize,
84) -> Result<impl Iterator<Item = SWHID>> {
85    SwhidExternalSorter { buffer_size }.par_sort_dedup(iter, pl)
86}