swh_graph/utils/sort/
strings.rs1use std::io::{Read, Write};
12use std::path::PathBuf;
13
14use anyhow::{ensure, Context, Result};
15use dsi_progress_logger::ProgressLog;
16use rayon::prelude::*;
17
18use super::ParallelDeduplicatingExternalSorter;
19
/// Owned, heap-allocated byte string; the item type sorted by this module.
type Bytestring = Box<[u8]>;

/// Assumed mean length of a string. `buffer_capacity` divides the sorter's buffer size by
/// this value to estimate how many strings fit in one in-memory buffer.
const AVERAGE_STRING_LENGTH: usize = 64;
24
/// External sorter for [`Bytestring`]s.
///
/// In-memory buffers are sorted by bucketing strings on their two-byte prefix, and sorted
/// runs are stored as zstd-compressed, length-prefixed files.
#[derive(Clone, Copy)]
struct BytestringExternalSorter {
    /// Size budget for one in-memory buffer. It is divided by [`AVERAGE_STRING_LENGTH`]
    /// (see `buffer_capacity`) to obtain a string count. NOTE(review): presumably in bytes.
    buffer_size: usize,
}
29
30impl ParallelDeduplicatingExternalSorter<Bytestring> for BytestringExternalSorter {
31 fn buffer_capacity(&self) -> usize {
32 self.buffer_size
33 .div_ceil(AVERAGE_STRING_LENGTH)
34 .next_power_of_two()
35 }
36
37 #[allow(clippy::get_first)]
38 fn sort_vec(&self, vec: &mut Vec<Bytestring>) -> Result<()> {
39 let mut partitions: Vec<_> = (0..65536)
46 .map(|_| Vec::with_capacity(vec.len().div_ceil(65536)))
47 .collect();
48
49 for string in vec.drain(0..) {
51 let partition_id = ((string.get(0).copied().unwrap_or(0u8) as usize) << 8)
52 | string.get(1).copied().unwrap_or(0u8) as usize;
53 partitions[partition_id].push(string);
54 }
55
56 partitions
60 .par_iter_mut()
61 .for_each(|partition| partition.sort_unstable());
62
63 for partition in partitions {
64 vec.extend(partition);
65 }
66 Ok(())
67 }
68
69 fn serialize(path: PathBuf, strings: impl Iterator<Item = Bytestring>) -> Result<()> {
70 let file = std::fs::File::create_new(&path)
71 .with_context(|| format!("Could not create {}", path.display()))?;
72 let compression_level = 3;
73 let mut encoder = zstd::stream::write::Encoder::new(file, compression_level)
74 .with_context(|| format!("Could not create ZSTD encoder for {}", path.display()))?;
75 for string in strings {
76 let len: u32 = string
77 .len()
78 .try_into()
79 .context("String is 2^32 bytes or longer")?;
80 ensure!(len != u32::MAX, "String is 2^32 -1 bytes long");
81 encoder
82 .write_all(&len.to_ne_bytes())
83 .with_context(|| format!("Could not write string to {}", path.display()))?;
84 encoder
85 .write_all(&string)
86 .with_context(|| format!("Could not write string to {}", path.display()))?;
87 }
88 encoder
90 .write_all(&u32::MAX.to_ne_bytes())
91 .with_context(|| format!("Could not write string to {}", path.display()))?;
92
93 encoder
94 .finish()
95 .with_context(|| format!("Could not flush to {}", path.display()))?;
96 Ok(())
97 }
98
99 fn deserialize(path: PathBuf) -> Result<impl Iterator<Item = Bytestring>> {
100 let file = std::fs::File::open(&path)
101 .with_context(|| format!("Could not open {}", path.display()))?;
102 let mut decoder =
103 zstd::stream::read::Decoder::new(file).context("Could not decompress sorted file")?;
104 Ok(std::iter::repeat(()).map_while(move |()| {
105 let mut buf = [0u8; 4];
106 decoder
107 .read_exact(&mut buf)
108 .expect("Could not read string size");
109 let size = u32::from_ne_bytes(buf);
110 if size == u32::MAX {
111 return None;
113 }
114 let mut line = vec![0; size.try_into().unwrap()].into_boxed_slice();
115 decoder
116 .read_exact(&mut line)
117 .expect("Could not read string");
118 Some(line)
119 }))
120 }
121}
122
123pub fn par_sort_strings<Iter: ParallelIterator<Item = Bytestring>>(
124 iter: Iter,
125 pl: impl ProgressLog + Send,
126 buffer_size: usize,
127) -> Result<impl Iterator<Item = Bytestring>> {
128 BytestringExternalSorter { buffer_size }.par_sort_dedup(iter, pl)
129}