compressed_intvec/fixed/parallel.rs
1//! # [`FixedVec`] Parallel Operations
2//!
3//! This module provides parallel implementations for [`FixedVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are based on the [Rayon]
5//! library.
6//!
7//! # Examples
8//!
9//! ```
10//! # #[cfg(feature = "parallel")]
11//! # {
12//! use compressed_intvec::fixed::{FixedVec, UFixedVec};
13//! use rayon::prelude::*;
14//!
15//! let data: Vec<u32> = (0..1000).collect();
16//! let vec: UFixedVec<u32> = FixedVec::builder().build(&data).unwrap();
17//!
18//! // Sum the elements in parallel.
19//! let sum: u32 = vec.par_iter().sum();
20//! assert_eq!(sum, (0..1000).sum());
21//! # }
22//! ```
23//!
24//! [Rayon]: https://docs.rs/rayon/latest/rayon/
25
26#![cfg(feature = "parallel")]
27
28use crate::fixed::{
29 traits::{Storable, Word},
30 Error as FixedVecError, FixedVec,
31};
32use dsi_bitstream::prelude::Endianness;
33use rayon::prelude::*;
34
35impl<T, W, E, B> FixedVec<T, W, E, B>
36where
37 T: Storable<W> + Send + Sync,
38 W: Word + Sync,
39 E: Endianness,
40 B: AsRef<[W]> + Sync,
41{
42 /// Returns a parallel iterator over the decompressed values.
43 ///
44 /// This operation is highly parallelizable because each element can be
45 /// decompressed independently from the others.
46 ///
47 /// # Examples
48 ///
49 /// ```
50 /// # #[cfg(feature = "parallel")]
51 /// # {
52 /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
53 /// use rayon::prelude::*;
54 ///
55 /// let data: Vec<u32> = (0..100).collect();
56 /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data).unwrap();
57 ///
58 /// let count = vec.par_iter().filter(|&x| x % 2 == 0).count();
59 /// assert_eq!(count, 50);
60 /// # }
61 /// ```
62 pub fn par_iter(&self) -> impl IndexedParallelIterator<Item = T> + '_ {
63 (0..self.len())
64 .into_par_iter()
65 .map(move |i| unsafe { self.get_unchecked(i) })
66 }
67
68 /// Retrieves multiple elements in parallel.
69 ///
70 /// # Errors
71 ///
72 /// Returns an error if any index is out of bounds.
73 ///
74 /// # Examples
75 ///
76 /// ```
77 /// # #[cfg(feature = "parallel")]
78 /// # {
79 /// use compressed_intvec::fixed::{FixedVec, UFixedVec};
80 ///
81 /// let data: Vec<u32> = (0..100).collect();
82 /// let vec: UFixedVec<u32> = FixedVec::builder().build(&data).unwrap();
83 ///
84 /// let indices = vec![10, 50, 99];
85 /// let values = vec.par_get_many(&indices).unwrap();
86 /// assert_eq!(values, vec![10, 50, 99]);
87 ///
88 /// // An out-of-bounds index will result in an error.
89 /// let invalid_indices = vec![10, 100];
90 /// assert!(vec.par_get_many(&invalid_indices).is_err());
91 /// # }
92 /// ```
93 pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, FixedVecError> {
94 // Perform a single bounds check sequentially first for early failure.
95 if let Some(&index) = indices.iter().find(|&&idx| idx >= self.len()) {
96 return Err(FixedVecError::InvalidParameters(format!(
97 "Index {} out of bounds for vector of length {}",
98 index, self.len
99 )));
100 }
101 // SAFETY: We have pre-checked the bounds of all indices.
102 Ok(unsafe { self.par_get_many_unchecked(indices) })
103 }
104
105 /// Retrieves multiple elements in parallel without bounds checking.
106 ///
107 /// # Safety
108 ///
109 /// Calling this method with any out-of-bounds index is Undefined Behavior.
110 /// All indices must be less than `self.len()`.
111 pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
112 if indices.is_empty() {
113 return Vec::new();
114 }
115
116 // Allocate an uninitialized buffer to hold the results. This avoids
117 // the need for `T: Default` and prevents zero-initializing the memory.
118 let mut results: Vec<std::mem::MaybeUninit<T>> = Vec::with_capacity(indices.len());
119 results.resize_with(indices.len(), std::mem::MaybeUninit::uninit);
120
121 results
122 .par_iter_mut()
123 .zip(indices.par_iter())
124 .for_each(|(res_val, &index)| {
125 // Each thread performs a scalar lookup for its assigned indices.
126 // The `get_unchecked` call is thread-safe as it is read-only.
127 res_val.write(self.get_unchecked(index));
128 });
129
130 // SAFETY: The parallel iteration guarantees that all elements in the
131 // `results` vector have been initialized. We can therefore safely
132 // transmute the `Vec<MaybeUninit<T>>` to a `Vec<T>`.
133 std::mem::transmute(results)
134 }
135}