Skip to main content

compressed_intvec/fixed/
parallel.rs

1//! # [`FixedVec`] Parallel Operations
2//!
3//! This module provides parallel implementations for [`FixedVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are based on the [Rayon]
5//! library.
6//!
7//! # Examples
8//!
9//! ```
10//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
11//! # #[cfg(feature = "parallel")]
12//! # {
13//! use compressed_intvec::fixed::{FixedVec, UFixedVec};
14//! use rayon::prelude::*;
15//!
16//! let data: Vec<u32> = (0..1000).collect();
17//! let vec: UFixedVec<u32> = FixedVec::builder().build(&data)?;
18//!
19//! // Sum the elements in parallel.
20//! let sum: u32 = vec.par_iter().sum();
21//! assert_eq!(sum, (0..1000).sum());
22//! # }
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! [Rayon]: https://docs.rs/rayon/latest/rayon/
28
29use crate::fixed::{
30    Error as FixedVecError, FixedVec,
31    traits::{Storable, Word},
32};
33use dsi_bitstream::prelude::Endianness;
34use rayon::prelude::*;
35
36impl<T, W, E, B> FixedVec<T, W, E, B>
37where
38    T: Storable<W> + Send + Sync,
39    W: Word + Sync,
40    E: Endianness,
41    B: AsRef<[W]> + Sync,
42{
43    /// Returns a parallel iterator over the decompressed values.
44    ///
45    /// This operation is highly parallelizable because each element can be
46    /// decompressed independently from the others.
47    ///
48    /// # Examples
49    ///
50    /// ```
51    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
52    /// # #[cfg(feature = "parallel")]
53    /// # {
54    /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
55    /// use rayon::prelude::*;
56    ///
57    /// let data: Vec<u32> = (0..100).collect();
58    /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data)?;
59    ///
60    /// let count = vec.par_iter().filter(|&x| x % 2 == 0).count();
61    /// assert_eq!(count, 50);
62    /// # }
63    /// # Ok(())
64    /// # }
65    /// ```
66    pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = T> + '_ {
67        (0..self.len())
68            .into_par_iter()
69            .map(move |i| unsafe { self.get_unchecked(i) })
70    }
71
72    /// Retrieves multiple elements in parallel.
73    ///
74    /// # Errors
75    ///
76    /// Returns an error if any index is out of bounds.
77    ///
78    /// # Examples
79    ///
80    /// ```
81    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
82    /// # #[cfg(feature = "parallel")]
83    /// # {
84    /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
85    ///
86    /// let data: Vec<u32> = (0..100).collect();
87    /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data)?;
88    ///
89    /// let indices = vec![10, 50, 99];
90    /// let values = vec.par_get_many(&indices)?;
91    /// assert_eq!(values, vec![10, 50, 99]);
92    ///
93    /// // An out-of-bounds index will result in an error.
94    /// let invalid_indices = vec![10, 100];
95    /// assert!(vec.par_get_many(&invalid_indices).is_err());
96    /// # }
97    /// # Ok(())
98    /// # }
99    /// ```
100    pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, FixedVecError> {
101        // Perform a single bounds check sequentially first for early failure.
102        if let Some(&index) = indices.iter().find(|&&idx| idx >= self.len()) {
103            return Err(FixedVecError::InvalidParameters(format!(
104                "Index {} out of bounds for vector of length {}",
105                index, self.len
106            )));
107        }
108        // SAFETY: We have pre-checked the bounds of all indices.
109        Ok(unsafe { self.par_get_many_unchecked(indices) })
110    }
111
112    /// Retrieves multiple elements in parallel without bounds checking.
113    ///
114    /// # Safety
115    ///
116    /// Calling this method with any out-of-bounds index is Undefined Behavior.
117    /// All indices must be less than `self.len()`.
118    pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
119        if indices.is_empty() {
120            return Vec::new();
121        }
122
123        // Allocate an uninitialized buffer to hold the results. This avoids
124        // the need for `T: Default` and prevents zero-initializing the memory.
125        let mut results: Vec<std::mem::MaybeUninit<T>> = Vec::with_capacity(indices.len());
126        results.resize_with(indices.len(), std::mem::MaybeUninit::uninit);
127
128        results
129            .par_iter_mut()
130            .zip(indices.par_iter())
131            .for_each(|(res_val, &index)| {
132                // Each thread performs a scalar lookup for its assigned indices.
133                // The `get_unchecked` call is thread-safe as it is read-only.
134                unsafe { res_val.write(self.get_unchecked(index)) };
135            });
136
137        // SAFETY: The parallel iteration guarantees that all elements in the
138        // `results` vector have been initialized. We can therefore safely
139        // transmute the `Vec<MaybeUninit<T>>` to a `Vec<T>`.
140        unsafe { std::mem::transmute(results) }
141    }
142}