Skip to main content

vortex_fastlanes/rle/array/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use vortex_array::Array;
5use vortex_array::ArrayRef;
6use vortex_array::dtype::DType;
7use vortex_array::dtype::PType;
8use vortex_array::stats::ArrayStats;
9use vortex_error::VortexResult;
10use vortex_error::vortex_ensure;
11
12use crate::FL_CHUNK_SIZE;
13
14pub mod rle_compress;
15pub mod rle_decompress;
16
17#[derive(Clone, Debug)]
18pub struct RLEArray {
19    pub(super) dtype: DType,
20    /// Run value in the dictionary.
21    pub(super) values: ArrayRef,
22    /// Chunk-local indices from all chunks. The start of each chunk is looked up in `values_idx_offsets`.
23    pub(super) indices: ArrayRef,
24    /// Index start positions of each value chunk.
25    ///
26    /// # Example
27    /// ```
28    /// // Chunk 0: [10, 20] (starts at index 0)
29    /// // Chunk 1: [30, 40] (starts at index 2)
30    /// let values = [10, 20, 30, 40];           // Global values array
31    /// let values_idx_offsets = [0, 2];         // Chunk 0 starts at index 0, Chunk 1 starts at index 2
32    /// ```
33    pub(super) values_idx_offsets: ArrayRef,
34
35    pub(super) stats_set: ArrayStats,
36    // Offset relative to the start of the chunk.
37    pub(super) offset: usize,
38    pub(super) length: usize,
39}
40
41impl RLEArray {
42    fn validate(
43        values: &dyn Array,
44        indices: &dyn Array,
45        value_idx_offsets: &dyn Array,
46        offset: usize,
47    ) -> VortexResult<()> {
48        vortex_ensure!(
49            offset < 1024,
50            "Offset must be smaller than 1024, got {}",
51            offset
52        );
53
54        vortex_ensure!(
55            values.dtype().is_primitive(),
56            "RLE values must be a primitive type, got {}",
57            values.dtype()
58        );
59
60        vortex_ensure!(
61            matches!(indices.dtype().as_ptype(), PType::U8 | PType::U16),
62            "RLE indices must be u8 or u16, got {}",
63            indices.dtype()
64        );
65
66        vortex_ensure!(
67            value_idx_offsets.dtype().is_unsigned_int() && !value_idx_offsets.dtype().is_nullable(),
68            "RLE value idx offsets must be non-nullable unsigned integer, got {}",
69            value_idx_offsets.dtype()
70        );
71
72        vortex_ensure!(
73            indices.len().div_ceil(FL_CHUNK_SIZE) == value_idx_offsets.len(),
74            "RLE must have one value idx offset per chunk, got {}",
75            value_idx_offsets.len()
76        );
77
78        vortex_ensure!(
79            indices.len() >= values.len(),
80            "RLE must have at least as many indices as values, got {} indices and {} values",
81            indices.len(),
82            values.len()
83        );
84
85        Ok(())
86    }
87
88    /// Create a new chunk-based RLE array from its components.
89    ///
90    /// # Arguments
91    ///
92    /// * `values` - Unique values from all chunks
93    /// * `indices` - Chunk-local indices from all chunks
94    /// * `values_idx_offsets` - Start indices for each value chunk.
95    /// * `offset` - Offset into the first chunk
96    /// * `length` - Array length
97    pub fn try_new(
98        values: ArrayRef,
99        indices: ArrayRef,
100        values_idx_offsets: ArrayRef,
101        offset: usize,
102        length: usize,
103    ) -> VortexResult<Self> {
104        assert_eq!(indices.len() % FL_CHUNK_SIZE, 0);
105        Self::validate(&values, &indices, &values_idx_offsets, offset)?;
106
107        // Ensure that the DType has the same nullability as the indices array.
108        let dtype = DType::Primitive(values.dtype().as_ptype(), indices.dtype().nullability());
109
110        Ok(Self {
111            dtype,
112            values,
113            indices,
114            values_idx_offsets,
115            stats_set: ArrayStats::default(),
116            offset,
117            length,
118        })
119    }
120
121    /// Create a new RLEArray without validation.
122    ///
123    /// # Safety
124    /// The caller must ensure that:
125    /// - `offset + length` does not exceed the length of the indices array
126    /// - The `dtype` is consistent with the values array's primitive type and validity nullability
127    /// - The `indices` array contains valid indices into chunks of the `values` array
128    /// - The `values_idx_offsets` array contains valid chunk start offsets
129    /// - The `validity` array has the same length as `length`
130    pub unsafe fn new_unchecked(
131        values: ArrayRef,
132        indices: ArrayRef,
133        values_idx_offsets: ArrayRef,
134        dtype: DType,
135        offset: usize,
136        length: usize,
137    ) -> Self {
138        Self {
139            dtype,
140            values,
141            indices,
142            values_idx_offsets,
143            stats_set: ArrayStats::default(),
144            offset,
145            length,
146        }
147    }
148
149    #[inline]
150    pub fn len(&self) -> usize {
151        self.length
152    }
153
154    #[inline]
155    pub fn is_empty(&self) -> bool {
156        self.length == 0
157    }
158
159    #[inline]
160    pub fn dtype(&self) -> &DType {
161        &self.dtype
162    }
163
164    #[inline]
165    pub fn values(&self) -> &ArrayRef {
166        &self.values
167    }
168
169    #[inline]
170    pub fn indices(&self) -> &ArrayRef {
171        &self.indices
172    }
173
174    #[inline]
175    pub fn values_idx_offsets(&self) -> &ArrayRef {
176        &self.values_idx_offsets
177    }
178
179    /// Values index offset relative to the first chunk.
180    ///
181    /// Offsets in `values_idx_offsets` are absolute and need to be shifted
182    /// by the offset of the first chunk, respective the current slice, in
183    /// order to make them relative.
184    #[expect(
185        clippy::expect_used,
186        reason = "expect is safe here as scalar_at returns a valid primitive"
187    )]
188    pub(crate) fn values_idx_offset(&self, chunk_idx: usize) -> usize {
189        self.values_idx_offsets
190            .scalar_at(chunk_idx)
191            .expect("index must be in bounds")
192            .as_primitive()
193            .as_::<usize>()
194            .expect("index must be of type usize")
195            - self
196                .values_idx_offsets
197                .scalar_at(0)
198                .expect("index must be in bounds")
199                .as_primitive()
200                .as_::<usize>()
201                .expect("index must be of type usize")
202    }
203
204    /// Index offset into the array
205    #[inline]
206    pub fn offset(&self) -> usize {
207        self.offset
208    }
209
210    #[inline]
211    pub(crate) fn stats_set(&self) -> &ArrayStats {
212        &self.stats_set
213    }
214}
215
#[cfg(test)]
mod tests {
    use vortex_array::Array;
    use vortex_array::ArrayContext;
    use vortex_array::IntoArray;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::serde::ArrayParts;
    use vortex_array::serde::SerializeOptions;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::ByteBufferMut;

