compressed_intvec/variable/parallel.rs
1//! Parallel operations for [`IntVec`].
2//!
3//! This module provides parallel implementations for [`IntVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are built on the
5//! [Rayon] library and are designed to leverage multi-core architectures to
6//! accelerate data decompression and access.
7//!
8//! [Rayon]: https://github.com/rayon-rs/rayon
9//! [`IntVec`]: crate::variable::IntVec
10
11use super::{traits::Storable, IntVec, IntVecBitReader, IntVecError};
12use dsi_bitstream::{
13 dispatch::{CodesRead, FuncCodeReader, StaticCodeRead},
14 prelude::{BitRead, BitSeek, Endianness},
15};
16use rayon::prelude::{
17 IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
18};
19
#[cfg(feature = "parallel")]
impl<T, E, B> IntVec<T, E, B>
where
    T: Storable + Send + Sync,
    E: Endianness + Send + Sync,
    B: AsRef<[u64]> + Send + Sync,
    for<'a> IntVecBitReader<'a, E>: BitRead<E, Error = core::convert::Infallible>
        + CodesRead<E>
        + BitSeek<Error = core::convert::Infallible>
        + Send,
{
    /// Returns a parallel iterator over the decompressed values.
    ///
    /// This method uses Rayon to decompress the entire vector in parallel. It
    /// can provide a significant speedup on multi-core systems, especially when
    /// using a computationally intensive compression codec.
    ///
    /// The sample blocks are split into at most one contiguous run per Rayon
    /// worker thread. Each run is decoded sequentially, starting from the bit
    /// position stored in its first sample.
    ///
    /// # Performance
    ///
    /// For the specific task of full decompression, this parallel version is not
    /// always faster than the sequential [`iter`](super::IntVec::iter). If the
    /// decoding operation is very fast (e.g., with `VByte` encoding), the
    /// operation can be limited by memory bandwidth. In such cases, the
    /// sequential iterator's better use of CPU caches may outperform this
    /// parallel version.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "parallel")] {
    /// use compressed_intvec::variable::{IntVec, UIntVec};
    /// use rayon::prelude::*;
    ///
    /// let data: Vec<u32> = (0..1000).collect();
    /// let vec: UIntVec<u32> = IntVec::from_slice(&data).unwrap();
    ///
    /// // Use the parallel iterator to compute the sum in parallel
    /// let sum: u32 = vec.par_iter().sum();
    ///
    /// assert_eq!(sum, (0..1000).sum());
    /// # }
    /// ```
    pub fn par_iter(&self) -> impl ParallelIterator<Item = T> + '_ {
        let k = self.k;
        let num_samples = self.samples.len();
        let num_threads = rayon::current_num_threads();
        // Divide the sample blocks among the available threads. `max(1)` keeps
        // `chunk_size` non-zero when there are no samples, so the `div_ceil`
        // below never divides by zero (it then yields zero chunks).
        let chunk_size = num_samples.div_ceil(num_threads).max(1);
        let num_chunks = num_samples.div_ceil(chunk_size);

        (0..num_chunks).into_par_iter().flat_map(move |chunk_idx| {
            let start_sample_idx = chunk_idx * chunk_size;
            let end_sample_idx = (start_sample_idx + chunk_size).min(num_samples);

            // Number of elements covered by this run of sample blocks (the
            // final block may hold fewer than `k` elements). Used only as a
            // capacity hint so `values` is allocated once.
            let chunk_first_elem = start_sample_idx * k;
            let chunk_end_elem = (end_sample_idx * k).min(self.len);
            let mut values = Vec::with_capacity(chunk_end_elem.saturating_sub(chunk_first_elem));

            let mut bit_reader = IntVecBitReader::<E>::new(
                dsi_bitstream::impls::MemWordReader::new(self.data.as_ref()),
            );
            let code_reader = FuncCodeReader::<E, _>::new(self.encoding).unwrap();

            // Each thread decodes its assigned range of sample blocks.
            for sample_idx in start_sample_idx..end_sample_idx {
                let start_elem_index = sample_idx * k;
                let end_elem_index = ((sample_idx + 1) * k).min(self.len);

                // SAFETY: `sample_idx < end_sample_idx <= num_samples`, and
                // `num_samples == self.samples.len()`, so the index is in bounds.
                unsafe {
                    bit_reader
                        .set_bit_pos(self.samples.get_unchecked(sample_idx))
                        .unwrap();
                }

                for _ in start_elem_index..end_elem_index {
                    let word = code_reader.read(&mut bit_reader).unwrap();
                    values.push(Storable::from_word(word));
                }
            }
            values.into_par_iter()
        })
    }

    /// Retrieves multiple elements from a slice of indices in parallel.
    ///
    /// This method uses Rayon to parallelize random access. Lookups are
    /// distributed among Rayon's worker jobs. Each job lazily creates its own
    /// [`IntVecReader`](super::IntVecReader) and reuses it for all of the
    /// lookups in that job.
    ///
    /// The returned vector is positionally aligned with `indices`: element `i`
    /// of the result is the value stored at `indices[i]`.
    ///
    /// # Errors
    ///
    /// Returns [`IntVecError::IndexOutOfBounds`] if any index is out of bounds.
    /// The reported index is the first offending one in `indices`.
    ///
    /// # Examples
    ///
    /// ```
    /// # #[cfg(feature = "parallel")] {
    /// use compressed_intvec::variable::{IntVec, SIntVec};
    ///
    /// let data: Vec<i64> = (0..1000).map(|x| x * -1).collect();
    /// let vec: SIntVec<i64> = IntVec::from_slice(&data).unwrap();
    ///
    /// let indices = [500, 10, 999, 0, 250];
    /// let values = vec.par_get_many(&indices).unwrap();
    ///
    /// assert_eq!(values, vec![-500, -10, -999, 0, -250]);
    /// # }
    /// ```
    pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, IntVecError> {
        if indices.is_empty() {
            return Ok(Vec::new());
        }

        if let Some(&index) = indices.iter().find(|&&index| index >= self.len) {
            return Err(IntVecError::IndexOutOfBounds(index));
        }

        // SAFETY: every index was checked against `self.len` just above.
        Ok(unsafe { self.par_get_many_unchecked(indices) })
    }

    /// Retrieves multiple elements in parallel without bounds checking.
    ///
    /// The returned vector is positionally aligned with `indices`: element `i`
    /// of the result is the value stored at `indices[i]`.
    ///
    /// # Safety
    ///
    /// Calling this method with any out-of-bounds index in the `indices` slice
    /// is undefined behavior. In debug builds, an assertion will panic.
    pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
        #[cfg(debug_assertions)]
        {
            for &index in indices {
                debug_assert!(
                    index < self.len,
                    "Index out of bounds: index was {} but length was {}",
                    index,
                    self.len
                );
            }
        }

        if indices.is_empty() {
            return Vec::new();
        }

        // Pre-fill the output so each slot can be written in place by
        // whichever job handles its position.
        let mut results = vec![Storable::from_word(0); indices.len()];

        results.par_iter_mut().enumerate().for_each_init(
            // Called once per Rayon job, not once per element, so the reader
            // is reused across the lookups of a job.
            || self.reader(),
            |reader, (original_pos, res_val)| {
                let target_index = indices[original_pos];
                // SAFETY: the caller guarantees every entry of `indices` is
                // `< self.len`.
                *res_val = unsafe { reader.get_unchecked(target_index) };
            },
        );

        results
    }
}
175}