compressed_intvec/fixed/parallel.rs
1//! # [`FixedVec`] Parallel Operations
2//!
3//! This module provides parallel implementations for [`FixedVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are based on the [Rayon]
5//! library.
6//!
7//! # Examples
8//!
9//! ```
10//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
11//! # #[cfg(feature = "parallel")]
12//! # {
13//! use compressed_intvec::fixed::{FixedVec, UFixedVec};
14//! use rayon::prelude::*;
15//!
16//! let data: Vec<u32> = (0..1000).collect();
17//! let vec: UFixedVec<u32> = FixedVec::builder().build(&data)?;
18//!
19//! // Sum the elements in parallel.
20//! let sum: u32 = vec.par_iter().sum();
21//! assert_eq!(sum, (0..1000).sum());
22//! # }
23//! # Ok(())
24//! # }
25//! ```
26//!
27//! [Rayon]: https://docs.rs/rayon/latest/rayon/
28
29use crate::fixed::{
30 Error as FixedVecError, FixedVec,
31 traits::{Storable, Word},
32};
33use dsi_bitstream::prelude::Endianness;
34use rayon::prelude::*;
35
36impl<T, W, E, B> FixedVec<T, W, E, B>
37where
38 T: Storable<W> + Send + Sync,
39 W: Word + Sync,
40 E: Endianness,
41 B: AsRef<[W]> + Sync,
42{
43 /// Returns a parallel iterator over the decompressed values.
44 ///
45 /// This operation is highly parallelizable because each element can be
46 /// decompressed independently from the others.
47 ///
48 /// # Examples
49 ///
50 /// ```
51 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
52 /// # #[cfg(feature = "parallel")]
53 /// # {
54 /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
55 /// use rayon::prelude::*;
56 ///
57 /// let data: Vec<u32> = (0..100).collect();
58 /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data)?;
59 ///
60 /// let count = vec.par_iter().filter(|&x| x % 2 == 0).count();
61 /// assert_eq!(count, 50);
62 /// # }
63 /// # Ok(())
64 /// # }
65 /// ```
66 pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = T> + '_ {
67 (0..self.len())
68 .into_par_iter()
69 .map(move |i| unsafe { self.get_unchecked(i) })
70 }
71
72 /// Retrieves multiple elements in parallel.
73 ///
74 /// # Errors
75 ///
76 /// Returns an error if any index is out of bounds.
77 ///
78 /// # Examples
79 ///
80 /// ```
81 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
82 /// # #[cfg(feature = "parallel")]
83 /// # {
84 /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
85 ///
86 /// let data: Vec<u32> = (0..100).collect();
87 /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data)?;
88 ///
89 /// let indices = vec![10, 50, 99];
90 /// let values = vec.par_get_many(&indices)?;
91 /// assert_eq!(values, vec![10, 50, 99]);
92 ///
93 /// // An out-of-bounds index will result in an error.
94 /// let invalid_indices = vec![10, 100];
95 /// assert!(vec.par_get_many(&invalid_indices).is_err());
96 /// # }
97 /// # Ok(())
98 /// # }
99 /// ```
100 pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, FixedVecError> {
101 // Perform a single bounds check sequentially first for early failure.
102 if let Some(&index) = indices.iter().find(|&&idx| idx >= self.len()) {
103 return Err(FixedVecError::InvalidParameters(format!(
104 "Index {} out of bounds for vector of length {}",
105 index, self.len
106 )));
107 }
108 // SAFETY: We have pre-checked the bounds of all indices.
109 Ok(unsafe { self.par_get_many_unchecked(indices) })
110 }
111
112 /// Retrieves multiple elements in parallel without bounds checking.
113 ///
114 /// # Safety
115 ///
116 /// Calling this method with any out-of-bounds index is Undefined Behavior.
117 /// All indices must be less than `self.len()`.
118 pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
119 if indices.is_empty() {
120 return Vec::new();
121 }
122
123 // Allocate an uninitialized buffer to hold the results. This avoids
124 // the need for `T: Default` and prevents zero-initializing the memory.
125 let mut results: Vec<std::mem::MaybeUninit<T>> = Vec::with_capacity(indices.len());
126 results.resize_with(indices.len(), std::mem::MaybeUninit::uninit);
127
128 results
129 .par_iter_mut()
130 .zip(indices.par_iter())
131 .for_each(|(res_val, &index)| {
132 // Each thread performs a scalar lookup for its assigned indices.
133 // The `get_unchecked` call is thread-safe as it is read-only.
134 unsafe { res_val.write(self.get_unchecked(index)) };
135 });
136
137 // SAFETY: The parallel iteration guarantees that all elements in the
138 // `results` vector have been initialized. We can therefore safely
139 // transmute the `Vec<MaybeUninit<T>>` to a `Vec<T>`.
140 unsafe { std::mem::transmute(results) }
141 }
142}