Skip to main content

compressed_intvec/variable/
parallel.rs

1//! Parallel operations for [`VarVec`].
2//!
3//! This module provides parallel implementations for [`VarVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are built on the
5//! [Rayon] library and are designed to leverage multi-core architectures to
6//! accelerate data decompression and access.
7//!
8//! [Rayon]: https://github.com/rayon-rs/rayon
9//! [`VarVec`]: crate::variable::VarVec
10
11use super::{VarVec, VarVecBitReader, VarVecError, traits::Storable};
12use dsi_bitstream::{
13    dispatch::{CodesRead, StaticCodeRead},
14    prelude::{BitRead, BitSeek, Endianness},
15};
16use rayon::prelude::{
17    IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
18};
19
20impl<T, E, B> VarVec<T, E, B>
21where
22    T: Storable + Send + Sync,
23    E: Endianness + Send + Sync,
24    B: AsRef<[u64]> + Send + Sync,
25    for<'a> VarVecBitReader<'a, E>: BitRead<E, Error = core::convert::Infallible>
26        + CodesRead<E>
27        + BitSeek<Error = core::convert::Infallible>
28        + Send,
29{
30    /// Returns a parallel iterator over the decompressed values.
31    ///
32    /// This method uses Rayon to decompress the entire vector in parallel. It
33    /// can provide a significant speedup on multi-core systems, especially when
34    /// using a computationally intensive compression codec.
35    ///
36    /// # Performance
37    ///
38    /// For the specific task of full decompression, this parallel version is not
39    /// always faster than the sequential [`iter`](super::VarVec::iter). If the
40    /// decoding operation is very fast (e.g., with [`VByte`](crate::variable::Codec::VByteLe) encoding), the
41    /// operation can be limited by memory bandwidth. In such cases, the
42    /// sequential iterator's better use of CPU caches may outperform this
43    /// parallel version.
44    ///
45    /// # Examples
46    ///
47    /// ```
48    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
49    /// # #[cfg(feature = "parallel")] {
50    /// use compressed_intvec::variable::{VarVec, UVarVec};
51    /// use rayon::prelude::*;
52    ///
53    /// let data: Vec<u32> = (0..1000).collect();
54    /// let vec: UVarVec<u32> = VarVec::from_slice(&data)?;
55    ///
56    /// // Use the parallel iterator to compute the sum in parallel
57    /// let sum: u32 = vec.par_iter().sum();
58    ///
59    /// assert_eq!(sum, (0..1000).sum());
60    /// # }
61    /// # Ok(())
62    /// # }
63    /// ```
64    pub fn par_iter(&self) -> impl ParallelIterator<Item = T> + '_ {
65        let k = self.k;
66        let num_samples = self.samples.len();
67        let num_threads = rayon::current_num_threads();
68        // Divide the sample blocks among the available threads.
69        let chunk_size = num_samples.div_ceil(num_threads).max(1);
70        let num_chunks = num_samples.div_ceil(chunk_size);
71
72        (0..num_chunks).into_par_iter().flat_map(move |chunk_idx| {
73            use crate::common::codec_reader::CodecReader;
74
75            let start_sample_idx = chunk_idx * chunk_size;
76            let end_sample_idx = (start_sample_idx + chunk_size).min(num_samples);
77            let mut bit_reader = VarVecBitReader::<E>::new(
78                dsi_bitstream::impls::MemWordReader::new_inf(self.data.as_ref()),
79            );
80            let mut values = Vec::new();
81            let code_reader = CodecReader::new(self.encoding);
82
83            // Each thread decodes its assigned range of sample blocks.
84            for sample_idx in start_sample_idx..end_sample_idx {
85                let (start_elem_index, end_elem_index) = if k.is_power_of_two() {
86                    let k_exp = k.trailing_zeros();
87                    (
88                        sample_idx << k_exp,
89                        ((sample_idx + 1) << k_exp).min(self.len),
90                    )
91                } else {
92                    (sample_idx * k, ((sample_idx + 1) * k).min(self.len))
93                };
94
95                unsafe {
96                    bit_reader
97                        .set_bit_pos(self.samples.get_unchecked(sample_idx))
98                        .unwrap();
99                }
100
101                for _ in start_elem_index..end_elem_index {
102                    let word = code_reader.read(&mut bit_reader).unwrap();
103                    values.push(Storable::from_word(word));
104                }
105            }
106            values.into_par_iter()
107        })
108    }
109
110    /// Retrieves multiple elements from a slice of indices in parallel.
111    ///
112    /// This method uses Rayon to parallelize random access. It works by creating
113    /// a separate [`VarVecReader`](super::VarVecReader) for each thread and
114    /// distributing the lookup work among them.
115    ///
116    /// # Errors
117    ///
118    /// Returns [`VarVecError::IndexOutOfBounds`] if any index is out of bounds.
119    ///
120    /// # Examples
121    ///
122    /// ```
123    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
124    /// # #[cfg(feature = "parallel")] {
125    /// use compressed_intvec::variable::{VarVec, SVarVec};
126    ///
127    /// let data: Vec<i64> = (0..1000).map(|x| x * -1).collect();
128    /// let vec: SVarVec<i64> = VarVec::from_slice(&data)?;
129    ///
130    /// let indices = [500, 10, 999, 0, 250];
131    /// let values = vec.par_get_many(&indices)?;
132    ///
133    /// assert_eq!(values, vec![-500, -10, -999, 0, -250]);
134    /// # }
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, VarVecError> {
139        if indices.is_empty() {
140            return Ok(Vec::new());
141        }
142
143        for &index in indices {
144            if index >= self.len {
145                return Err(VarVecError::IndexOutOfBounds(index));
146            }
147        }
148
149        // SAFETY: We have pre-checked the bounds of all indices.
150        Ok(unsafe { self.par_get_many_unchecked(indices) })
151    }
152
153    /// Retrieves multiple elements in parallel without bounds checking.
154    ///
155    /// # Safety
156    ///
157    /// Calling this method with any out-of-bounds index in the `indices` slice
158    /// is undefined behavior. In debug builds, an assertion will panic.
159    pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
160        #[cfg(debug_assertions)]
161        {
162            for &index in indices {
163                debug_assert!(
164                    index < self.len,
165                    "Index out of bounds: index was {} but length was {}",
166                    index,
167                    self.len
168                );
169            }
170        }
171
172        if indices.is_empty() {
173            return Vec::new();
174        }
175
176        let mut results = vec![Storable::from_word(0); indices.len()];
177
178        results.par_iter_mut().enumerate().for_each_init(
179            || self.reader(), // Create a thread-local reader.
180            |reader, (original_pos, res_val)| {
181                let target_index = indices[original_pos];
182                // SAFETY: bounds are guaranteed by the caller.
183                *res_val = unsafe { reader.get_unchecked(target_index) };
184            },
185        );
186
187        results
188    }
189}