compressed_intvec/fixed/
parallel.rs

1//! # [`FixedVec`] Parallel Operations
2//!
3//! This module provides parallel implementations for [`FixedVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are based on the [Rayon]
5//! library.
6//!
7//! # Examples
8//!
9//! ```
10//! # #[cfg(feature = "parallel")]
11//! # {
12//! use compressed_intvec::fixed::{FixedVec, UFixedVec};
13//! use rayon::prelude::*;
14//!
15//! let data: Vec<u32> = (0..1000).collect();
16//! let vec: UFixedVec<u32> = FixedVec::builder().build(&data).unwrap();
17//!
18//! // Sum the elements in parallel.
19//! let sum: u32 = vec.par_iter().sum();
20//! assert_eq!(sum, (0..1000).sum());
21//! # }
22//! ```
23//!
24//! [Rayon]: https://docs.rs/rayon/latest/rayon/
25
26#![cfg(feature = "parallel")]
27
28use crate::fixed::{
29    traits::{Storable, Word},
30    Error as FixedVecError, FixedVec,
31};
32use dsi_bitstream::prelude::Endianness;
33use rayon::prelude::*;
34
35impl<T, W, E, B> FixedVec<T, W, E, B>
36where
37    T: Storable<W> + Send + Sync,
38    W: Word + Sync,
39    E: Endianness,
40    B: AsRef<[W]> + Sync,
41{
42    /// Returns a parallel iterator over the decompressed values.
43    ///
44    /// This operation is highly parallelizable because each element can be
45    /// decompressed independently from the others.
46    ///
47    /// # Examples
48    ///
49    /// ```
50    /// # #[cfg(feature = "parallel")]
51    /// # {
52    /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
53    /// use rayon::prelude::*;
54    ///
55    /// let data: Vec<u32> = (0..100).collect();
56    /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data).unwrap();
57    ///
58    /// let count = vec.par_iter().filter(|&x| x % 2 == 0).count();
59    /// assert_eq!(count, 50);
60    /// # }
61    /// ```
62    pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = T> + '_ {
63        (0..self.len())
64            .into_par_iter()
65            .map(move |i| unsafe { self.get_unchecked(i) })
66    }
67
68    /// Retrieves multiple elements in parallel.
69    ///
70    /// # Errors
71    ///
72    /// Returns an error if any index is out of bounds.
73    ///
74    /// # Examples
75    ///
76    /// ```
77    /// # #[cfg(feature = "parallel")]
78    /// # {
79    /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
80    ///
81    /// let data: Vec<u32> = (0..100).collect();
82    /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data).unwrap();
83    ///
84    /// let indices = vec![10, 50, 99];
85    /// let values = vec.par_get_many(&indices).unwrap();
86    /// assert_eq!(values, vec![10, 50, 99]);
87    ///
88    /// // An out-of-bounds index will result in an error.
89    /// let invalid_indices = vec![10, 100];
90    /// assert!(vec.par_get_many(&invalid_indices).is_err());
91    /// # }
92    /// ```
93    pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, FixedVecError> {
94        // Perform a single bounds check sequentially first for early failure.
95        if let Some(&index) = indices.iter().find(|&&idx| idx >= self.len()) {
96            return Err(FixedVecError::InvalidParameters(format!(
97                "Index {} out of bounds for vector of length {}",
98                index, self.len
99            )));
100        }
101        // SAFETY: We have pre-checked the bounds of all indices.
102        Ok(unsafe { self.par_get_many_unchecked(indices) })
103    }
104
105    /// Retrieves multiple elements in parallel without bounds checking.
106    ///
107    /// # Safety
108    ///
109    /// Calling this method with any out-of-bounds index is Undefined Behavior.
110    /// All indices must be less than `self.len()`.
111    pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
112        if indices.is_empty() {
113            return Vec::new();
114        }
115
116        // Allocate an uninitialized buffer to hold the results. This avoids
117        // the need for `T: Default` and prevents zero-initializing the memory.
118        let mut results: Vec<std::mem::MaybeUninit<T>> = Vec::with_capacity(indices.len());
119        results.resize_with(indices.len(), std::mem::MaybeUninit::uninit);
120
121        results
122            .par_iter_mut()
123            .zip(indices.par_iter())
124            .for_each(|(res_val, &index)| {
125                // Each thread performs a scalar lookup for its assigned indices.
126                // The `get_unchecked` call is thread-safe as it is read-only.
127                res_val.write(self.get_unchecked(index));
128            });
129
130        // SAFETY: The parallel iteration guarantees that all elements in the
131        // `results` vector have been initialized. We can therefore safely
132        // transmute the `Vec<MaybeUninit<T>>` to a `Vec<T>`.
133        std::mem::transmute(results)
134    }
135}