// webgraph/utils/par_sort_pairs.rs
1/*
2 * SPDX-FileCopyrightText: 2025 Inria
3 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8//! Facilities to sort in parallel externally (labelled) pairs of nodes returned
9//! by a [`ParallelIterator`], returning a [`SplitIters`] structure.
10//!
11//! The typical use of [`ParSortPairs`] is to sort (labelled) pairs of nodes
12//! representing a (labelled) graph; the resulting [`SplitIters`] structure can
13//! be then used to build a compressed representation of the graph using, for
14//! example,
15//! [`BvCompConfig::par_comp_lenders`](crate::graphs::bvgraph::BvCompConfig::par_comp_lenders).
16//!
17//! For example, when reading a graph from a file containing an arc list one
18//! typically is able to produce a parallel iterator of (labelled) pairs of
19//! nodes.
20//!
21//! If your pairs are emitted by a sequence of sequential iterators, consider
22//! using [`ParSortIters`](crate::utils::par_sort_iters::ParSortIters) instead.
23
24use std::num::NonZeroUsize;
25use std::path::Path;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicUsize, Ordering};
28
29use anyhow::{Context, Result, ensure};
30use crossbeam_queue::SegQueue;
31use dsi_progress_logger::{ProgressLog, concurrent_progress_logger};
32use rayon::prelude::*;
33
34use crate::utils::DefaultBatchCodec;
35
36use super::MemoryUsage;
37use super::sort_pairs::KMergeIters;
38use super::{BatchCodec, CodecIter};
39use crate::utils::SplitIters;
40
41/// Takes a parallel iterator of (labelled) pairs as input, and turns them into
42/// a [`SplitIters`] structure which is suitable for
43/// [`BvCompConfig::par_comp_lenders`](crate::graphs::bvgraph::BvCompConfig::par_comp_lenders).
44///
45/// Note that batches will be memory-mapped. If you encounter OS-level errors
46/// using this class (e.g., `ENOMEM: Out of memory` under Linux), please review
47/// the limitations of your OS regarding memory-mapping (e.g.,
48/// `/proc/sys/vm/max_map_count` under Linux).
49///
50/// # Examples
51///
52/// ```
53/// use std::num::NonZeroUsize;
54///
55/// use dsi_bitstream::traits::BE;
56/// use lender::Lender;
57/// use rayon::prelude::*;
58/// use webgraph::traits::SequentialLabeling;
59/// use webgraph::graphs::bvgraph::{BvComp, CompFlags};
60/// use webgraph::graphs::arc_list_graph;
61/// use webgraph::utils::par_sort_pairs::ParSortPairs;
62///
63/// let num_partitions = 2;
64/// let num_nodes: usize = 5;
65/// let unsorted_pairs = vec![(1, 3), (3, 2), (2, 1), (1, 0), (0, 4)];
66///
67/// let pair_sorter = ParSortPairs::new(num_nodes)?
68///     .expected_num_pairs(unsorted_pairs.len())
69///     .num_partitions(NonZeroUsize::new(num_partitions).unwrap());
70///
71/// let split_iters = pair_sorter.sort(
72///     unsorted_pairs.par_iter().copied()
73/// )?;
74///
75/// assert_eq!(split_iters.boundaries.len(), num_partitions + 1);
76/// assert_eq!(split_iters.boundaries[0], 0);
77/// assert_eq!(split_iters.boundaries[2], num_nodes);
78///
79/// let collected: Vec<_> = split_iters.iters
80///     .into_vec()
81///     .into_iter()
82///     .map(|iter| iter.into_iter().collect::<Vec<_>>())
83///     .collect();
84///
85/// assert_eq!(
86///     collected,
87///     vec![
88///         vec![(0, 4), (1, 0), (1, 3), (2, 1)], // nodes 0, 1, and 2 are in partition 0
89///         vec![(3, 2)], // nodes 3 and 4 are in partition 1
90///     ],
91/// );
92///
93/// let bvcomp_tmp_dir = tempfile::tempdir()?;
94/// let bvcomp_out_dir = tempfile::tempdir()?;
95///
96/// // Convert pairs to labeled form and compress
97/// let split_iters = pair_sorter.sort(
98///     unsorted_pairs.par_iter().copied()
99/// )?;
100///
101/// // Convert to (node, lender) pairs using From trait
102/// let pairs: Vec<_> = split_iters.into();
103///
104/// // compress with a parallel iter
105/// BvComp::with_basename(bvcomp_out_dir.path().join("graph")).
106///     par_comp_lenders::<BE, _>(pairs, num_nodes)?;
107/// # Ok::<(), Box<dyn std::error::Error>>(())
108/// ```
pub struct ParSortPairs {
    /// Number of nodes of the graph; every pair source must be in
    /// `0..num_nodes` (checked at sort time).
    num_nodes: usize,
    /// Approximate number of pairs to be sorted; used only for progress
    /// reporting.
    expected_num_pairs: Option<usize>,
    /// How many partitions the nodes are split into, i.e., the number of
    /// iterators in the resulting [`SplitIters`]; defaults to the number of
    /// CPUs.
    num_partitions: NonZeroUsize,
    /// Memory budget used to size the in-memory sort buffers.
    memory_usage: MemoryUsage,
}
115
116impl ParSortPairs {
117    /// See [`try_sort`](ParSortPairs::try_sort).
118    pub fn sort(
119        &self,
120        pairs: impl ParallelIterator<Item = (usize, usize)>,
121    ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Clone + Send + Sync>>>
122    {
123        self.try_sort::<std::convert::Infallible>(pairs.map(Ok))
124    }
125
126    /// Sorts the output of the provided parallel iterator, returning a
127    /// [`SplitIters`] structure.
128    pub fn try_sort<E: Into<anyhow::Error>>(
129        &self,
130        pairs: impl ParallelIterator<Item = Result<(usize, usize), E>>,
131    ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Clone + Send + Sync>>>
132    {
133        let split = self.try_sort_labeled(
134            &DefaultBatchCodec::default(),
135            pairs.map(|pair| -> Result<_> {
136                let (src, dst) = pair.map_err(Into::into)?;
137                Ok(((src, dst), ()))
138            }),
139        )?;
140
141        let iters_without_labels: Vec<_> = split
142            .iters
143            .into_vec()
144            .into_iter()
145            .map(|into_iter| into_iter.into_iter().map(|(pair, _)| pair))
146            .collect();
147
148        Ok(SplitIters::new(
149            split.boundaries,
150            iters_without_labels.into_boxed_slice(),
151        ))
152    }
153}
154
impl ParSortPairs {
    /// Creates a new [`ParSortPairs`] instance.
    ///
    /// The methods [`num_partitions`](ParSortPairs::num_partitions) (which sets
    /// the number of iterators in the resulting [`SplitIters`]),
    /// [`memory_usage`](ParSortPairs::memory_usage), and
    /// [`expected_num_pairs`](ParSortPairs::expected_num_pairs) can be used to
    /// customize the instance.
    ///
    /// This method will return an error if the number of CPUs
    /// returned by [`num_cpus::get()`](num_cpus::get()) is zero.
    pub fn new(num_nodes: usize) -> Result<Self> {
        Ok(Self {
            num_nodes,
            expected_num_pairs: None,
            num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?,
            memory_usage: MemoryUsage::default(),
        })
    }

    /// Approximate number of pairs to be sorted.
    ///
    /// Used only for progress reporting.
    pub fn expected_num_pairs(self, expected_num_pairs: usize) -> Self {
        Self {
            expected_num_pairs: Some(expected_num_pairs),
            ..self
        }
    }

    /// How many partitions to split the nodes into.
    ///
    /// This is the number of iterators in the resulting [`SplitIters`].
    ///
    /// Defaults to `num_cpus::get()`.
    pub fn num_partitions(self, num_partitions: NonZeroUsize) -> Self {
        Self {
            num_partitions,
            ..self
        }
    }

    /// How much memory to use for in-memory sorts.
    ///
    /// Larger values yield faster merges (by reducing logarithmically the
    /// number of batches to merge) but consume linearly more memory. We suggest
    /// to set this parameter as large as possible, depending on the available
    /// memory. The default is the default of [`MemoryUsage`].
    pub fn memory_usage(self, memory_usage: MemoryUsage) -> Self {
        Self {
            memory_usage,
            ..self
        }
    }

    /// See [`try_sort_labeled`](ParSortPairs::try_sort_labeled).
    ///
    /// This is a convenience method for parallel iterators that cannot fail.
    pub fn sort_labeled<C: BatchCodec, P: ParallelIterator<Item = ((usize, usize), C::Label)>>(
        &self,
        batch_codec: &C,
        pairs: P,
    ) -> Result<
        SplitIters<
            impl IntoIterator<Item = ((usize, usize), C::Label), IntoIter: Clone + Send + Sync>
            + use<C, P>,
        >,
    > {
        self.try_sort_labeled::<C, std::convert::Infallible, _>(batch_codec, pairs.map(Ok))
    }

    /// Sorts the output of the provided parallel iterator,
    /// returning a [`SplitIters`] structure.
    ///
    /// The provided [`BatchCodec`] is used to serialize sorted batches of
    /// labelled pairs to temporary files, and to deserialize them back during
    /// the final k-way merge.
    ///
    /// Pairs are partitioned by source node into
    /// [`num_partitions`](ParSortPairs::num_partitions) contiguous ranges;
    /// within each partition, pairs are buffered per thread, flushed to disk
    /// as sorted batches, and finally merged into a single sorted iterator.
    ///
    /// # Errors
    ///
    /// Returns an error if a pair has a source `>= num_nodes`, if the
    /// temporary directory cannot be created, if a batch cannot be written or
    /// read back, or if the input iterator yields an error.
    pub fn try_sort_labeled<
        C: BatchCodec,
        E: Into<anyhow::Error>,
        P: ParallelIterator<Item = Result<((usize, usize), C::Label), E>>,
    >(
        &self,
        batch_codec: &C,
        pairs: P,
    ) -> Result<
        SplitIters<
            impl IntoIterator<Item = ((usize, usize), C::Label), IntoIter: Clone + Send + Sync>
            + use<C, E, P>,
        >,
    > {
        let unsorted_pairs = pairs;

        let num_partitions = self.num_partitions.into();
        // Each thread keeps one in-memory buffer per partition, so the memory
        // budget is split across (threads × partitions) buffers.
        let num_buffers = rayon::current_num_threads() * num_partitions;
        let batch_size = self
            .memory_usage
            .batch_size::<((usize, usize), C::Label)>()
            .div_ceil(num_buffers);
        let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions);

        let mut pl = concurrent_progress_logger!(
            display_memory = true,
            item_name = "pair",
            local_speed = true,
            expected_updates = self.expected_num_pairs,
        );
        pl.start("Reading and sorting pairs");

        // Monotonically increasing id handed to each newly created thread
        // state; used to give batch files unique names.
        let worker_id = AtomicUsize::new(0);
        let presort_tmp_dir =
            tempfile::tempdir().context("Could not create temporary directory")?;

        // Lock-free pool of reusable per-thread sorter states (see the
        // initializer comment below for why states are pooled).
        let sorter_thread_states = Arc::new(SegQueue::<SorterThreadState<C>>::new());

        // iterators in partitioned_presorted_pairs[partition_id] contain all pairs (src, dst, label)
        // where num_nodes_per_partition*partition_id <= src < num_nodes_per_partition*(partition_id+1)
        unsorted_pairs.try_for_each_init(
            // Rayon calls this initializer on every sequential iterator inside the parallel
            // iterator. Depending on how the parallel iterator was constructed (and if
            // IndexedParallelIterator::with_min_len was not used) this can result in lots of:
            // * tiny iterators, and we don't want to create as many tiny BatchIterators because that's
            //   extremely inefficient.
            // * unsorted_buffers arrays with batch_size as capacity, but are mostly empty and sit
            //   in memory until we flush them
            // Thus, we keep a pool of SorterThreadState in a lock-free queue, so that states
            // are reused across multiple sequential iterators instead of recreated.
            || {
                let mut state = sorter_thread_states
                    .pop()
                    .unwrap_or_else(|| SorterThreadState {
                        worker_id: worker_id.fetch_add(1, Ordering::Relaxed),
                        unsorted_buffers: (0..num_partitions)
                            .map(|_| Vec::with_capacity(batch_size))
                            .collect(),
                        sorted_pairs: (0..num_partitions).map(|_| Vec::new()).collect(),
                        queue: None,
                    });

                // So it adds itself back to the queue when dropped
                state.queue = Some(Arc::clone(&sorter_thread_states));
                (pl.clone(), state)
            },
            |(pl, thread_state), pair| -> Result<_> {
                let ((src, dst), label) = pair.map_err(Into::into)?;
                ensure!(
                    src < self.num_nodes,
                    "Expected {} nodes, but got node id {src}",
                    self.num_nodes
                );
                let partition_id = src / num_nodes_per_partition;
                let SorterThreadState {
                    worker_id,
                    sorted_pairs,
                    unsorted_buffers,
                    queue: _,
                } = thread_state;

                let sorted_pairs = &mut sorted_pairs[partition_id];
                let buf = &mut unsorted_buffers[partition_id];
                // Flush before pushing once the buffer has filled its
                // batch_size capacity; the progress logger is only updated on
                // flush, in batches, to keep contention low.
                if buf.len() >= buf.capacity() {
                    let buf_len = buf.len();
                    flush_buffer(
                        presort_tmp_dir.path(),
                        batch_codec,
                        *worker_id,
                        partition_id,
                        sorted_pairs,
                        buf,
                    )
                    .context("Could not flush buffer")?;
                    assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                    pl.update_with_count(buf_len);
                }

                buf.push(((src, dst), label));
                Ok(())
            },
        )?;

        // Collect them into an iterable. Each state was pushed back onto the
        // queue by its Drop impl when rayon released it, so draining the queue
        // yields every state exactly once (with `queue` already set to None).
        let sorter_thread_states: Vec<_> = std::iter::repeat(())
            .map_while(|()| sorter_thread_states.pop())
            .collect();

        // flush remaining buffers
        let partitioned_presorted_pairs: Vec<Vec<CodecIter<C>>> = sorter_thread_states
        .into_par_iter()
        .map_with(pl.clone(), |pl, mut thread_state: SorterThreadState<C>| {
            // Swap the buffers out of the state so that its final Drop (whose
            // `queue` is None) sees empty vectors and its assertions hold.
            let mut sorted_pairs = Vec::new();
            std::mem::swap(&mut sorted_pairs, &mut thread_state.sorted_pairs);
            let mut unsorted_buffers = Vec::new();
            std::mem::swap(&mut unsorted_buffers, &mut thread_state.unsorted_buffers);

            let mut partitioned_sorted_pairs = Vec::with_capacity(num_partitions);
            assert_eq!(sorted_pairs.len(), num_partitions);
            assert_eq!(unsorted_buffers.len(), num_partitions);
            for (partition_id, (mut sorted_pairs, mut buf)) in sorted_pairs.into_iter().zip(unsorted_buffers.into_iter()).enumerate() {
                let buf_len = buf.len();
                flush_buffer(presort_tmp_dir.path(), batch_codec, thread_state.worker_id, partition_id, &mut sorted_pairs, &mut buf).context("Could not flush buffer at the end")?;
                assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                pl.update_with_count(buf_len);

                partitioned_sorted_pairs.push(sorted_pairs);
            }
            Ok(partitioned_sorted_pairs)
        })
        // At this point, the iterator could be collected into
        // {worker_id -> {partition_id -> [iterators]}}
        // ie. Vec<Vec<Vec<BatchIterator>>>>.
        //
        // Let's merge the {partition_id -> [iterators]} maps of each worker
        .try_reduce(
            || (0..num_partitions).map(|_| Vec::new()).collect(),
            |mut pair_partitions1: Vec<Vec<CodecIter<C>>>, pair_partitions2: Vec<Vec<CodecIter<C>>>| -> Result<Vec<Vec<CodecIter<C>>>> {
            assert_eq!(pair_partitions1.len(), num_partitions);
            assert_eq!(pair_partitions2.len(), num_partitions);
            for (partition1, partition2) in pair_partitions1.iter_mut().zip(pair_partitions2.into_iter()) {
                partition1.extend(partition2.into_iter());
            }
            Ok(pair_partitions1)
        })?
        // At this point, the iterator was turned into
        // {partition_id -> [iterators]}
        // ie. Vec<Vec<BatchIterator>>>.
        ;
        pl.done();

        // Build boundaries array: [0, nodes_per_partition, 2*nodes_per_partition, ..., num_nodes]
        // (the .min() clamps the last boundary when num_nodes is not a
        // multiple of the partition size).
        let boundaries: Vec<usize> = (0..=num_partitions)
            .map(|i| (i * num_nodes_per_partition).min(self.num_nodes))
            .collect();

        // Build iterators array
        let iters: Vec<_> = partitioned_presorted_pairs
            .into_iter()
            .map(|partition| {
                // 'partition' contains N iterators that are not sorted with respect to each other.
                // We merge them and turn them into a single sorted iterator.
                KMergeIters::new(partition)
            })
            .collect();

        Ok(SplitIters::new(
            boundaries.into_boxed_slice(),
            iters.into_boxed_slice(),
        ))
    }
}
409
/// Per-thread sorting state, pooled and reused across the (possibly many)
/// sequential iterators that rayon schedules on the same thread.
struct SorterThreadState<C: BatchCodec> {
    /// Unique id of the worker that created this state; used to build unique
    /// batch file names.
    worker_id: usize,
    /// For each partition, iterators over the sorted batches already flushed
    /// to disk.
    sorted_pairs: Vec<Vec<CodecIter<C>>>,
    /// For each partition, the in-memory buffer of pairs not yet flushed.
    unsorted_buffers: Vec<Vec<((usize, usize), C::Label)>>,
    /// Where should this SorterThreadState put itself back to when dropped
    queue: Option<Arc<SegQueue<Self>>>,
}
417
418impl<C: BatchCodec> SorterThreadState<C> {
419    fn new_empty() -> Self {
420        SorterThreadState {
421            worker_id: usize::MAX,
422            sorted_pairs: Vec::new(),
423            unsorted_buffers: Vec::new(),
424            queue: None,
425        }
426    }
427}
428
429impl<C: BatchCodec> Drop for SorterThreadState<C> {
430    fn drop(&mut self) {
431        match self.queue.take() {
432            Some(queue) => {
433                // Put self back on the queue
434                let mut other_self = Self::new_empty();
435                std::mem::swap(&mut other_self, self);
436                queue.push(other_self);
437            }
438            None => {
439                assert!(
440                    self.sorted_pairs.iter().all(|vec| vec.is_empty()),
441                    "Dropped SorterThreadState without consuming sorted_pairs"
442                );
443                assert!(
444                    self.unsorted_buffers.iter().all(|vec| vec.is_empty()),
445                    "Dropped SorterThreadState without consuming unsorted_buffers"
446                );
447            }
448        }
449    }
450}
451
452pub(crate) fn flush_buffer<C: BatchCodec>(
453    tmp_dir: &Path,
454    batch_codec: &C,
455    worker_id: usize,
456    partition_id: usize,
457    sorted_pairs: &mut Vec<CodecIter<C>>,
458    buf: &mut Vec<((usize, usize), C::Label)>,
459) -> Result<()> {
460    let path = tmp_dir.join(format!(
461        "sorted_batch_{worker_id}_{partition_id}_{}",
462        sorted_pairs.len()
463    ));
464
465    // Safety check. It's not foolproof (TOCTOU) but should catch most programming errors.
466    ensure!(
467        !path.exists(),
468        "Can't create temporary file {}, it already exists",
469        path.display()
470    );
471
472    batch_codec
473        .encode_batch(&path, buf)
474        .with_context(|| format!("Could not write sorted batch to {}", path.display()))?;
475    sorted_pairs.push(
476        batch_codec
477            .decode_batch(&path)
478            .with_context(|| format!("Could not read sorted batch from {}", path.display()))?
479            .into_iter(),
480    );
481    buf.clear();
482    Ok(())
483}