// compressed_intvec/seq/parallel.rs
1//! Parallel operations for [`SeqVec`].
2//!
3//! This module provides parallel implementations for [`SeqVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are built on the
5//! [Rayon] library and are designed to leverage multi-core architectures to
6//! accelerate sequence retrieval and decompression.
7//!
8//! # API Overview
9//!
10//! | Method | Allocation | Returns | Best For |
11//! |--------|------------|---------|----------|
12//! | [`par_iter`] | `Vec<T>` per sequence | `impl ParallelIterator<Item = Vec<T>>` | Retaining decoded sequences |
13//! | [`par_for_each`] | None | `Vec<R>` | Consumptive ops (sum, count, fold) |
14//! | [`par_into_vecs`] | `Vec<T>` per sequence | `Vec<Vec<T>>` | Bulk decode with ownership transfer |
15//! | [`par_decode_many`] | `Vec<T>` per index | `Vec<Vec<T>>` | Sparse random access |
16//!
17//! # Performance Considerations
18//!
19//! Parallel iteration introduces thread dispatch overhead and reduces cache
20//! locality compared to sequential iteration. For small datasets or very fast codecs, sequential [`iter`](super::SeqVec::iter)
21//! is typically faster.
22//!
23//! # SeqVec-Specific Methods
24//!
25//! The [`par_for_each`] family of methods is unique to [`SeqVec`] and does not
26//! exist in [`FixedVec`](crate::fixed::FixedVec) or [`VarVec`](crate::variable::VarVec).
27//! This is because:
28//!
29//! - `FixedVec::par_iter()` and `VarVec::par_iter()` yield `T` directly, so
30//!   standard Rayon combinators (`.map()`, `.for_each()`) provide zero-allocation
31//!   processing.
32//! - `SeqVec::par_iter()` must yield `Vec<T>` (materialized sequences) due to
33//!   Rayon's ownership model. The `par_for_each` family provides a zero-allocation
34//!   path by passing [`SeqIter`] directly to the closure.
35//!
36//! [Rayon]: https://github.com/rayon-rs/rayon
37//! [`SeqVec`]: super::SeqVec
38//! [`SeqIter`]: super::iter::SeqIter
39//! [`par_iter`]: super::SeqVec::par_iter
40//! [`par_for_each`]: super::SeqVec::par_for_each
41//! [`par_into_vecs`]: super::SeqVec::par_into_vecs
42//! [`par_decode_many`]: super::SeqVec::par_decode_many
43
44use super::iter::SeqIter;
45use super::{SeqVec, SeqVecBitReader, SeqVecError};
46use crate::variable::traits::Storable;
47use dsi_bitstream::dispatch::CodesRead;
48use dsi_bitstream::prelude::{BitRead, BitSeek, Endianness};
49use rayon::prelude::{
50    IndexedParallelIterator, IntoParallelIterator, IntoParallelRefIterator,
51    IntoParallelRefMutIterator, ParallelIterator,
52};
53
54impl<T, E, B> SeqVec<T, E, B>
55where
56    T: Storable + Send + Sync,
57    E: Endianness + Send + Sync,
58    B: AsRef<[u64]> + Send + Sync,
59    for<'a> SeqVecBitReader<'a, E>: BitRead<E, Error = core::convert::Infallible>
60        + CodesRead<E>
61        + BitSeek<Error = core::convert::Infallible>
62        + Send,
63{
64    /// Returns a parallel iterator over all sequences, materializing each as `Vec<T>`.
65    ///
66    /// This method uses Rayon to decompress and collect all sequences in
67    /// parallel. Each sequence is fully decoded into a `Vec<T>` by its
68    /// assigned thread before being yielded.
69    ///
70    /// # Performance
71    ///
72    /// Parallelization is beneficial when the dataset contains enough sequences and those
73    /// sequences are sufficiently large to amortize thread overhead.
74    ///
75    /// For consumptive operations (sum, count, fold) where the decoded data
76    /// is not retained, prefer [`par_for_each`](Self::par_for_each) which
77    /// avoids allocation overhead.
78    ///
79    /// For small datasets or very fast codecs, the sequential [`iter`](Self::iter)
80    /// method may be faster due to better cache locality.
81    ///
82    /// # Examples
83    ///
84    /// ```
85    /// # #[cfg(feature = "parallel")]
86    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
87    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
88    /// use rayon::prelude::*;
89    ///
90    /// let sequences: &[&[u32]] = &[
91    ///     &[1, 2, 3],
92    ///     &[10, 20],
93    ///     &[100, 200, 300],
94    /// ];
95    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
96    ///
97    /// // Collect all sequences in parallel
98    /// let all_sequences: Vec<Vec<u32>> = vec.par_iter().collect();
99    ///
100    /// assert_eq!(all_sequences.len(), 3);
101    /// assert_eq!(all_sequences[0], vec![1, 2, 3]);
102    /// #     Ok(())
103    /// # }
104    /// # #[cfg(not(feature = "parallel"))]
105    /// # fn main() {}
106    /// ```
107    pub fn par_iter(&self) -> impl ParallelIterator<Item = Vec<T>> + '_ {
108        let num_sequences = self.num_sequences();
109
110        (0..num_sequences).into_par_iter().map_init(
111            || self.reader(),
112            move |reader, i| {
113                // Pre-allocate buffer when sequence length is known.
114                let capacity = self
115                    .seq_lengths
116                    .as_ref()
117                    .map(|l| unsafe { l.get_unchecked(i) as usize })
118                    .unwrap_or(0);
119
120                let mut buf = Vec::with_capacity(capacity);
121                reader.decode_into(i, &mut buf).unwrap();
122                buf
123            },
124        )
125    }
126
127    /// Applies a function to each sequence in parallel without materialization.
128    ///
129    /// Unlike [`par_iter`](Self::par_iter), this method does not allocate a
130    /// `Vec<T>` for each sequence. Instead, the closure receives a streaming
131    /// [`SeqIter`] directly, enabling zero-allocation parallel processing.
132    ///
133    /// # Performance
134    ///
135    /// This method is optimal when:
136    /// - The operation is purely consumptive (fold, sum, count, predicate check)
137    /// - Memory allocation overhead is significant relative to decode time
138    /// - Sequences are short enough that materialization cost matters
139    ///
140    /// For operations that need to retain sequence data (collect, sort, store),
141    /// use [`par_iter`](Self::par_iter) instead.
142    ///
143    /// # Type Parameters
144    ///
145    /// - `F`: Closure type that processes a [`SeqIter`] and produces a result
146    /// - `R`: Result type produced by the closure, must be [`Send`]
147    ///
148    /// # Examples
149    ///
150    /// ```
151    /// # #[cfg(feature = "parallel")]
152    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
153    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
154    ///
155    /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20], &[100]];
156    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
157    ///
158    /// // Sum each sequence without allocating intermediate Vecs
159    /// let sums: Vec<u64> = vec.par_for_each(|seq| seq.map(|v| v as u64).sum());
160    /// assert_eq!(sums, vec![6, 30, 100]);
161    ///
162    /// // Count elements per sequence
163    /// let counts: Vec<usize> = vec.par_for_each(|seq| seq.count());
164    /// assert_eq!(counts, vec![3, 2, 1]);
165    ///
166    /// // Check if any sequence contains a value > 50
167    /// let has_large: Vec<bool> = vec.par_for_each(|mut seq| seq.any(|v| v > 50));
168    /// assert_eq!(has_large, vec![false, false, true]);
169    /// #     Ok(())
170    /// # }
171    /// # #[cfg(not(feature = "parallel"))]
172    /// # fn main() {}
173    /// ```
174    ///
175    /// # Comparison with [`par_iter`](Self::par_iter)
176    ///
177    /// ```
178    /// # #[cfg(feature = "parallel")]
179    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
180    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
181    /// use rayon::prelude::*;
182    ///
183    /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20]];
184    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
185    ///
186    /// // par_iter: allocates Vec<T> per sequence, then sums
187    /// let sums_alloc: Vec<u64> = vec
188    ///     .par_iter()
189    ///     .map(|s| s.iter().map(|&v| v as u64).sum())
190    ///     .collect();
191    ///
192    /// // par_for_each: zero allocation, sums directly from iterator
193    /// let sums_noalloc: Vec<u64> = vec.par_for_each(|seq| seq.map(|v| v as u64).sum());
194    ///
195    /// assert_eq!(sums_alloc, sums_noalloc);
196    /// #     Ok(())
197    /// # }
198    /// # #[cfg(not(feature = "parallel"))]
199    /// # fn main() {}
200    /// ```
201    pub fn par_for_each<F, R>(&self, f: F) -> Vec<R>
202    where
203        F: Fn(SeqIter<'_, T, E>) -> R + Sync + Send,
204        R: Send,
205    {
206        let num_sequences = self.num_sequences();
207        let data = self.data.as_ref();
208        let bit_offsets = &self.bit_offsets;
209        let seq_lengths = self.seq_lengths.as_ref();
210        let encoding = self.encoding;
211
212        (0..num_sequences)
213            .into_par_iter()
214            .map(|i| {
215                // SAFETY: i < num_sequences by construction of the range.
216                let start_bit = unsafe { bit_offsets.get_unchecked(i) };
217                let end_bit = unsafe { bit_offsets.get_unchecked(i + 1) };
218                let len = seq_lengths.map(|l| unsafe { l.get_unchecked(i) as usize });
219
220                let iter = SeqIter::new_with_len(data, start_bit, end_bit, encoding, len);
221                f(iter)
222            })
223            .collect()
224    }
225
226    /// Applies a function to each sequence in parallel and reduces results.
227    ///
228    /// This is a convenience method combining [`par_for_each`](Self::par_for_each)
229    /// with a parallel reduction. Useful when the final result is a single
230    /// aggregated value rather than a collection.
231    ///
232    /// # Examples
233    ///
234    /// ```
235    /// # #[cfg(feature = "parallel")]
236    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
237    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
238    ///
239    /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20], &[100]];
240    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
241    ///
242    /// // Total sum across all sequences
243    /// let total: u64 = vec.par_for_each_reduce(
244    ///     |seq| seq.map(|v| v as u64).sum::<u64>(),
245    ///     || 0u64,
246    ///     |a, b| a + b,
247    /// );
248    /// assert_eq!(total, 136); // 6 + 30 + 100
249    ///
250    /// // Maximum element across all sequences
251    /// let max: u32 = vec.par_for_each_reduce(
252    ///     |seq| seq.max().unwrap_or(0),
253    ///     || 0u32,
254    ///     |a, b| a.max(b),
255    /// );
256    /// assert_eq!(max, 100);
257    /// #     Ok(())
258    /// # }
259    /// # #[cfg(not(feature = "parallel"))]
260    /// # fn main() {}
261    /// ```
262    pub fn par_for_each_reduce<F, R, ID, OP>(&self, f: F, identity: ID, op: OP) -> R
263    where
264        F: Fn(SeqIter<'_, T, E>) -> R + Sync + Send,
265        R: Send,
266        ID: Fn() -> R + Sync + Send,
267        OP: Fn(R, R) -> R + Sync + Send,
268    {
269        let num_sequences = self.num_sequences();
270        let data = self.data.as_ref();
271        let bit_offsets = &self.bit_offsets;
272        let seq_lengths = self.seq_lengths.as_ref();
273        let encoding = self.encoding;
274
275        (0..num_sequences)
276            .into_par_iter()
277            .map(|i| {
278                let start_bit = unsafe { bit_offsets.get_unchecked(i) };
279                let end_bit = unsafe { bit_offsets.get_unchecked(i + 1) };
280                let len = seq_lengths.map(|l| unsafe { l.get_unchecked(i) as usize });
281
282                let iter = SeqIter::new_with_len(data, start_bit, end_bit, encoding, len);
283                f(iter)
284            })
285            .reduce(identity, op)
286    }
287
288    /// Consumes the [`SeqVec`] and decodes all sequences into separate vectors
289    /// in parallel.
290    ///
291    /// This method is a parallel version of [`into_vecs`](super::SeqVec::into_vecs),
292    /// leveraging Rayon to decompress multiple sequences concurrently. Each
293    /// sequence is fully decompressed by its assigned thread.
294    ///
295    /// # Performance
296    ///
297    /// Parallelization is beneficial when:
298    /// - The dataset is large enough to amortize thread overhead
299    /// - Sequences are reasonably sized
300    ///
301    /// For small datasets or very fast codecs, the sequential
302    /// [`into_vecs`](super::SeqVec::into_vecs) method may be faster due to
303    /// better cache locality.
304    ///
305    /// # Examples
306    ///
307    /// ```
308    /// # #[cfg(feature = "parallel")]
309    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
310    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
311    ///
312    /// let sequences: &[&[u32]] = &[
313    ///     &[1, 2, 3],
314    ///     &[10, 20],
315    ///     &[100, 200, 300],
316    /// ];
317    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
318    ///
319    /// // Decode all sequences in parallel
320    /// let all_sequences: Vec<Vec<u32>> = vec.par_into_vecs();
321    ///
322    /// assert_eq!(all_sequences.len(), 3);
323    /// assert_eq!(all_sequences[0], vec![1, 2, 3]);
324    /// assert_eq!(all_sequences[1], vec![10, 20]);
325    /// #     Ok(())
326    /// # }
327    /// # #[cfg(not(feature = "parallel"))]
328    /// # fn main() {}
329    /// ```
330    ///
331    /// [`into_vecs`]: super::SeqVec::into_vecs
332    pub fn par_into_vecs(self) -> Vec<Vec<T>> {
333        let num_sequences = self.num_sequences();
334        let seqvec = &self;
335
336        (0..num_sequences)
337            .into_par_iter()
338            .map_init(
339                || seqvec.reader(),
340                move |reader, i| {
341                    // Pre-allocate buffer when sequence length is known.
342                    let capacity = seqvec
343                        .seq_lengths
344                        .as_ref()
345                        .map(|l| unsafe { l.get_unchecked(i) as usize })
346                        .unwrap_or(0);
347
348                    let mut buf = Vec::with_capacity(capacity);
349                    reader.decode_into(i, &mut buf).unwrap();
350                    buf
351                },
352            )
353            .collect()
354    }
355
356    /// Retrieves multiple sequences in parallel.
357    ///
358    /// This method uses Rayon to parallelize the retrieval of multiple sequences
359    /// by index. It is particularly useful when accessing a large subset of
360    /// sequences that are not contiguous.
361    ///
362    /// # Errors
363    ///
364    /// Returns [`SeqVecError::IndexOutOfBounds`] if any index is out of bounds.
365    ///
366    /// # Examples
367    ///
368    /// ```
369    /// # #[cfg(feature = "parallel")]
370    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
371    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
372    ///
373    /// let sequences: &[&[u32]] = &[
374    ///     &[1, 2, 3],
375    ///     &[10, 20],
376    ///     &[100, 200, 300],
377    ///     &[1000],
378    /// ];
379    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
380    ///
381    /// let indices = [3, 0, 2];
382    /// let sequences = vec.par_decode_many(&indices)?;
383    /// assert_eq!(sequences.len(), 3);
384    /// assert_eq!(sequences[0], vec![1000]);  // Index 3
385    /// assert_eq!(sequences[1], vec![1, 2, 3]); // Index 0
386    /// assert_eq!(sequences[2], vec![100, 200, 300]); // Index 2
387    /// #     Ok(())
388    /// # }
389    /// # #[cfg(not(feature = "parallel"))]
390    /// # fn main() {}
391    /// ```
392    pub fn par_decode_many(&self, indices: &[usize]) -> Result<Vec<Vec<T>>, SeqVecError> {
393        if indices.is_empty() {
394            return Ok(Vec::new());
395        }
396
397        let num_sequences = self.num_sequences();
398
399        // Bounds checking
400        for &index in indices {
401            if index >= num_sequences {
402                return Err(SeqVecError::IndexOutOfBounds(index));
403            }
404        }
405
406        // SAFETY: We have pre-checked the bounds of all indices.
407        Ok(unsafe { self.par_decode_many_unchecked(indices) })
408    }
409
    /// Retrieves multiple sequences in parallel without bounds checking.
    ///
    /// The output preserves the order of `indices`: element `k` of the returned
    /// vector is the decode of `indices[k]`. Each Rayon worker thread creates
    /// its own reader (via `for_each_init`) and writes directly into its
    /// pre-assigned slot of the results vector, so no post-hoc reordering is
    /// needed.
    ///
    /// # Safety
    ///
    /// Calling this method with any out-of-bounds index in the `indices` slice
    /// is undefined behavior. In debug builds, assertions will panic.
    pub unsafe fn par_decode_many_unchecked(&self, indices: &[usize]) -> Vec<Vec<T>> {
        // Validate the caller's contract in debug builds only; the cfg guard
        // also skips the validation loop entirely in release builds.
        #[cfg(debug_assertions)]
        {
            let num_sequences = self.num_sequences();
            for &index in indices {
                debug_assert!(
                    index < num_sequences,
                    "Index out of bounds: index was {} but num_sequences was {}",
                    index,
                    num_sequences
                );
            }
        }

        if indices.is_empty() {
            return Vec::new();
        }

        // One (initially empty) slot per requested index; slot k receives the
        // decode of indices[k].
        let mut results = vec![Vec::new(); indices.len()];

        results.par_iter_mut().enumerate().for_each_init(
            || self.reader(),
            |reader, (original_pos, result)| {
                let target_index = indices[original_pos];

                // Pre-allocate when sequence length is known.
                if let Some(lengths) = &self.seq_lengths {
                    let capacity = unsafe { lengths.get_unchecked(target_index) as usize };
                    result.reserve(capacity);
                }

                // SAFETY: bounds are guaranteed by the caller.
                // NOTE(review): the unwrap assumes decode_into cannot fail for
                // in-bounds indices (the reader's bit-stream error type is
                // Infallible per the impl bounds) — confirm decode_into has no
                // other failure mode.
                reader.decode_into(target_index, result).unwrap();
            },
        );

        results
    }
454
455    /// Applies a function to selected sequences in parallel without materialization.
456    ///
457    /// This is the sparse-access equivalent of [`par_for_each`](Self::par_for_each),
458    /// allowing zero-allocation processing of a subset of sequences by index.
459    ///
460    /// # Errors
461    ///
462    /// Returns [`SeqVecError::IndexOutOfBounds`] if any index is out of bounds.
463    ///
464    /// # Examples
465    ///
466    /// ```
467    /// # #[cfg(feature = "parallel")]
468    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
469    /// use compressed_intvec::seq::{SeqVec, LESeqVec};
470    ///
471    /// let sequences: &[&[u32]] = &[&[1, 2, 3], &[10, 20], &[100], &[1000, 2000]];
472    /// let vec: LESeqVec<u32> = SeqVec::from_slices(sequences)?;
473    ///
474    /// // Sum only sequences at indices 0 and 2
475    /// let sums = vec.par_for_each_many(&[0, 2], |seq| seq.map(|v| v as u64).sum::<u64>())?;
476    /// assert_eq!(sums, vec![6, 100]);
477    /// #     Ok(())
478    /// # }
479    /// # #[cfg(not(feature = "parallel"))]
480    /// # fn main() {}
481    /// ```
482    pub fn par_for_each_many<F, R>(&self, indices: &[usize], f: F) -> Result<Vec<R>, SeqVecError>
483    where
484        F: Fn(SeqIter<'_, T, E>) -> R + Sync + Send,
485        R: Send,
486    {
487        if indices.is_empty() {
488            return Ok(Vec::new());
489        }
490
491        let num_sequences = self.num_sequences();
492
493        // Bounds checking
494        for &index in indices {
495            if index >= num_sequences {
496                return Err(SeqVecError::IndexOutOfBounds(index));
497            }
498        }
499
500        let data = self.data.as_ref();
501        let bit_offsets = &self.bit_offsets;
502        let seq_lengths = self.seq_lengths.as_ref();
503        let encoding = self.encoding;
504
505        let results = indices
506            .par_iter()
507            .map(|&i| {
508                // SAFETY: bounds checked above.
509                let start_bit = unsafe { bit_offsets.get_unchecked(i) };
510                let end_bit = unsafe { bit_offsets.get_unchecked(i + 1) };
511                let len = seq_lengths.map(|l| unsafe { l.get_unchecked(i) as usize });
512
513                let iter = SeqIter::new_with_len(data, start_bit, end_bit, encoding, len);
514                f(iter)
515            })
516            .collect();
517
518        Ok(results)
519    }
520}