webgraph/utils/par_sort_pairs.rs
1/*
2 * SPDX-FileCopyrightText: 2025 Inria
3 * SPDX-FileCopyrightText: 2025 Tommaso Fontana
4 *
5 * SPDX-License-Identifier: Apache-2.0 OR LGPL-2.1-or-later
6 */
7
8//! Facilities to sort in parallel externally (labelled) pairs of nodes returned
9//! by a [`ParallelIterator`], returning a [`SplitIters`] structure.
10//!
11//! The typical use of [`ParSortPairs`] is to sort (labelled) pairs of nodes
12//! representing a (labelled) graph; the resulting [`SplitIters`] structure can
13//! be then used to build a compressed representation of the graph using, for
14//! example,
15//! [`BvCompConfig::par_comp_lenders`](crate::graphs::bvgraph::BvCompConfig::par_comp_lenders).
16//!
17//! For example, when reading a graph from a file containing an arc list one
18//! typically is able to produce a parallel iterator of (labelled) pairs of
19//! nodes.
20//!
21//! If your pairs are emitted by a sequence of sequential iterators, consider
22//! using [`ParSortIters`](crate::utils::par_sort_iters::ParSortIters) instead.
23
24use std::num::NonZeroUsize;
25use std::path::Path;
26use std::sync::Arc;
27use std::sync::atomic::{AtomicUsize, Ordering};
28
29use anyhow::{Context, Result, ensure};
30use crossbeam_queue::SegQueue;
31use dsi_progress_logger::{ProgressLog, concurrent_progress_logger};
32use rayon::prelude::*;
33
34use crate::utils::DefaultBatchCodec;
35
36use super::MemoryUsage;
37use super::sort_pairs::KMergeIters;
38use super::{BatchCodec, CodecIter};
39use crate::utils::SplitIters;
40
41/// Takes a parallel iterator of (labelled) pairs as input, and turns them into
42/// a [`SplitIters`] structure which is suitable for
43/// [`BvCompConfig::par_comp_lenders`](crate::graphs::bvgraph::BvCompConfig::par_comp_lenders).
44///
45/// Note that batches will be memory-mapped. If you encounter OS-level errors
46/// using this class (e.g., `ENOMEM: Out of memory` under Linux), please review
47/// the limitations of your OS regarding memory-mapping (e.g.,
48/// `/proc/sys/vm/max_map_count` under Linux).
49///
50/// # Examples
51///
52/// ```
53/// use std::num::NonZeroUsize;
54///
55/// use dsi_bitstream::traits::BE;
56/// use lender::Lender;
57/// use rayon::prelude::*;
58/// use webgraph::traits::SequentialLabeling;
59/// use webgraph::graphs::bvgraph::{BvComp, CompFlags};
60/// use webgraph::graphs::arc_list_graph;
61/// use webgraph::utils::par_sort_pairs::ParSortPairs;
62///
63/// let num_partitions = 2;
64/// let num_nodes: usize = 5;
65/// let unsorted_pairs = vec![(1, 3), (3, 2), (2, 1), (1, 0), (0, 4)];
66///
67/// let pair_sorter = ParSortPairs::new(num_nodes)?
68/// .expected_num_pairs(unsorted_pairs.len())
69/// .num_partitions(NonZeroUsize::new(num_partitions).unwrap());
70///
71/// let split_iters = pair_sorter.sort(
72/// unsorted_pairs.par_iter().copied()
73/// )?;
74///
75/// assert_eq!(split_iters.boundaries.len(), num_partitions + 1);
76/// assert_eq!(split_iters.boundaries[0], 0);
77/// assert_eq!(split_iters.boundaries[2], num_nodes);
78///
79/// let collected: Vec<_> = split_iters.iters
80/// .into_vec()
81/// .into_iter()
82/// .map(|iter| iter.into_iter().collect::<Vec<_>>())
83/// .collect();
84///
85/// assert_eq!(
86/// collected,
87/// vec![
88/// vec![(0, 4), (1, 0), (1, 3), (2, 1)], // nodes 0, 1, and 2 are in partition 0
89/// vec![(3, 2)], // nodes 3 and 4 are in partition 1
90/// ],
91/// );
92///
93/// let bvcomp_tmp_dir = tempfile::tempdir()?;
94/// let bvcomp_out_dir = tempfile::tempdir()?;
95///
96/// // Convert pairs to labeled form and compress
97/// let split_iters = pair_sorter.sort(
98/// unsorted_pairs.par_iter().copied()
99/// )?;
100///
101/// // Convert to (node, lender) pairs using From trait
102/// let pairs: Vec<_> = split_iters.into();
103///
104/// // compress with a parallel iter
105/// BvComp::with_basename(bvcomp_out_dir.path().join("graph")).
106/// par_comp_lenders::<BE, _>(pairs, num_nodes)?;
107/// # Ok::<(), Box<dyn std::error::Error>>(())
108/// ```
pub struct ParSortPairs {
    /// Total number of nodes; every source node id must be strictly smaller
    /// than this (checked in `try_sort_labeled`).
    num_nodes: usize,
    /// Optional hint on the number of pairs; used only for progress reporting.
    expected_num_pairs: Option<usize>,
    /// Number of partitions, i.e., the number of iterators in the resulting
    /// [`SplitIters`].
    num_partitions: NonZeroUsize,
    /// Memory budget used to size the in-memory sort buffers.
    memory_usage: MemoryUsage,
}
115
116impl ParSortPairs {
117 /// See [`try_sort`](ParSortPairs::try_sort).
118 pub fn sort(
119 &self,
120 pairs: impl ParallelIterator<Item = (usize, usize)>,
121 ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Clone + Send + Sync>>>
122 {
123 self.try_sort::<std::convert::Infallible>(pairs.map(Ok))
124 }
125
126 /// Sorts the output of the provided parallel iterator, returning a
127 /// [`SplitIters`] structure.
128 pub fn try_sort<E: Into<anyhow::Error>>(
129 &self,
130 pairs: impl ParallelIterator<Item = Result<(usize, usize), E>>,
131 ) -> Result<SplitIters<impl IntoIterator<Item = (usize, usize), IntoIter: Clone + Send + Sync>>>
132 {
133 let split = self.try_sort_labeled(
134 &DefaultBatchCodec::default(),
135 pairs.map(|pair| -> Result<_> {
136 let (src, dst) = pair.map_err(Into::into)?;
137 Ok(((src, dst), ()))
138 }),
139 )?;
140
141 let iters_without_labels: Vec<_> = split
142 .iters
143 .into_vec()
144 .into_iter()
145 .map(|into_iter| into_iter.into_iter().map(|(pair, _)| pair))
146 .collect();
147
148 Ok(SplitIters::new(
149 split.boundaries,
150 iters_without_labels.into_boxed_slice(),
151 ))
152 }
153}
154
impl ParSortPairs {
    /// Creates a new [`ParSortPairs`] instance.
    ///
    /// The methods [`num_partitions`](ParSortPairs::num_partitions) (which sets
    /// the number of iterators in the resulting [`SplitIters`]),
    /// [`memory_usage`](ParSortPairs::memory_usage), and
    /// [`expected_num_pairs`](ParSortPairs::expected_num_pairs) can be used to
    /// customize the instance.
    ///
    /// This method will return an error if the number of CPUs
    /// returned by [`num_cpus::get()`](num_cpus::get()) is zero.
    pub fn new(num_nodes: usize) -> Result<Self> {
        Ok(Self {
            num_nodes,
            expected_num_pairs: None,
            // Default to one partition per CPU.
            num_partitions: NonZeroUsize::new(num_cpus::get()).context("zero CPUs")?,
            memory_usage: MemoryUsage::default(),
        })
    }

    /// Approximate number of pairs to be sorted.
    ///
    /// Used only for progress reporting.
    pub fn expected_num_pairs(self, expected_num_pairs: usize) -> Self {
        Self {
            expected_num_pairs: Some(expected_num_pairs),
            ..self
        }
    }

    /// How many partitions to split the nodes into.
    ///
    /// This is the number of iterators in the resulting [`SplitIters`].
    ///
    /// Defaults to `num_cpus::get()`.
    pub fn num_partitions(self, num_partitions: NonZeroUsize) -> Self {
        Self {
            num_partitions,
            ..self
        }
    }

    /// How much memory to use for in-memory sorts.
    ///
    /// Larger values yield faster merges (by reducing logarithmically the
    /// number of batches to merge) but consume linearly more memory. We suggest
    /// to set this parameter as large as possible, depending on the available
    /// memory. The default is the default of [`MemoryUsage`].
    pub fn memory_usage(self, memory_usage: MemoryUsage) -> Self {
        Self {
            memory_usage,
            ..self
        }
    }

    /// See [`try_sort_labeled`](ParSortPairs::try_sort_labeled).
    ///
    /// This is a convenience method for parallel iterators that cannot fail.
    pub fn sort_labeled<C: BatchCodec, P: ParallelIterator<Item = ((usize, usize), C::Label)>>(
        &self,
        batch_codec: &C,
        pairs: P,
    ) -> Result<
        SplitIters<
            impl IntoIterator<Item = ((usize, usize), C::Label), IntoIter: Clone + Send + Sync>
                + use<C, P>,
        >,
    > {
        self.try_sort_labeled::<C, std::convert::Infallible, _>(batch_codec, pairs.map(Ok))
    }

    /// Sorts the output of the provided parallel iterator,
    /// returning a [`SplitIters`] structure.
    ///
    /// The provided [`BatchCodec`] is used to serialize sorted batches of
    /// (labelled) pairs to temporary files, and to deserialize them again when
    /// the resulting iterators are consumed.
    ///
    /// # Errors
    ///
    /// Returns an error if the iterator yields an error, if a source node id
    /// is not smaller than the number of nodes passed to
    /// [`new`](ParSortPairs::new), or if a temporary batch cannot be written
    /// or read back.
    pub fn try_sort_labeled<
        C: BatchCodec,
        E: Into<anyhow::Error>,
        P: ParallelIterator<Item = Result<((usize, usize), C::Label), E>>,
    >(
        &self,
        batch_codec: &C,
        pairs: P,
    ) -> Result<
        SplitIters<
            impl IntoIterator<Item = ((usize, usize), C::Label), IntoIter: Clone + Send + Sync>
                + use<C, E, P>,
        >,
    > {
        let unsorted_pairs = pairs;

        let num_partitions = self.num_partitions.into();
        // One buffer per (thread, partition) pair.
        let num_buffers = rayon::current_num_threads() * num_partitions;
        // Split the overall memory budget evenly across all buffers.
        let batch_size = self
            .memory_usage
            .batch_size::<((usize, usize), C::Label)>()
            .div_ceil(num_buffers);
        let num_nodes_per_partition = self.num_nodes.div_ceil(num_partitions);

        let mut pl = concurrent_progress_logger!(
            display_memory = true,
            item_name = "pair",
            local_speed = true,
            expected_updates = self.expected_num_pairs,
        );
        pl.start("Reading and sorting pairs");

        // Each worker state gets a distinct id, used to make temporary batch
        // file names unique across threads.
        let worker_id = AtomicUsize::new(0);
        let presort_tmp_dir =
            tempfile::tempdir().context("Could not create temporary directory")?;

        // Pool of reusable per-thread states; a state re-enqueues itself here
        // when dropped (see the Drop impl of SorterThreadState).
        let sorter_thread_states = Arc::new(SegQueue::<SorterThreadState<C>>::new());

        // iterators in partitioned_presorted_pairs[partition_id] contain all pairs (src, dst, label)
        // where num_nodes_per_partition*partition_id <= src < num_nodes_per_partition*(partition_id+1)
        unsorted_pairs.try_for_each_init(
            // Rayon calls this initializer on every sequential iterator inside the parallel
            // iterator. Depending on how the parallel iterator was constructed (and if
            // IndexedParallelIterator::with_min_len was not used) this can result in lots of:
            // * tiny iterators, and we don't want to create as many tiny BatchIterators because that's
            //   extremely inefficient.
            // * unsorted_buffers arrays with batch_size as capacity, but are mostly empty and sit
            //   in memory until we flush them
            // Thus, we pool SorterThreadStates so they are reused across multiple
            // sequential iterators instead of being rebuilt from scratch.
            || {
                let mut state = sorter_thread_states
                    .pop()
                    .unwrap_or_else(|| SorterThreadState {
                        worker_id: worker_id.fetch_add(1, Ordering::Relaxed),
                        unsorted_buffers: (0..num_partitions)
                            .map(|_| Vec::with_capacity(batch_size))
                            .collect(),
                        sorted_pairs: (0..num_partitions).map(|_| Vec::new()).collect(),
                        queue: None,
                    });

                // So it adds itself back to the queue when dropped
                state.queue = Some(Arc::clone(&sorter_thread_states));
                (pl.clone(), state)
            },
            |(pl, thread_state), pair| -> Result<_> {
                let ((src, dst), label) = pair.map_err(Into::into)?;
                ensure!(
                    src < self.num_nodes,
                    "Expected {} nodes, but got node id {src}",
                    self.num_nodes
                );
                let partition_id = src / num_nodes_per_partition;
                let SorterThreadState {
                    worker_id,
                    sorted_pairs,
                    unsorted_buffers,
                    queue: _,
                } = thread_state;

                let sorted_pairs = &mut sorted_pairs[partition_id];
                let buf = &mut unsorted_buffers[partition_id];
                // Flush *before* pushing so the push below never reallocates
                // past the buffer's initial capacity.
                if buf.len() >= buf.capacity() {
                    let buf_len = buf.len();
                    flush_buffer(
                        presort_tmp_dir.path(),
                        batch_codec,
                        *worker_id,
                        partition_id,
                        sorted_pairs,
                        buf,
                    )
                    .context("Could not flush buffer")?;
                    assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                    // Pairs are counted only when flushed, not when buffered.
                    pl.update_with_count(buf_len);
                }

                buf.push(((src, dst), label));
                Ok(())
            },
        )?;

        // Collect them into an iterable
        let sorter_thread_states: Vec<_> = std::iter::repeat(())
            .map_while(|()| sorter_thread_states.pop())
            .collect();

        // flush remaining buffers
        let partitioned_presorted_pairs: Vec<Vec<CodecIter<C>>> = sorter_thread_states
            .into_par_iter()
            .map_with(pl.clone(), |pl, mut thread_state: SorterThreadState<C>| {
                // Move the vectors out of the state so its Drop impl (which
                // asserts they are empty) is satisfied.
                let mut sorted_pairs = Vec::new();
                std::mem::swap(&mut sorted_pairs, &mut thread_state.sorted_pairs);
                let mut unsorted_buffers = Vec::new();
                std::mem::swap(&mut unsorted_buffers, &mut thread_state.unsorted_buffers);

                let mut partitioned_sorted_pairs = Vec::with_capacity(num_partitions);
                assert_eq!(sorted_pairs.len(), num_partitions);
                assert_eq!(unsorted_buffers.len(), num_partitions);
                for (partition_id, (mut sorted_pairs, mut buf)) in sorted_pairs.into_iter().zip(unsorted_buffers.into_iter()).enumerate() {
                    let buf_len = buf.len();
                    flush_buffer(presort_tmp_dir.path(), batch_codec, thread_state.worker_id, partition_id, &mut sorted_pairs, &mut buf).context("Could not flush buffer at the end")?;
                    assert!(buf.is_empty(), "flush_buffer did not empty the buffer");
                    pl.update_with_count(buf_len);

                    partitioned_sorted_pairs.push(sorted_pairs);
                }
                Ok(partitioned_sorted_pairs)
            })
            // At this point, the iterator could be collected into
            // {worker_id -> {partition_id -> [iterators]}}
            // ie. Vec<Vec<Vec<BatchIterator>>>>.
            //
            // Let's merge the {partition_id -> [iterators]} maps of each worker
            .try_reduce(
                || (0..num_partitions).map(|_| Vec::new()).collect(),
                |mut pair_partitions1: Vec<Vec<CodecIter<C>>>, pair_partitions2: Vec<Vec<CodecIter<C>>>| -> Result<Vec<Vec<CodecIter<C>>>> {
                    assert_eq!(pair_partitions1.len(), num_partitions);
                    assert_eq!(pair_partitions2.len(), num_partitions);
                    for (partition1, partition2) in pair_partitions1.iter_mut().zip(pair_partitions2.into_iter()) {
                        partition1.extend(partition2.into_iter());
                    }
                    Ok(pair_partitions1)
                })?
            // At this point, the iterator was turned into
            // {partition_id -> [iterators]}
            // ie. Vec<Vec<BatchIterator>>>.
            ;
        pl.done();

        // Build boundaries array: [0, nodes_per_partition, 2*nodes_per_partition, ..., num_nodes]
        let boundaries: Vec<usize> = (0..=num_partitions)
            .map(|i| (i * num_nodes_per_partition).min(self.num_nodes))
            .collect();

        // Build iterators array
        let iters: Vec<_> = partitioned_presorted_pairs
            .into_iter()
            .map(|partition| {
                // 'partition' contains N iterators that are not sorted with respect to each other.
                // We merge them and turn them into a single sorted iterator.
                KMergeIters::new(partition)
            })
            .collect();

        Ok(SplitIters::new(
            boundaries.into_boxed_slice(),
            iters.into_boxed_slice(),
        ))
    }
}
409
/// Per-thread scratch state used while reading and pre-sorting pairs.
struct SorterThreadState<C: BatchCodec> {
    /// Unique id of the worker that created this state; used to make
    /// temporary batch file names unique across workers.
    worker_id: usize,
    /// For each partition, iterators over the batches already sorted and
    /// flushed to disk.
    sorted_pairs: Vec<Vec<CodecIter<C>>>,
    /// For each partition, the in-memory buffer of pairs not yet flushed.
    unsorted_buffers: Vec<Vec<((usize, usize), C::Label)>>,
    /// Where should this SorterThreadState put itself back to when dropped
    queue: Option<Arc<SegQueue<Self>>>,
}
417
418impl<C: BatchCodec> SorterThreadState<C> {
419 fn new_empty() -> Self {
420 SorterThreadState {
421 worker_id: usize::MAX,
422 sorted_pairs: Vec::new(),
423 unsorted_buffers: Vec::new(),
424 queue: None,
425 }
426 }
427}
428
429impl<C: BatchCodec> Drop for SorterThreadState<C> {
430 fn drop(&mut self) {
431 match self.queue.take() {
432 Some(queue) => {
433 // Put self back on the queue
434 let mut other_self = Self::new_empty();
435 std::mem::swap(&mut other_self, self);
436 queue.push(other_self);
437 }
438 None => {
439 assert!(
440 self.sorted_pairs.iter().all(|vec| vec.is_empty()),
441 "Dropped SorterThreadState without consuming sorted_pairs"
442 );
443 assert!(
444 self.unsorted_buffers.iter().all(|vec| vec.is_empty()),
445 "Dropped SorterThreadState without consuming unsorted_buffers"
446 );
447 }
448 }
449 }
450}
451
452pub(crate) fn flush_buffer<C: BatchCodec>(
453 tmp_dir: &Path,
454 batch_codec: &C,
455 worker_id: usize,
456 partition_id: usize,
457 sorted_pairs: &mut Vec<CodecIter<C>>,
458 buf: &mut Vec<((usize, usize), C::Label)>,
459) -> Result<()> {
460 let path = tmp_dir.join(format!(
461 "sorted_batch_{worker_id}_{partition_id}_{}",
462 sorted_pairs.len()
463 ));
464
465 // Safety check. It's not foolproof (TOCTOU) but should catch most programming errors.
466 ensure!(
467 !path.exists(),
468 "Can't create temporary file {}, it already exists",
469 path.display()
470 );
471
472 batch_codec
473 .encode_batch(&path, buf)
474 .with_context(|| format!("Could not write sorted batch to {}", path.display()))?;
475 sorted_pairs.push(
476 batch_codec
477 .decode_batch(&path)
478 .with_context(|| format!("Could not read sorted batch from {}", path.display()))?
479 .into_iter(),
480 );
481 buf.clear();
482 Ok(())
483}