use std::fs::File;
use std::io::{BufWriter, Write};
use std::path::PathBuf;

use anyhow::{ensure, Context, Result};
use dsi_progress_logger::ProgressLog;
use mmap_rs::MmapFlags;
use rayon::prelude::*;
use rdst::{RadixKey, RadixSort};

use super::ParallelDeduplicatingExternalSorter;
use crate::SWHID;
22
/// [`ParallelDeduplicatingExternalSorter`] implementation for [`SWHID`]s.
///
/// The only configuration is `buffer_size`, from which the trait implementation's
/// `buffer_capacity` derives the capacity of each in-memory buffer.
#[derive(Clone, Copy, Debug)]
struct SwhidExternalSorter {
    /// Size hint used by `buffer_capacity` to size each in-memory buffer.
    buffer_size: usize,
}
27
28impl ParallelDeduplicatingExternalSorter<SWHID> for SwhidExternalSorter {
29 fn buffer_capacity(&self) -> usize {
30 self.buffer_size.div_ceil(SWHID::LEVELS).next_power_of_two()
31 }
32
33 fn sort_vec(&self, vec: &mut Vec<SWHID>) -> Result<()> {
34 vec.radix_sort_unstable();
35 Ok(())
36 }
37
38 fn serialize(path: PathBuf, swhids: impl Iterator<Item = SWHID>) -> Result<()> {
39 let file = File::create_new(path)
40 .context("Could not create sorted file in temporary directory")?;
41 let mut writer = BufWriter::new(file);
42 for swhid in swhids {
43 writer
44 .write_all(&<[u8; SWHID::BYTES_SIZE]>::from(swhid))
45 .context("Could not write SWHID")?;
46 }
47 writer.flush().context("Could not flush sorted file")?;
48 Ok(())
49 }
50
51 fn deserialize(path: PathBuf) -> Result<impl Iterator<Item = SWHID>> {
52 let file_len = path
53 .metadata()
54 .with_context(|| format!("Could not stat {}", path.display()))?
55 .len();
56 ensure!(
57 file_len % (SWHID::BYTES_SIZE as u64) == 0,
58 "File size is not a multiple of a SWHID's binary size"
59 );
60 log::debug!("Reading {} bytes from {}", file_len, path.display());
61 let num_swhids = (file_len / (SWHID::BYTES_SIZE as u64)) as usize;
62 let file = std::fs::File::open(&path)
63 .with_context(|| format!("Could not open {}", path.display()))?;
64 let data = unsafe {
65 mmap_rs::MmapOptions::new(file_len as _)
66 .context("Could not initialize mmap")?
67 .with_flags(MmapFlags::TRANSPARENT_HUGE_PAGES | MmapFlags::SEQUENTIAL)
68 .with_file(&file, 0)
69 .map()
70 .with_context(|| format!("Could not mmap {}", path.display()))?
71 };
72 Ok((0..num_swhids).map(move |i| {
73 let buf = &data[i * SWHID::BYTES_SIZE..(i + 1) * SWHID::BYTES_SIZE];
74 let buf = <[u8; SWHID::BYTES_SIZE]>::try_from(buf).unwrap();
75 SWHID::try_from(buf).unwrap()
76 }))
77 }
78}
79
80pub fn par_sort_swhids<Iter: ParallelIterator<Item = SWHID>>(
81 iter: Iter,
82 pl: impl ProgressLog + Send,
83 buffer_size: usize,
84) -> Result<impl Iterator<Item = SWHID>> {
85 SwhidExternalSorter { buffer_size }.par_sort_dedup(iter, pl)
86}