    use crate::RLEArray;
    use crate::test::SESSION;

    /// Length of one FastLanes chunk; single-chunk test indices are padded to this size.
    const CHUNK_LEN: usize = 1024;

    /// Builds one full chunk of nullable `u16` indices.
    ///
    /// The index buffer cycles through `pattern` and the validity cycles through `validity`.
    fn padded_nullable_indices(pattern: &[u16], validity: &[bool]) -> PrimitiveArray {
        let buffer: Buffer<u16> = pattern.iter().cycle().take(CHUNK_LEN).copied().collect();
        let mask = Validity::from_iter(validity.iter().cycle().take(CHUNK_LEN).copied());
        PrimitiveArray::new(buffer, mask)
    }

    /// Serializes `array`, joins the resulting buffers and decodes them again with the
    /// supplied `dtype` and `len`, returning the decoded data in canonical form.
    fn serialize_roundtrip(array: &RLEArray, dtype: &DType, len: usize) -> PrimitiveArray {
        let ctx = ArrayContext::empty();
        let serialized = array
            .to_array()
            .serialize(&ctx, &SerializeOptions::default())
            .unwrap();

        let mut bytes = ByteBufferMut::empty();
        for buf in serialized {
            bytes.extend_from_slice(buf.as_ref());
        }

        ArrayParts::try_from(bytes.freeze())
            .unwrap()
            .decode(dtype, len, &ctx, &SESSION)
            .unwrap()
            .to_primitive()
    }

    #[test]
    fn test_try_new() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
        // One full chunk of indices cycling through the run pattern.
        let indices = PrimitiveArray::from_iter(
            [0u16, 0, 1, 1, 2].iter().copied().cycle().take(CHUNK_LEN),
        )
        .into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();

        let rle_array = RLEArray::try_new(values, indices, values_idx_offsets, 0, 5).unwrap();

