compressed_intvec/seq/parallel.rs
1//! Parallel operations for [`SeqVec`].
2//!
3//! This module provides parallel implementations for [`SeqVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are built on the
5//! [Rayon] library and are designed to leverage multi-core architectures to
6//! accelerate sequence retrieval and decompression.
7//!
8//! # API Overview
9//!
10//! | Method | Allocation | Returns | Best For |
11//! |--------|------------|---------|----------|
12//! | [`par_iter`] | `Vec<T>` per sequence | `impl ParallelIterator<Item = Vec<T>>` | Retaining decoded sequences |
13//! | [`par_for_each`] | None | `Vec<R>` | Consumptive ops (sum, count, fold) |
14//! | [`par_into_vecs`] | `Vec<T>` per sequence | `Vec<Vec<T>>` | Bulk decode with ownership transfer |
15//! | [`par_decode_many`] | `Vec<T>` per index | `Vec<Vec<T>>` | Sparse random access |
16//!
17//! # Performance Considerations
18//!
19//! Parallel iteration introduces thread dispatch overhead and reduces cache
20//! locality compared to sequential iteration. For small datasets or very fast codecs, sequential [`iter`](super::SeqVec::iter)
21//! is typically faster.
22//!
23//! # SeqVec-Specific Methods
24//!
25//! The [`par_for_each`] family of methods is unique to [`SeqVec`] and does not
26//! exist in [`FixedVec`](crate::fixed::FixedVec) or [`VarVec`](crate::variable::VarVec).
27//! This is because:
28//!
29//! - `FixedVec::par_iter()` and `VarVec::par_iter()` yield `T` directly, so
30//! standard Rayon combinators (`.map()`, `.for_each()`) provide zero-allocation
31//! processing.
32//! - `SeqVec::par_iter()` must yield `Vec<T>` (materialized sequences) due to
33//! Rayon's ownership model. The `par_for_each` family provides a zero-allocation
34//! path by passing [`SeqIter`] directly to the closure.
35//!
36//! [Rayon]: https://github.com/rayon-rs/rayon
37//! [`SeqVec`]: super::SeqVec
38//! [`SeqIter`]: super::iter::SeqIter
39//! [`par_iter`]: super::SeqVec::par_iter
40//! [`par_for_each`]: super::SeqVec::par_for_each
41//! [`par_into_vecs`]: super::SeqVec::par_into_vecs
42//! [`par_decode_many`]: super::SeqVec::par_decode_many
43
44use super::iter::SeqIter;
45use super::{SeqVec, SeqVecBitReader, SeqVecError};
46use crate::variable::traits::Storable;
47use dsi_bitstream::dispatch::CodesRead;
48use dsi_bitstream::prelude::{BitRead, BitSeek, Endianness};
49use rayon::prelude::{
50 IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
51 IntoParallelRefMutIterator, ParallelIterator,
52};
53
54impl<T, E, B> SeqVec<T, E, B>
55where
56 T: Storable + Send + Sync,
57 E: Endianness + Send + Sync,
58 B: AsRef<[u64]> + Send + Sync,
59 for<'a> SeqVecBitReader<'a, E>: BitRead<E, Error = core::convert::Infallible>
60 + CodesRead<E>
61 + BitSeek<Error = core::convert::Infallible>
62 + Send,
63{
64 /// Returns a parallel iterator over all sequences, materializing each as `Vec<T>`.
65 ///
66 /// This method uses Rayon to decompress and collect all sequences in
67 /// parallel. Each sequence is fully decoded into a `Vec<T>` by its
68 /// assigned thread before being yielded.
69 ///
70 /// # Performance
71 ///
72 /// Parallelization is beneficial when the dataset contains enough sequences and those
73 /// sequences are sufficiently large to amortize thread overhead.
74 ///
75 /// For consumptive operations (sum, count, fold) where the decoded data
76 /// is not retained, prefer [`par_for_each`](Self::par_for_each) which
77 /// avoids allocation overhead.
78 ///
79 /// For small datasets or very fast codecs, the sequential [`iter`](Self::iter)
80 /// method may be faster due to better cache locality.
81 ///
82 /// # Examples
83 ///
84 /// ```
85 /// # #[cfg(feature = "parallel")]
86 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
87 /// use compressed_intvec::seq::{SeqVec, LESeqVec};
88 /// use rayon::prelude::*;
89 ///
90 /// let sequences: &[&[u32]] = &[
91 /// &[1, 2, 3],
92 /// &[10, 20],
93 /// &[100, 200, 300],
94 /// ];
95 /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
96 ///
97 /// // Collect all sequences in parallel
98 /// let all_sequences: Vec<Vec<u32>> = vec.par_iter().collect();
99 ///
100 /// assert_eq!(all_sequences.len(), 3);
101 /// assert_eq!(all_sequences[0], vec![1, 2, 3]);
102 /// # Ok(())
103 /// # }
104 /// # #[cfg(not(feature = "parallel"))]
105 /// # fn main() {}
106 /// ```
107 pub fn par_iter(&self) -> impl ParallelIterator<Item = Vec<T>> + '_ {
108 let num_sequences = self.num_sequences();
109
110 (0..num_sequences).into_par_iter().map_init(
111 || self.reader(),
112 move |reader, i| {
113 // Pre-allocate buffer when sequence length is known.
114 let capacity = self
115 .seq_lengths
116 .as_ref()
117 .map(|l| unsafe { l.get_unchecked(i) as usize })
118 .unwrap_or(0);
119
120 let mut buf = Vec::with_capacity(capacity);
121 reader.decode_into(i, &mut buf).unwrap();
122 buf
123 },
124 )
125 }
126
    /// Applies a function to each sequence in parallel without materialization.
    ///
    /// Unlike [`par_iter`](Self::par_iter), this method does not allocate a
    /// `Vec<T>` for each sequence. Instead, the closure receives a streaming
    /// [`SeqIter`] directly, enabling zero-allocation parallel processing.
    /// Results are returned in sequence order, one `R` per sequence.
    ///
    /// # Performance
    ///
    /// This method is optimal when:
    /// - The operation is purely consumptive (fold, sum, count, predicate check)
    /// - Memory allocation overhead is significant relative to decode time
    /// - Sequences are short enough that materialization cost matters
    ///
    /// For operations that need to retain sequence data (collect, sort, store),
    /// use [`par_iter`](Self::par_iter) instead.
    ///
    /// # Type Parameters
    ///
    /// - `F`: Closure type that processes a [`SeqIter`] and produces a result
    /// - `R`: Result type produced by the closure, must be [`Send`]
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "parallel")]
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
    ///
    /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20], &[100]];
    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
    ///
    /// // Sum each sequence without allocating intermediate Vecs
    /// let sums: Vec<u64> = vec.par_for_each(|seq| seq.map(|v| v as u64).sum());
    /// assert_eq!(sums, vec![6, 30, 100]);
    ///
    /// // Count elements per sequence
    /// let counts: Vec<usize> = vec.par_for_each(|seq| seq.count());
    /// assert_eq!(counts, vec![3, 2, 1]);
    ///
    /// // Check if any sequence contains a value > 50
    /// let has_large: Vec<bool> = vec.par_for_each(|mut seq| seq.any(|v| v > 50));
    /// assert_eq!(has_large, vec![false, false, true]);
    /// # Ok(())
    /// # }
    /// # #[cfg(not(feature = "parallel"))]
    /// # fn main() {}
    /// ```
    ///
    /// # Comparison with [`par_iter`](Self::par_iter)
    ///
    /// ```
    /// # #[cfg(feature = "parallel")]
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
    /// use rayon::prelude::*;
    ///
    /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20]];
    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
    ///
    /// // par_iter: allocates Vec<T> per sequence, then sums
    /// let sums_alloc: Vec<u64> = vec
    ///     .par_iter()
    ///     .map(|s| s.iter().map(|&v| v as u64).sum())
    ///     .collect();
    ///
    /// // par_for_each: zero allocation, sums directly from iterator
    /// let sums_noalloc: Vec<u64> = vec.par_for_each(|seq| seq.map(|v| v as u64).sum());
    ///
    /// assert_eq!(sums_alloc, sums_noalloc);
    /// # Ok(())
    /// # }
    /// # #[cfg(not(feature = "parallel"))]
    /// # fn main() {}
    /// ```
    pub fn par_for_each<F, R>(&self, f: F) -> Vec<R>
    where
        F: Fn(SeqIter<'_, T, E>) -> R + Sync + Send,
        R: Send,
    {
        // Hoist the individual fields so the parallel closure borrows only
        // what it needs instead of capturing `&self` wholesale.
        let num_sequences = self.num_sequences();
        let data = self.data.as_ref();
        let bit_offsets = &self.bit_offsets;
        let seq_lengths = self.seq_lengths.as_ref();
        let encoding = self.encoding;

        (0..num_sequences)
            .into_par_iter()
            .map(|i| {
                // SAFETY: i < num_sequences by construction of the range, and
                // `i + 1` is in bounds assuming `bit_offsets` stores
                // `num_sequences + 1` fenceposts (one boundary per sequence
                // plus the terminal offset) — the same invariant every other
                // accessor in this module relies on.
                let start_bit = unsafe { bit_offsets.get_unchecked(i) };
                let end_bit = unsafe { bit_offsets.get_unchecked(i + 1) };
                let len = seq_lengths.map(|l| unsafe { l.get_unchecked(i) as usize });

                // Build a streaming decoder over [start_bit, end_bit) and hand
                // it to the user closure — no intermediate Vec is allocated.
                let iter = SeqIter::new_with_len(data, start_bit, end_bit, encoding, len);
                f(iter)
            })
            .collect()
    }
225
226 /// Applies a function to each sequence in parallel and reduces results.
227 ///
228 /// This is a convenience method combining [`par_for_each`](Self::par_for_each)
229 /// with a parallel reduction. Useful when the final result is a single
230 /// aggregated value rather than a collection.
231 ///
232 /// # Examples
233 ///
234 /// ```
235 /// # #[cfg(feature = "parallel")]
236 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
237 /// use compressed_intvec::seq::{SeqVec, LESeqVec};
238 ///
239 /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20], &[100]];
240 /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
241 ///
242 /// // Total sum across all sequences
243 /// let total: u64 = vec.par_for_each_reduce(
244 /// |seq| seq.map(|v| v as u64).sum::<u64>(),
245 /// || 0u64,
246 /// |a, b| a + b,
247 /// );
248 /// assert_eq!(total, 136); // 6 + 30 + 100
249 ///
250 /// // Maximum element across all sequences
251 /// let max: u32 = vec.par_for_each_reduce(
252 /// |seq| seq.max().unwrap_or(0),
253 /// || 0u32,
254 /// |a, b| a.max(b),
255 /// );
256 /// assert_eq!(max, 100);
257 /// # Ok(())
258 /// # }
259 /// # #[cfg(not(feature = "parallel"))]
260 /// # fn main() {}
261 /// ```
262 pub fn par_for_each_reduce<F, R, ID, OP>(&self, f: F, identity: ID, op: OP) -> R
263 where
264 F: Fn(SeqIter<'_, T, E>) -> R + Sync + Send,
265 R: Send,
266 ID: Fn() -> R + Sync + Send,
267 OP: Fn(R, R) -> R + Sync + Send,
268 {
269 let num_sequences = self.num_sequences();
270 let data = self.data.as_ref();
271 let bit_offsets = &self.bit_offsets;
272 let seq_lengths = self.seq_lengths.as_ref();
273 let encoding = self.encoding;
274
275 (0..num_sequences)
276 .into_par_iter()
277 .map(|i| {
278 let start_bit = unsafe { bit_offsets.get_unchecked(i) };
279 let end_bit = unsafe { bit_offsets.get_unchecked(i + 1) };
280 let len = seq_lengths.map(|l| unsafe { l.get_unchecked(i) as usize });
281
282 let iter = SeqIter::new_with_len(data, start_bit, end_bit, encoding, len);
283 f(iter)
284 })
285 .reduce(identity, op)
286 }
287
288 /// Consumes the [`SeqVec`] and decodes all sequences into separate vectors
289 /// in parallel.
290 ///
291 /// This method is a parallel version of [`into_vecs`](super::SeqVec::into_vecs),
292 /// leveraging Rayon to decompress multiple sequences concurrently. Each
293 /// sequence is fully decompressed by its assigned thread.
294 ///
295 /// # Performance
296 ///
297 /// Parallelization is beneficial when:
298 /// - The dataset is large enough to amortize thread overhead
299 /// - Sequences are reasonably sized
300 ///
301 /// For small datasets or very fast codecs, the sequential
302 /// [`into_vecs`](super::SeqVec::into_vecs) method may be faster due to
303 /// better cache locality.
304 ///
305 /// # Examples
306 ///
307 /// ```
308 /// # #[cfg(feature = "parallel")]
309 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
310 /// use compressed_intvec::seq::{SeqVec, LESeqVec};
311 ///
312 /// let sequences: &[&[u32]] = &[
313 /// &[1, 2, 3],
314 /// &[10, 20],
315 /// &[100, 200, 300],
316 /// ];
317 /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
318 ///
319 /// // Decode all sequences in parallel
320 /// let all_sequences: Vec<Vec<u32>> = vec.par_into_vecs();
321 ///
322 /// assert_eq!(all_sequences.len(), 3);
323 /// assert_eq!(all_sequences[0], vec![1, 2, 3]);
324 /// assert_eq!(all_sequences[1], vec![10, 20]);
325 /// # Ok(())
326 /// # }
327 /// # #[cfg(not(feature = "parallel"))]
328 /// # fn main() {}
329 /// ```
330 ///
331 /// [`into_vecs`]: super::SeqVec::into_vecs
332 pub fn par_into_vecs(self) -> Vec<Vec<T>> {
333 let num_sequences = self.num_sequences();
334 let seqvec = &self;
335
336 (0..num_sequences)
337 .into_par_iter()
338 .map_init(
339 || seqvec.reader(),
340 move |reader, i| {
341 // Pre-allocate buffer when sequence length is known.
342 let capacity = seqvec
343 .seq_lengths
344 .as_ref()
345 .map(|l| unsafe { l.get_unchecked(i) as usize })
346 .unwrap_or(0);
347
348 let mut buf = Vec::with_capacity(capacity);
349 reader.decode_into(i, &mut buf).unwrap();
350 buf
351 },
352 )
353 .collect()
354 }
355
356 /// Retrieves multiple sequences in parallel.
357 ///
358 /// This method uses Rayon to parallelize the retrieval of multiple sequences
359 /// by index. It is particularly useful when accessing a large subset of
360 /// sequences that are not contiguous.
361 ///
362 /// # Errors
363 ///
364 /// Returns [`SeqVecError::IndexOutOfBounds`] if any index is out of bounds.
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// # #[cfg(feature = "parallel")]
370 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
371 /// use compressed_intvec::seq::{SeqVec, LESeqVec};
372 ///
373 /// let sequences: &[&[u32]] = &[
374 /// &[1, 2, 3],
375 /// &[10, 20],
376 /// &[100, 200, 300],
377 /// &[1000],
378 /// ];
379 /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
380 ///
381 /// let indices = [3, 0, 2];
382 /// let sequences = vec.par_decode_many(&indices)?;
383 /// assert_eq!(sequences.len(), 3);
384 /// assert_eq!(sequences[0], vec![1000]); // Index 3
385 /// assert_eq!(sequences[1], vec![1, 2, 3]); // Index 0
386 /// assert_eq!(sequences[2], vec![100, 200, 300]); // Index 2
387 /// # Ok(())
388 /// # }
389 /// # #[cfg(not(feature = "parallel"))]
390 /// # fn main() {}
391 /// ```
392 pub fn par_decode_many(&self, indices: &[usize]) -> Result<Vec<Vec<T>>, SeqVecError> {
393 if indices.is_empty() {
394 return Ok(Vec::new());
395 }
396
397 let num_sequences = self.num_sequences();
398
399 // Bounds checking
400 for &index in indices {
401 if index >= num_sequences {
402 return Err(SeqVecError::IndexOutOfBounds(index));
403 }
404 }
405
406 // SAFETY: We have pre-checked the bounds of all indices.
407 Ok(unsafe { self.par_decode_many_unchecked(indices) })
408 }
409
    /// Retrieves multiple sequences in parallel without bounds checking.
    ///
    /// The output `Vec` matches the order of `indices`: position `k` of the
    /// result holds the decoded sequence at `indices[k]`.
    ///
    /// # Safety
    ///
    /// Calling this method with any out-of-bounds index in the `indices` slice
    /// is undefined behavior. In debug builds, assertions will panic.
    pub unsafe fn par_decode_many_unchecked(&self, indices: &[usize]) -> Vec<Vec<T>> {
        // Guarded by cfg so the whole validation loop is compiled out of
        // release builds, not just the individual debug_assert bodies.
        #[cfg(debug_assertions)]
        {
            let num_sequences = self.num_sequences();
            for &index in indices {
                debug_assert!(
                    index < num_sequences,
                    "Index out of bounds: index was {} but num_sequences was {}",
                    index,
                    num_sequences
                );
            }
        }

        if indices.is_empty() {
            return Vec::new();
        }

        // One output slot per requested index; each Rayon worker writes its
        // own disjoint slot via `par_iter_mut`, preserving input order.
        let mut results = vec![Vec::new(); indices.len()];

        // `for_each_init` gives every worker thread its own reader.
        results.par_iter_mut().enumerate().for_each_init(
            || self.reader(),
            |reader, (original_pos, result)| {
                let target_index = indices[original_pos];

                // Pre-allocate when sequence length is known.
                if let Some(lengths) = &self.seq_lengths {
                    // SAFETY: `target_index` is in bounds per this fn's contract.
                    let capacity = unsafe { lengths.get_unchecked(target_index) as usize };
                    result.reserve(capacity);
                }

                // SAFETY: bounds are guaranteed by the caller. The unwrap is
                // not expected to fire: the impl bounds pin the reader's error
                // type to `Infallible`.
                reader.decode_into(target_index, result).unwrap();
            },
        );

        results
    }
454
455 /// Applies a function to selected sequences in parallel without materialization.
456 ///
457 /// This is the sparse-access equivalent of [`par_for_each`](Self::par_for_each),
458 /// allowing zero-allocation processing of a subset of sequences by index.
459 ///
460 /// # Errors
461 ///
462 /// Returns [`SeqVecError::IndexOutOfBounds`] if any index is out of bounds.
463 ///
464 /// # Examples
465 ///
466 /// ```
467 /// # #[cfg(feature = "parallel")]
468 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
469 /// use compressed_intvec::seq::{SeqVec, LESeqVec};
470 ///
471 /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20], &[100], &[1000, 2000]];
472 /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
473 ///
474 /// // Sum only sequences at indices 0 and 2
475 /// let sums = vec.par_for_each_many(&[0, 2], |seq| seq.map(|v| v as u64).sum::<u64>())?;
476 /// assert_eq!(sums, vec![6, 100]);
477 /// # Ok(())
478 /// # }
479 /// # #[cfg(not(feature = "parallel"))]
480 /// # fn main() {}
481 /// ```
482 pub fn par_for_each_many<F, R>(&self, indices: &[usize], f: F) -> Result<Vec<R>, SeqVecError>
483 where
484 F: Fn(SeqIter<'_, T, E>) -> R + Sync + Send,
485 R: Send,
486 {
487 if indices.is_empty() {
488 return Ok(Vec::new());
489 }
490
491 let num_sequences = self.num_sequences();
492
493 // Bounds checking
494 for &index in indices {
495 if index >= num_sequences {
496 return Err(SeqVecError::IndexOutOfBounds(index));
497 }
498 }
499
500 let data = self.data.as_ref();
501 let bit_offsets = &self.bit_offsets;
502 let seq_lengths = self.seq_lengths.as_ref();
503 let encoding = self.encoding;
504
505 let results = indices
506 .par_iter()
507 .map(|&i| {
508 // SAFETY: bounds checked above.
509 let start_bit = unsafe { bit_offsets.get_unchecked(i) };
510 let end_bit = unsafe { bit_offsets.get_unchecked(i + 1) };
511 let len = seq_lengths.map(|l| unsafe { l.get_unchecked(i) as usize });
512
513 let iter = SeqIter::new_with_len(data, start_bit, end_bit, encoding, len);
514 f(iter)
515 })
516 .collect();
517
518 Ok(results)
519 }
520}