compressed_intvec/variable/
parallel.rs

1//! Parallel operations for [`IntVec`].
2//!
3//! This module provides parallel implementations for [`IntVec`] operations,
4//! enabled by the `parallel` feature flag. These methods are built on the
5//! [Rayon] library and are designed to leverage multi-core architectures to
6//! accelerate data decompression and access.
7//!
8//! [Rayon]: https://github.com/rayon-rs/rayon
9//! [`IntVec`]: crate::variable::IntVec
10
11use super::{traits::Storable, IntVec, IntVecBitReader, IntVecError};
12use dsi_bitstream::{
13    dispatch::{CodesRead, FuncCodeReader, StaticCodeRead},
14    prelude::{BitRead, BitSeek, Endianness},
15};
16use rayon::prelude::{
17    IndexedParallelIterator, IntoParallelIterator, IntoParallelRefMutIterator, ParallelIterator,
18};
19
20#[cfg(feature = "parallel")]
21impl<T, E, B> IntVec<T, E, B>
22where
23    T: Storable + Send + Sync,
24    E: Endianness + Send + Sync,
25    B: AsRef<[u64]> + Send + Sync,
26    for<'a> IntVecBitReader<'a, E>: BitRead<E, Error = core::convert::Infallible>
27        + CodesRead<E>
28        + BitSeek<Error = core::convert::Infallible>
29        + Send,
30{
31    /// Returns a parallel iterator over the decompressed values.
32    ///
33    /// This method uses Rayon to decompress the entire vector in parallel. It
34    /// can provide a significant speedup on multi-core systems, especially when
35    /// using a computationally intensive compression codec.
36    ///
37    /// # Performance
38    ///
39    /// For the specific task of full decompression, this parallel version is not
40    /// always faster than the sequential [`iter`](super::IntVec::iter). If the
41    /// decoding operation is very fast (e.g., with `VByte` encoding), the
42    /// operation can be limited by memory bandwidth. In such cases, the
43    /// sequential iterator's better use of CPU caches may outperform this
44    /// parallel version.
45    ///
46    /// # Examples
47    ///
48    /// ```
49    /// # #[cfg(feature = "parallel")] {
50    /// use compressed_intvec::variable::{IntVec, UIntVec};
51    /// use rayon::prelude::*;
52    ///
53    /// let data: Vec<u32> = (0..1000).collect();
54    /// let vec: UIntVec<u32> = IntVec::from_slice(&data).unwrap();
55    ///
56    /// // Use the parallel iterator to compute the sum in parallel
57    /// let sum: u32 = vec.par_iter().sum();
58    ///
59    /// assert_eq!(sum, (0..1000).sum());
60    /// # }
61    /// ```
62    pub fn par_iter(&self) -> impl ParallelIterator<Item = T> + '_ {
63        let k = self.k;
64        let num_samples = self.samples.len();
65        let num_threads = rayon::current_num_threads();
66        // Divide the sample blocks among the available threads.
67        let chunk_size = num_samples.div_ceil(num_threads).max(1);
68        let num_chunks = num_samples.div_ceil(chunk_size);
69
70        (0..num_chunks).into_par_iter().flat_map(move |chunk_idx| {
71            let start_sample_idx = chunk_idx * chunk_size;
72            let end_sample_idx = (start_sample_idx + chunk_size).min(num_samples);
73            let mut bit_reader = IntVecBitReader::<E>::new(dsi_bitstream::impls::MemWordReader::new(
74                self.data.as_ref(),
75            ));
76            let mut values = Vec::new();
77            let code_reader = FuncCodeReader::<E, _>::new(self.encoding).unwrap();
78
79            // Each thread decodes its assigned range of sample blocks.
80            for sample_idx in start_sample_idx..end_sample_idx {
81                let start_elem_index = sample_idx * k;
82                let end_elem_index = ((sample_idx + 1) * k).min(self.len);
83
84                unsafe {
85                    bit_reader
86                        .set_bit_pos(self.samples.get_unchecked(sample_idx))
87                        .unwrap();
88                }
89
90                for _ in start_elem_index..end_elem_index {
91                    let word = code_reader.read(&mut bit_reader).unwrap();
92                    values.push(Storable::from_word(word));
93                }
94            }
95            values.into_par_iter()
96        })
97    }
98
99    /// Retrieves multiple elements from a slice of indices in parallel.
100    ///
101    /// This method uses Rayon to parallelize random access. It works by creating
102    /// a separate [`IntVecReader`](super::IntVecReader) for each thread and
103    /// distributing the lookup work among them.
104    ///
105    /// # Errors
106    ///
107    /// Returns [`IntVecError::IndexOutOfBounds`] if any index is out of bounds.
108    ///
109    /// # Examples
110    ///
111    /// ```
112    /// # #[cfg(feature = "parallel")] {
113    /// use compressed_intvec::variable::{IntVec, SIntVec};
114    ///
115    /// let data: Vec<i64> = (0..1000).map(|x| x * -1).collect();
116    /// let vec: SIntVec<i64> = IntVec::from_slice(&data).unwrap();
117    ///
118    /// let indices = [500, 10, 999, 0, 250];
119    /// let values = vec.par_get_many(&indices).unwrap();
120    ///
121    /// assert_eq!(values, vec![-500, -10, -999, 0, -250]);
122    /// # }
123    /// ```
124    pub fn par_get_many(&self, indices: &[usize]) -> Result<Vec<T>, IntVecError> {
125        if indices.is_empty() {
126            return Ok(Vec::new());
127        }
128
129        for &index in indices {
130            if index >= self.len {
131                return Err(IntVecError::IndexOutOfBounds(index));
132            }
133        }
134
135        // SAFETY: We have pre-checked the bounds of all indices.
136        Ok(unsafe { self.par_get_many_unchecked(indices) })
137    }
138
139    /// Retrieves multiple elements in parallel without bounds checking.
140    ///
141    /// # Safety
142    ///
143    /// Calling this method with any out-of-bounds index in the `indices` slice
144    /// is undefined behavior. In debug builds, an assertion will panic.
145    pub unsafe fn par_get_many_unchecked(&self, indices: &[usize]) -> Vec<T> {
146        #[cfg(debug_assertions)]
147        {
148            for &index in indices {
149                debug_assert!(
150                    index < self.len,
151                    "Index out of bounds: index was {} but length was {}",
152                    index,
153                    self.len
154                );
155            }
156        }
157
158        if indices.is_empty() {
159            return Vec::new();
160        }
161
162        let mut results = vec![Storable::from_word(0); indices.len()];
163
164        results.par_iter_mut().enumerate().for_each_init(
165            || self.reader(), // Create a thread-local reader.
166            |reader, (original_pos, res_val)| {
167                let target_index = indices[original_pos];
168                // SAFETY: bounds are guaranteed by the caller.
169                *res_val = unsafe { reader.get_unchecked(target_index) };
170            },
171        );
172
173        results
174    }
175}