swh_graph/utils/sort/
mod.rs1use std::path::PathBuf;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14
15use anyhow::{Context, Result};
16use dary_heap::{PeekMut, QuaternaryHeap};
17use dsi_progress_logger::{concurrent_progress_logger, ConcurrentProgressLog, ProgressLog};
18use itertools::Itertools;
19use rayon::prelude::*;
20use tempfile::TempDir;
21
22mod arcs;
23pub use arcs::{par_sort_arcs, PartitionedBuffer};
24mod strings;
25pub use strings::par_sort_strings;
26mod swhids;
27pub use swhids::par_sort_swhids;
28
/// One input of a k-way merge: the next pending item (`head`) of an iterator,
/// together with the rest of that iterator (`tail`).
///
/// Equality and ordering look only at `head`, and the ordering is reversed, so a
/// max-heap of `HeadTail`s hands out the entry with the *smallest* head first.
#[derive(Debug, Clone)]
struct HeadTail<I: Iterator> {
    head: I::Item,
    tail: I,
}

impl<I> PartialEq for HeadTail<I>
where
    I: Iterator,
    I::Item: PartialEq,
{
    /// Entries are equal when their heads are; tails are not compared.
    #[inline(always)]
    fn eq(&self, other: &Self) -> bool {
        self.head == other.head
    }
}

impl<I> Eq for HeadTail<I>
where
    I: Iterator,
    I::Item: Eq,
{
}

impl<I> PartialOrd for HeadTail<I>
where
    I: Iterator,
    I::Item: PartialOrd,
{
    /// Reversed comparison of the heads; `None` when the heads are incomparable.
    #[inline(always)]
    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
        let forward = self.head.partial_cmp(&other.head)?;
        Some(forward.reverse())
    }
}

