swh_graph/utils/sort/
mod.rs

1/*
2 * Copyright (C) 2024  The Software Heritage developers
3 * See the AUTHORS file at the top-level directory of this distribution
4 * License: GNU General Public License version 3, or any later version
5 * See top-level LICENSE file for more information
6 */
7
8//! Parallel sorting and deduplication for data that doesn't fit in RAM
9// Adapted from https://archive.softwareheritage.org/swh:1:cnt:d5129fef934309da995a8895ba9509a6faae0bba;origin=https://github.com/vigna/webgraph-rs;visit=swh:1:snp:76b76a6b68240ad1ec27aed81f7cc30441b69d7c;anchor=swh:1:rel:ef30092122d472899fdfa361e784fc1e04495dab;path=/src/utils/sort_pairs.rs;lines=410-512
10
11use std::path::PathBuf;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use std::sync::{Arc, Mutex};
14
15use anyhow::{Context, Result};
16use dary_heap::{PeekMut, QuaternaryHeap};
17use dsi_progress_logger::{concurrent_progress_logger, ConcurrentProgressLog, ProgressLog};
18use itertools::Itertools;
19use rayon::prelude::*;
20use tempfile::TempDir;
21
22mod arcs;
23pub use arcs::{par_sort_arcs, PartitionedBuffer};
24mod strings;
25pub use strings::par_sort_strings;
26mod swhids;
27pub use swhids::par_sort_swhids;
28
/// A `(head, tail)` decomposition of a non-empty iterator: `head` is the next
/// item to yield and `tail` is the remainder of the iterator.
///
/// Its comparison impls below order by `head` only, and in **reverse**, so
/// that a max-heap of `HeadTail`s pops the smallest head first.
#[derive(Clone, Debug)]
struct HeadTail<I: Iterator> {
    // Item most recently pulled from the iterator, not yet yielded.
    head: I::Item,
    // Rest of the iterator; never inspected by the comparison impls.
    tail: I,
}
35
36impl<I: Iterator> PartialEq for HeadTail<I>
37where
38    I::Item: PartialEq,
39{
40    #[inline(always)]
41    fn eq(&self, other: &Self) -> bool {
42        self.head.eq(&other.head)
43    }
44}
45
// Marker impl: head-only equality is a total equivalence relation whenever
// the item type itself is `Eq`; the tail iterator never takes part.
impl<I: Iterator> Eq for HeadTail<I> where I::Item: Eq {}
47
48impl<I: Iterator> PartialOrd for HeadTail<I>
49where
50    I::Item: PartialOrd,
51{
52    #[inline(always)]
53    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
54        self.head
55            .partial_cmp(&other.head)
56            .map(std::cmp::Ordering::reverse)
57    }
58}
59
60impl<I: Iterator> Ord for HeadTail<I>
61where
62    I::Item: Ord,
63{
64    #[inline(always)]
65    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
66        self.head.cmp(&other.head).reverse()
67    }
68}
69
/// Merges multiple individually-sorted iterators using a
/// [quaternary heap](dary_heap::QuaternaryHeap), yielding their items in
/// increasing order.
struct KMergeIters<I: Iterator>
where
    I::Item: Eq + Ord,
{
    // One `HeadTail` entry per non-exhausted input iterator; the reversed
    // comparison on `HeadTail` makes this max-heap surface the smallest head.
    heap: QuaternaryHeap<HeadTail<I>>,
}
78
79impl<I: Iterator> KMergeIters<I>
80where
81    I::Item: Eq + Ord,
82{
83    pub fn new(iters: impl IntoIterator<Item = I>) -> Self {
84        let iters = iters.into_iter();
85        let mut heap = QuaternaryHeap::with_capacity(iters.size_hint().1.unwrap_or(10));
86        for mut iter in iters {
87            if let Some(new_head) = iter.next() {
88                heap.push(HeadTail {
89                    head: new_head,
90                    tail: iter,
91                });
92            }
93        }
94        KMergeIters { heap }
95    }
96}
97
98impl<I: Iterator> Iterator for KMergeIters<I>
99where
100    I::Item: Eq + Ord,
101{
102    type Item = I::Item;
103
104    fn next(&mut self) -> Option<Self::Item> {
105        let mut head_tail = self.heap.peek_mut()?;
106
107        match head_tail.tail.next() {
108            None => Some(PeekMut::pop(head_tail).head),
109            Some(item) => Some(std::mem::replace(&mut head_tail.head, item)),
110        }
111    }
112}
113
114trait ParallelDeduplicatingExternalSorter<Item: Eq + Ord + Send>: Sync + Sized {
115    fn buffer_capacity(&self) -> usize;
116    fn sort_vec(&self, vec: &mut Vec<Item>) -> Result<()>;
117    fn serialize(path: PathBuf, items: impl Iterator<Item = Item>) -> Result<()>;
118    fn deserialize(path: PathBuf) -> Result<impl Iterator<Item = Item>>;
119
120    /// Takes an iterator if items, and returns an iterator of the same items,
121    /// sorted and deduplicated
122    fn par_sort_dedup<Iter: ParallelIterator<Item = Item>>(
123        self,
124        iter: Iter,
125        mut pl: impl ProgressLog + Send,
126    ) -> Result<impl Iterator<Item = Item>> {
127        let unmerged_tmpdir =
128            tempfile::tempdir().context("Could not create temporary directory for sorting")?;
129        let (num_items_estimate, unmerged_paths) = self
130            .par_sort_unmerged(iter, &unmerged_tmpdir, &mut pl)
131            .context("Sorting items failed before merging")?;
132        pl.done();
133
134        let mut pl = concurrent_progress_logger!(
135            display_memory = true,
136            item_name = "item",
137            local_speed = true,
138            expected_updates = Some(num_items_estimate),
139        );
140        pl.start("Pre-merging");
141        let pre_merged_tmpdir =
142            tempfile::tempdir().context("Could not create temporary directory for sorting")?;
143        let (num_items_estimate, pre_merged_paths) = self
144            .pre_merge_sorted(unmerged_paths, &pre_merged_tmpdir, &mut pl)
145            .context("Could not pre-merge")?;
146        pl.done();
147        log::info!("Removing sorted but unmerged files...");
148        drop(unmerged_tmpdir); // Free disk space
149        log::info!("Done");
150
151        let mut pl = concurrent_progress_logger!(
152            display_memory = true,
153            item_name = "item",
154            local_speed = true,
155            expected_updates = Some(num_items_estimate),
156        );
157        pl.start("Merging");
158        Self::merge_sorted(pre_merged_paths, pre_merged_tmpdir, pl).context("Could not merge")
159    }
160
161    #[doc(hidden)]
162    /// Given an iterator of items, returns a list of paths to files, each containing
163    /// a sorted list of items.
164    ///
165    /// The files are not sorted which respect to each other, so they need to be merged
166    /// before use.
167    fn par_sort_unmerged<Iter: ParallelIterator<Item = Item>>(
168        &self,
169        iter: Iter,
170        tmpdir: &TempDir,
171        mut pl: &mut (impl ProgressLog + Send),
172    ) -> Result<(usize, Vec<PathBuf>)> {
173        let num_flushed_buffers = AtomicU64::new(0);
174        let mut buffer_paths = Vec::new();
175        let num_items_estimate = AtomicUsize::new(0);
176        {
177            let buffer_paths = Arc::new(Mutex::new(&mut buffer_paths));
178            let pl = Arc::new(Mutex::new(&mut pl));
179            let flush = |buf: &mut Vec<Item>| -> Result<()> {
180                if buf.is_empty() {
181                    // Nothing to write; and mmaping a 0 bytes file would error
182                    return Ok(());
183                }
184
185                self.sort_vec(buf).context("Could not sort buffer")?;
186
187                let buffer_id = num_flushed_buffers.fetch_add(1, Ordering::Relaxed);
188                let buf_path = tmpdir.path().join(format!("step1_{buffer_id}"));
189
190                let buf_len = buf.len();
191
192                // early deduplication to save some space
193                Self::serialize(buf_path.clone(), buf.drain(0..).dedup())
194                    .context("Could not serialize sorted list")?;
195                log::debug!("Wrote {} items to {}", buf.len(), buf_path.display());
196
197                pl.lock().unwrap().update_with_count(buf_len);
198                num_items_estimate.fetch_add(buf_len, Ordering::Relaxed);
199                buf.clear();
200                buffer_paths.lock().unwrap().push(buf_path);
201                Ok(())
202            };
203
204            // Sort in parallel
205            iter.try_fold(
206                || Vec::with_capacity(self.buffer_capacity()),
207                |mut buf, item| -> Result<_> {
208                    if let Some(previous_item) = buf.last() {
209                        if *previous_item == item {
210                            // early deduplication to save some sorting time
211                            return Ok(buf);
212                        }
213                    }
214                    if buf.len() >= buf.capacity() {
215                        flush(&mut buf)?;
216                    }
217                    buf.push(item);
218                    Ok(buf)
219                },
220            )
221            .try_for_each(|buf| flush(&mut buf?))?;
222        }
223        let num_items_estimate = num_items_estimate.into_inner();
224
225        Ok((num_items_estimate, buffer_paths))
226    }
227
228    /// Turns a long list of sorted lists into a shorter list (one per thread)
229    fn pre_merge_sorted(
230        &self,
231        unmerged_paths: Vec<PathBuf>,
232        tmpdir: &TempDir,
233        pl: &mut impl ConcurrentProgressLog,
234    ) -> Result<(usize, Vec<PathBuf>)> {
235        let num_items_estimate = AtomicUsize::new(0);
236        let pre_merged_paths = std::thread::scope(|s| {
237            let tmpdir = &tmpdir;
238            let num_items_estimate = &num_items_estimate;
239            let chunks_size = unmerged_paths.len().div_ceil(num_cpus::get());
240            unmerged_paths
241                .into_iter()
242                .chunks(chunks_size)
243                .into_iter()
244                .map(|buffer_paths_chunk| buffer_paths_chunk.into_iter().collect::<Vec<_>>())
245                .enumerate()
246                .map(|(i, buffer_paths_chunk)| {
247                    let mut thread_pl = pl.clone();
248                    s.spawn(move || -> Result<PathBuf> {
249                        let mut num_items_in_thread = 0;
250                        let merged_items = KMergeIters::new(
251                            buffer_paths_chunk
252                                .into_iter()
253                                .map(|path| {
254                                    Self::deserialize(path).context("Could not read sorted list")
255                                })
256                                .collect::<Result<Vec<_>>>()?
257                                .into_iter(),
258                        );
259                        let merged_path = tmpdir.path().join(format!("step2_{i}"));
260                        Self::serialize(
261                            merged_path.clone(),
262                            merged_items
263                                .inspect(|_| thread_pl.light_update())
264                                .dedup()
265                                .inspect(|_| num_items_in_thread += 1),
266                        )?;
267                        log::debug!(
268                            "Wrote {} items to {}",
269                            num_items_in_thread,
270                            merged_path.display()
271                        );
272                        num_items_estimate.fetch_add(num_items_in_thread, Ordering::Relaxed);
273                        Ok(merged_path)
274                    })
275                })
276                .collect::<Vec<_>>()
277                .into_iter()
278                .map(|handle| handle.join().expect("Pre-merge thread failed"))
279                .collect::<Result<Vec<_>>>()
280        })?;
281        let num_items_estimate = num_items_estimate.into_inner();
282
283        Ok((num_items_estimate, pre_merged_paths))
284    }
285
286    /// Merge a list of sorted lists into a single sorted list
287    fn merge_sorted(
288        unmerged_paths: Vec<PathBuf>,
289        input_dir: TempDir,
290        mut pl: impl ConcurrentProgressLog,
291    ) -> Result<impl Iterator<Item = Item>> {
292        let buffers = unmerged_paths
293            .into_iter()
294            .map(|path| Self::deserialize(path).context("Could not read pre-merged buffer"))
295            .collect::<Result<Vec<_>>>()?;
296        drop(input_dir); // Prevent deletion before we opened the input files
297        Ok(KMergeIters::new(buffers)
298            .inspect(move |_| pl.light_update())
299            .dedup())
300    }
301}