compressed_intvec/variable/parallel.rs
1//! Parallel operations for [`VarVec`].
2//!
3//! This module provides parallel implementations for [`VarVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are built on the
5//! [Rayon] library and are designed to leverage multi-core architectures to
6//! accelerate data decompression and access.
7//!
8//! [Rayon]: https://github.com/rayon-rs/rayon
9//! [`VarVec`]: crate::variable::VarVec
10
11use super::{VarVec, VarVecBitReader, VarVecError, traits::Storable};
12use dsi_bitstream::{
13 dispatch::{CodesRead, StaticCodeRead},
14 prelude::{BitRead, BitSeek, Endianness},
15};
16use rayon::prelude::{
17 IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
18};
19
20impl<T, E, B> VarVec<T, E, B>
21where
22 T: Storable + Send + Sync,
23 E: Endianness + Send + Sync,
24 B: AsRef<[u64]> + Send + Sync,
25 for<'a> VarVecBitReader<'a, E>: BitRead<E, Error = core::convert::Infallible>
26 + CodesRead<E>
27 + BitSeek<Error = core::convert::Infallible>
28 + Send,
29{
30 /// Returns a parallel iterator over the decompressed values.
31 ///
32 /// This method uses Rayon to decompress the entire vector in parallel. It
33 /// can provide a significant speedup on multi-core systems, especially when
34 /// using a computationally intensive compression codec.
35 ///
36 /// # Performance
37 ///
38 /// For the specific task of full decompression, this parallel version is not
39 /// always faster than the sequential [`iter`](super::VarVec::iter). If the
40 /// decoding operation is very fast (e.g., with [`VByte`](crate::variable::Codec::VByteLe) encoding), the
41 /// operation can be limited by memory bandwidth. In such cases, the
42 /// sequential iterator's better use of CPU caches may outperform this
43 /// parallel version.
44 ///
45 /// # Examples
46 ///
47 /// ```
48 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
49 /// # #[cfg(feature = "parallel")] {
50 /// use compressed_intvec::variable::{VarVec, UVarVec};
51 /// use rayon::prelude::*;
52 ///
53 /// let data: Vec<u32> = (0..1000).collect();
54 /// let vec: UVarVec<u32> = VarVec::from_slice(&data)?;
55 ///
56 /// // Use the parallel iterator to compute the sum in parallel
57 /// let sum: u32 = vec.par_iter().sum();
58 ///
59 /// assert_eq!(sum, (0..1000).sum());
60 /// # }
61 /// # Ok(())
62 /// # }
63 /// ```
64 pub fn par_iter(&self) -> impl ParallelIterator<Item = T> + '_ {
65 let k = self.k;
66 let num_samples = self.samples.len();
67 let num_threads = rayon::current_num_threads();
68 // Divide the sample blocks among the available threads.
69 let chunk_size = num_samples.div_ceil(num_threads).max(1);
70 let num_chunks = num_samples.div_ceil(chunk_size);
71
72 (0..num_chunks).into_par_iter().flat_map(move |chunk_idx| {
73 use crate::common::codec_reader::CodecReader;
74
75 let start_sample_idx = chunk_idx * chunk_size;
76 let end_sample_idx = (start_sample_idx + chunk_size).min(num_samples);
77 let mut bit_reader = VarVecBitReader::<E>::new(
78 dsi_bitstream::impls::MemWordReader::new_inf(self.data.as_ref()),
79 );
80 let mut values = Vec::new();
81 let code_reader = CodecReader::new(self.encoding);
82
83 // Each thread decodes its assigned range of sample blocks.
84 for sample_idx in start_sample_idx..end_sample_idx {
85 let (start_elem_index, end_elem_index) = if k.is_power_of_two() {
86 let k_exp = k.trailing_zeros();
87 (
88 sample_idx << k_exp,
89 ((sample_idx + 1) << k_exp).min(self.len),
90 )
91 } else {
92 (sample_idx * k, ((sample_idx + 1) * k).min(self.len))
93 };
94
95 unsafe {
96 bit_reader
97 .set_bit_pos(self.samples.get_unchecked(sample_idx))
98 .unwrap();
99 }
100
101 for _ in start_elem_index..end_elem_index {
102 let word = code_reader.read(&mut bit_reader).unwrap();
103 values.push(Storable::from_word(word));
104 }
105 }
106 values.into_par_iter()
107 })
108 }
109
110 /// Retrieves multiple elements from a slice of indices in parallel.
111 ///
112 /// This method uses Rayon to parallelize random access. It works by creating
113 /// a separate [`VarVecReader`](super::VarVecReader) for each thread and
114 /// distributing the lookup work among them.
115 ///
116 /// # Errors
117 ///
118 /// Returns [`VarVecError::IndexOutOfBounds`] if any index is out of bounds.
119 ///
120 /// # Examples
121 ///
122 /// ```
123 /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
124 /// # #[cfg(feature = "parallel")] {
125 /// use compressed_intvec::variable::{VarVec, SVarVec};
126 ///
127 /// let data: Vec<i64> = (0..1000).map(|x| x * -1).collect();
128 /// let vec: SVarVec<i64> = VarVec::from_slice(&data)?;
129 ///
130 /// let indices = [500, 10, 999, 0, 250];
131 /// let values = vec.par_get_many(&indices)?;
132 ///
133 /// assert_eq!(values, vec![-500, -10, -999, 0, -250]);
134 /// # }
135 /// # Ok(())
136 /// # }
137 /// ```
138 pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, VarVecError> {
139 if indices.is_empty() {
140 return Ok(Vec::new());
141 }
142
143 for &index in indices {
144 if index >= self.len {
145 return Err(VarVecError::IndexOutOfBounds(index));
146 }
147 }
148
149 // SAFETY: We have pre-checked the bounds of all indices.
150 Ok(unsafe { self.par_get_many_unchecked(indices) })
151 }
152
153 /// Retrieves multiple elements in parallel without bounds checking.
154 ///
155 /// # Safety
156 ///
157 /// Calling this method with any out-of-bounds index in the `indices` slice
158 /// is undefined behavior. In debug builds, an assertion will panic.
159 pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
160 #[cfg(debug_assertions)]
161 {
162 for &index in indices {
163 debug_assert!(
164 index < self.len,
165 "Index out of bounds: index was {} but length was {}",
166 index,
167 self.len
168 );
169 }
170 }
171
172 if indices.is_empty() {
173 return Vec::new();
174 }
175
176 let mut results = vec![Storable::from_word(0); indices.len()];
177
178 results.par_iter_mut().enumerate().for_each_init(
179 || self.reader(), // Create a thread-local reader.
180 |reader, (original_pos, res_val)| {
181 let target_index = indices[original_pos];
182 // SAFETY: bounds are guaranteed by the caller.
183 *res_val = unsafe { reader.get_unchecked(target_index) };
184 },
185 );
186
187 results
188 }
189}