impl<I> Ord for HeadTail<I>
where
    I: Iterator,
    I::Item: Ord,
{
    /// Reversed total order on the heads (larger head => "smaller" entry).
    #[inline(always)]
    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
        let forward = self.head.cmp(&other.head);
        forward.reverse()
    }
}
69
70struct KMergeIters<I: Iterator>
73where
74 I::Item: Eq + Ord,
75{
76 heap: QuaternaryHeap<HeadTail<I>>,
77}
78
79impl<I: Iterator> KMergeIters<I>
80where
81 I::Item: Eq + Ord,
82{
83 pub fn new(iters: impl IntoIterator<Item = I>) -> Self {
84 let iters = iters.into_iter();
85 let mut heap = QuaternaryHeap::with_capacity(iters.size_hint().1.unwrap_or(10));
86 for mut iter in iters {
87 if let Some(new_head) = iter.next() {
88 heap.push(HeadTail {
89 head: new_head,
90 tail: iter,
91 });
92 }
93 }
94 KMergeIters { heap }
95 }
96}
97
98impl<I: Iterator> Iterator for KMergeIters<I>
99where
100 I::Item: Eq + Ord,
101{
102 type Item = I::Item;
103
104 fn next(&mut self) -> Option<Self::Item> {
105 let mut head_tail = self.heap.peek_mut()?;
106
107 match head_tail.tail.next() {
108 None => Some(PeekMut::pop(head_tail).head),
109 Some(item) => Some(std::mem::replace(&mut head_tail.head, item)),
110 }
111 }
112}
113
114trait ParallelDeduplicatingExternalSorter<Item: Eq + Ord + Send>: Sync + Sized {
115 fn buffer_capacity(&self) -> usize;
116 fn sort_vec(&self, vec: &mut Vec<Item>) -> Result<()>;
117 fn serialize(path: PathBuf, items: impl Iterator<Item = Item>) -> Result<()>;
118 fn deserialize(path: PathBuf) -> Result<impl Iterator<Item = Item>>;
119
120 fn par_sort_dedup<Iter: ParallelIterator<Item = Item>>(
123 self,
124 iter: Iter,
125 mut pl: impl ProgressLog + Send,
126 ) -> Result<impl Iterator<Item = Item>> {
127 let unmerged_tmpdir =
128 tempfile::tempdir().context("Could not create temporary directory for sorting")?;
129 let (num_items_estimate, unmerged_paths) = self
130 .par_sort_unmerged(iter, &unmerged_tmpdir, &mut pl)
131 .context("Sorting items failed before merging")?;
132 pl.done();
133
134 let mut pl = concurrent_progress_logger!(
135 display_memory = true,
136 item_name = "item",
137 local_speed = true,
138 expected_updates = Some(num_items_estimate),
139 );
140 pl.start("Pre-merging");
141 let pre_merged_tmpdir =
142 tempfile::tempdir().context("Could not create temporary directory for sorting")?;
143 let (num_items_estimate, pre_merged_paths) = self
144 .pre_merge_sorted(unmerged_paths, &pre_merged_tmpdir, &mut pl)
145 .context("Could not pre-merge")?;
146 pl.done();
147 log::info!("Removing sorted but unmerged files...");
148 drop(unmerged_tmpdir); log::info!("Done");
150
151 let mut pl = concurrent_progress_logger!(
152 display_memory = true,
153 item_name = "item",
154 local_speed = true,
155 expected_updates = Some(num_items_estimate),
156 );
157 pl.start("Merging");
158 Self::merge_sorted(pre_merged_paths, pre_merged_tmpdir, pl).context("Could not merge")
159 }
160
161 #[doc(hidden)]
162 fn par_sort_unmerged<Iter: ParallelIterator<Item = Item>>(
168 &self,
169 iter: Iter,
170 tmpdir: &TempDir,
171 mut pl: &mut (impl ProgressLog + Send),
172 ) -> Result<(usize, Vec<PathBuf>)> {
173 let num_flushed_buffers = AtomicU64::new(0);
174 let mut buffer_paths = Vec::new();
175 let num_items_estimate = AtomicUsize::new(0);
176 {
177 let buffer_paths = Arc::new(Mutex::new(&mut buffer_paths));
178 let pl = Arc::new(Mutex::new(&mut pl));
179 let flush = |buf: &mut Vec<Item>| -> Result<()> {
180 if buf.is_empty() {
181 return Ok(());
183 }
184
185 self.sort_vec(buf).context("Could not sort buffer")?;
186
187 let buffer_id = num_flushed_buffers.fetch_add(1, Ordering::Relaxed);
188 let buf_path = tmpdir.path().join(format!("step1_{buffer_id}"));
189
190 let buf_len = buf.len();
191
192 Self::serialize(buf_path.clone(), buf.drain(0..).dedup())
194 .context("Could not serialize sorted list")?;
195 log::debug!("Wrote {} items to {}", buf.len(), buf_path.display());
196
197 pl.lock().unwrap().update_with_count(buf_len);
198 num_items_estimate.fetch_add(buf_len, Ordering::Relaxed);
199 buf.clear();
200 buffer_paths.lock().unwrap().push(buf_path);
201 Ok(())
202 };
203
204 iter.try_fold(
206 || Vec::with_capacity(self.buffer_capacity()),
207 |mut buf, item| -> Result<_> {
208 if let Some(previous_item) = buf.last() {
209 if *previous_item == item {
210 return Ok(buf);
212 }
213 }
214 if buf.len() >= buf.capacity() {
215 flush(&mut buf)?;
216 }
217 buf.push(item);
218 Ok(buf)
219 },
220 )
221 .try_for_each(|buf| flush(&mut buf?))?;
222 }
223 let num_items_estimate = num_items_estimate.into_inner();
224
225 Ok((num_items_estimate, buffer_paths))
226 }
227
228 fn pre_merge_sorted(
230 &self,
231 unmerged_paths: Vec<PathBuf>,
232 tmpdir: &TempDir,
233 pl: &mut impl ConcurrentProgressLog,
234 ) -> Result<(usize, Vec<PathBuf>)> {
235 let num_items_estimate = AtomicUsize::new(0);
236 let pre_merged_paths = std::thread::scope(|s| {
237 let tmpdir = &tmpdir;
238 let num_items_estimate = &num_items_estimate;
239 let chunks_size = unmerged_paths.len().div_ceil(num_cpus::get());
240 unmerged_paths
241 .into_iter()
242 .chunks(chunks_size)
243 .into_iter()
244 .map(|buffer_paths_chunk| buffer_paths_chunk.into_iter().collect::<Vec<_>>())
245 .enumerate()
246 .map(|(i, buffer_paths_chunk)| {
247 let mut thread_pl = pl.clone();
248 s.spawn(move || -> Result<PathBuf> {
249 let mut num_items_in_thread = 0;
250 let merged_items = KMergeIters::new(
251 buffer_paths_chunk
252 .into_iter()
253 .map(|path| {
254 Self::deserialize(path).context("Could not read sorted list")
255 })
256 .collect::<Result<Vec<_>>>()?
257 .into_iter(),
258 );
259 let merged_path = tmpdir.path().join(format!("step2_{i}"));
260 Self::serialize(
261 merged_path.clone(),
262 merged_items
263 .inspect(|_| thread_pl.light_update())
264 .dedup()
265 .inspect(|_| num_items_in_thread += 1),
266 )?;
267 log::debug!(
268 "Wrote {} items to {}",
269 num_items_in_thread,
270 merged_path.display()
271 );
272 num_items_estimate.fetch_add(num_items_in_thread, Ordering::Relaxed);
273 Ok(merged_path)
274 })
275 })
276 .collect::<Vec<_>>()
277 .into_iter()
278 .map(|handle| handle.join().expect("Pre-merge thread failed"))
279 .collect::<Result<Vec<_>>>()
280 })?;
281 let num_items_estimate = num_items_estimate.into_inner();
282
283 Ok((num_items_estimate, pre_merged_paths))
284 }
285
286 fn merge_sorted(
288 unmerged_paths: Vec<PathBuf>,
289 input_dir: TempDir,
290 mut pl: impl ConcurrentProgressLog,
291 ) -> Result<impl Iterator<Item = Item>> {
292 let buffers = unmerged_paths
293 .into_iter()
294 .map(|path| Self::deserialize(path).context("Could not read pre-merged buffer"))
295 .collect::<Result<Vec<_>>>()?;
296 drop(input_dir); Ok(KMergeIters::new(buffers)
298 .inspect(move |_| pl.light_update())
299 .dedup())
300 }
301}