        assert_eq!(rle_array.len(), 5);
        assert_eq!(rle_array.values().len(), 3);
        assert_eq!(rle_array.values().dtype().as_ptype(), PType::U32);
    }

    #[test]
    fn test_try_new_with_validity() {
        let values = PrimitiveArray::from_iter([10u32, 20]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices = padded_nullable_indices(&[0, 1, 0], &[true, false, true]).into_array();

        let rle_array = RLEArray::try_new(values, indices, values_idx_offsets, 0, 3).unwrap();

        assert_eq!(rle_array.len(), 3);
        assert_eq!(rle_array.values().len(), 2);
        assert!(rle_array.is_valid(0).unwrap());
        assert!(!rle_array.is_valid(1).unwrap());
        assert!(rle_array.is_valid(2).unwrap());
    }

    #[test]
    fn test_all_valid() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices = padded_nullable_indices(
            &[0, 1, 2, 0, 1],
            &[true, true, true, false, false],
        )
        .into_array();

        let rle_array = RLEArray::try_new(values, indices, values_idx_offsets, 0, 5).unwrap();

        let valid_slice = rle_array.slice(0..3).unwrap().to_primitive();
        // TODO(joe): replace with compute null count
        assert!(valid_slice.all_valid().unwrap());

        let mixed_slice = rle_array.slice(1..5).unwrap();
        assert!(!mixed_slice.all_valid().unwrap());
    }

    #[test]
    fn test_all_invalid() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices = padded_nullable_indices(
            &[0, 1, 2, 0, 1],
            &[true, true, false, false, false],
        )
        .into_array();

        let rle_array = RLEArray::try_new(values, indices, values_idx_offsets, 0, 5).unwrap();

        // TODO(joe): replace with compute null count
        let invalid_slice = rle_array
            .slice(2..5)
            .unwrap()
            .to_canonical()
            .unwrap()
            .into_primitive();
        assert!(invalid_slice.all_invalid().unwrap());

        let mixed_slice = rle_array.slice(1..4).unwrap();
        assert!(!mixed_slice.all_invalid().unwrap());
    }

    #[test]
    fn test_validity_mask() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices =
            padded_nullable_indices(&[0, 1, 2, 0], &[true, false, true, false]).into_array();

        let rle_array = RLEArray::try_new(values, indices, values_idx_offsets, 0, 4).unwrap();

        let validity_mask = rle_array.slice(1..4).unwrap().validity_mask().unwrap();
        let expected_mask = Validity::from_iter([false, true, false]).to_mask(3);

        assert_eq!(validity_mask.len(), expected_mask.len());
        assert_eq!(validity_mask, expected_mask);
    }

    #[test]
    fn test_try_new_empty() {
        let values = PrimitiveArray::from_iter(Vec::<u32>::new()).into_array();
        let indices = PrimitiveArray::from_iter(Vec::<u16>::new()).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter(Vec::<u64>::new()).into_array();
        let length = indices.len();

        let rle_array =
            RLEArray::try_new(values, indices, values_idx_offsets, 0, length).unwrap();

        assert_eq!(rle_array.len(), 0);
        assert_eq!(rle_array.values().len(), 0);
    }

    #[test]
    fn test_multi_chunk_two_chunks() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30, 40]).into_array();
        let indices = PrimitiveArray::from_iter([0u16, 1].repeat(1024)).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64, 2]).into_array();

        let rle_array = RLEArray::try_new(values, indices, values_idx_offsets, 0, 2048).unwrap();

        assert_eq!(rle_array.len(), 2048);
        assert_eq!(rle_array.values().len(), 4);
        assert_eq!(rle_array.values_idx_offset(0), 0);
        assert_eq!(rle_array.values_idx_offset(1), 2);
    }

    #[test]
    fn test_rle_serialization() {
        let primitive = PrimitiveArray::from_iter((0..2048).map(|i| (i / 100) as u32));
        let rle_array = RLEArray::encode(&primitive).unwrap();
        assert_eq!(rle_array.len(), 2048);

        let original_data = rle_array.to_primitive();
        let decoded_data = serialize_roundtrip(
            &rle_array,
            &DType::Primitive(PType::U32, Nullability::NonNullable),
            2048,
        );

        assert_arrays_eq!(original_data, decoded_data);
    }

    #[test]
    fn test_rle_serialization_slice() {
        let primitive = PrimitiveArray::from_iter((0..2048).map(|i| (i / 100) as u32));
        let rle_array = RLEArray::encode(&primitive).unwrap();

        let sliced = RLEArray::try_new(
            rle_array.values().clone(),
            rle_array.indices().clone(),
            rle_array.values_idx_offsets().clone(),
            100,
            100,
        )
        .unwrap();
        assert_eq!(sliced.len(), 100);

        let decoded_data = serialize_roundtrip(&sliced, sliced.dtype(), sliced.len());
        let original_data = sliced.to_primitive();

        assert_arrays_eq!(original_data, decoded_data);
    }
}