// vortex_fastlanes/rle/array/mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Display;
5use std::fmt::Formatter;
6
7use vortex_array::ArrayRef;
8use vortex_array::ExecutionCtx;
9use vortex_array::TypedArrayRef;
10use vortex_error::VortexExpect as _;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13
14pub mod rle_compress;
15pub mod rle_decompress;
16
/// Slot index of the `values` child: run values in the dictionary.
pub(super) const VALUES_SLOT: usize = 0;
/// Slot index of the `indices` child: chunk-local indices from all chunks. The start of each
/// chunk is looked up in `values_idx_offsets`.
pub(super) const INDICES_SLOT: usize = 1;
/// Slot index of the `values_idx_offsets` child: index start positions of each value chunk.
///
/// # Example
/// ```text
/// // Chunk 0: [10, 20] (starts at index 0)
/// // Chunk 1: [30, 40] (starts at index 2)
/// let values = [10, 20, 30, 40];           // Global values array
/// let values_idx_offsets = [0, 2];         // Chunk 0 starts at index 0, Chunk 1 starts at index 2
/// ```
pub(super) const VALUES_IDX_OFFSETS_SLOT: usize = 2;
/// Total number of child slots; also the length of `SLOT_NAMES`.
pub(super) const NUM_SLOTS: usize = 3;
/// Names of the child slots, listed in slot-index order.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["values", "indices", "values_idx_offsets"];
33
/// Holds the offset of an RLE array.
///
/// `RLEData::try_new` only accepts offsets below 1024; `RLEData::new_unchecked`
/// stores the value as given.
#[derive(Clone, Debug)]
pub struct RLEData {
    /// Offset relative to the start of the chunk.
    pub(super) offset: usize,
}
39
40impl Display for RLEData {
41    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42        write!(f, "offset: {}", self.offset)
43    }
44}
45
46impl RLEData {
47    /// Create a new chunk-based RLE array from its components.
48    ///
49    /// # Arguments
50    ///
51    /// * `values` - Unique values from all chunks
52    /// * `indices` - Chunk-local indices from all chunks
53    /// * `values_idx_offsets` - Start indices for each value chunk.
54    /// * `offset` - Offset into the first chunk
55    /// * `length` - Array length
56    pub fn try_new(offset: usize) -> VortexResult<Self> {
57        vortex_ensure!(
58            offset < 1024,
59            "Offset must be smaller than 1024, got {}",
60            offset
61        );
62        Ok(Self { offset })
63    }
64
65    /// Create a new RLEArray without validation.
66    ///
67    /// # Safety
68    /// The caller must ensure that:
69    /// - `offset + length` does not exceed the length of the indices array
70    /// - The `indices` array contains valid indices into chunks of the `values` array
71    /// - The `values_idx_offsets` array contains valid chunk start offsets
72    pub unsafe fn new_unchecked(offset: usize) -> Self {
73        Self { offset }
74    }
75
76    #[inline]
77    pub fn offset(&self) -> usize {
78        self.offset
79    }
80}
81
82pub trait RLEArrayExt: TypedArrayRef<crate::RLE> {
83    #[inline]
84    fn values(&self) -> &ArrayRef {
85        self.as_ref().slots()[VALUES_SLOT]
86            .as_ref()
87            .vortex_expect("RLEArray values slot must be populated")
88    }
89
90    #[inline]
91    fn indices(&self) -> &ArrayRef {
92        self.as_ref().slots()[INDICES_SLOT]
93            .as_ref()
94            .vortex_expect("RLEArray indices slot must be populated")
95    }
96
97    #[inline]
98    fn values_idx_offsets(&self) -> &ArrayRef {
99        self.as_ref().slots()[VALUES_IDX_OFFSETS_SLOT]
100            .as_ref()
101            .vortex_expect("RLEArray values_idx_offsets slot must be populated")
102    }
103
104    /// Values index offset relative to the first chunk.
105    ///
106    /// Offsets in `values_idx_offsets` are absolute and need to be shifted
107    /// by the offset of the first chunk, respective the current slice, in
108    /// order to make them relative.
109    #[expect(
110        clippy::expect_used,
111        reason = "expect is safe here as scalar_at returns a valid primitive"
112    )]
113    fn values_idx_offset(&self, chunk_idx: usize, ctx: &mut ExecutionCtx) -> usize {
114        self.values_idx_offsets()
115            .execute_scalar(chunk_idx, ctx)
116            .expect("index must be in bounds")
117            .as_primitive()
118            .as_::<usize>()
119            .expect("index must be of type usize")
120            - self
121                .values_idx_offsets()
122                .execute_scalar(0, ctx)
123                .expect("index must be in bounds")
124                .as_primitive()
125                .as_::<usize>()
126                .expect("index must be of type usize")
127    }
128
129    /// Index offset into the array
130    #[inline]
131    fn offset(&self) -> usize {
132        self.offset
133    }
134}
135
136impl<T: TypedArrayRef<crate::RLE>> RLEArrayExt for T {}
137
138#[cfg(test)]
139mod tests {
140    use vortex_array::ArrayContext;
141    use vortex_array::Canonical;
142    use vortex_array::IntoArray;
143    use vortex_array::LEGACY_SESSION;
144    use vortex_array::VortexSessionExecute;
145    use vortex_array::arrays::PrimitiveArray;
146    use vortex_array::arrays::primitive::PrimitiveArrayExt;
147    use vortex_array::assert_arrays_eq;
148    use vortex_array::dtype::DType;
149    use vortex_array::dtype::Nullability;
150    use vortex_array::dtype::PType;
151    use vortex_array::serde::SerializeOptions;
152    use vortex_array::serde::SerializedArray;
153    use vortex_array::validity::Validity;
154    use vortex_buffer::Buffer;
155    use vortex_buffer::ByteBufferMut;
156    use vortex_error::VortexExpect;
157    use vortex_error::VortexResult;
158    use vortex_session::registry::ReadContext;
159
160    use crate::FL_CHUNK_SIZE;
161    use crate::RLE;
162    use crate::RLEData;
163    use crate::rle::array::RLEArrayExt;
164    use crate::test::SESSION;
165
166    #[test]
167    fn test_try_new() {
168        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
169
170        // Pad indices to 1024 chunk.
171        let indices =
172            PrimitiveArray::from_iter([0u16, 0, 1, 1, 2].iter().cycle().take(1024).copied())
173                .into_array();
174        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
175        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 5)
176            .vortex_expect("RLEData is always valid");
177
178        assert_eq!(rle_array.len(), 5);
179        assert_eq!(rle_array.values().len(), 3);
180        assert_eq!(rle_array.values().dtype().as_ptype(), PType::U32);
181    }
182
183    #[test]
184    fn test_try_new_with_validity() {
185        let values = PrimitiveArray::from_iter([10u32, 20]).into_array();
186        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
187
188        let indices_pattern = [0u16, 1, 0];
189        let validity_pattern = [true, false, true];
190
191        // Pad indices to 1024 chunk.
192        let indices_with_validity = PrimitiveArray::new(
193            indices_pattern
194                .iter()
195                .cycle()
196                .take(1024)
197                .copied()
198                .collect::<Buffer<u16>>(),
199            Validity::from_iter(validity_pattern.iter().cycle().take(1024).copied()),
200        )
201        .into_array();
202
203        let rle_array = RLE::try_new(values, indices_with_validity, values_idx_offsets, 0, 3)
204            .vortex_expect("RLEData is always valid");
205
206        assert_eq!(rle_array.len(), 3);
207        assert_eq!(rle_array.values().len(), 2);
208        let mut ctx = SESSION.create_execution_ctx();
209        assert!(rle_array.is_valid(0, &mut ctx).unwrap());
210        assert!(!rle_array.is_valid(1, &mut ctx).unwrap());
211        assert!(rle_array.is_valid(2, &mut ctx).unwrap());
212    }
213
214    #[test]
215    fn test_all_valid() {
216        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
217        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
218
219        let indices_pattern = [0u16, 1, 2, 0, 1];
220        let validity_pattern = [true, true, true, false, false];
221
222        // Pad indices to 1024 chunk.
223        let indices_with_validity = PrimitiveArray::new(
224            indices_pattern
225                .iter()
226                .cycle()
227                .take(1024)
228                .copied()
229                .collect::<Buffer<u16>>(),
230            Validity::from_iter(validity_pattern.iter().cycle().take(1024).copied()),
231        )
232        .into_array();
233
234        let rle_array = RLE::try_new(values, indices_with_validity, values_idx_offsets, 0, 5)
235            .vortex_expect("RLEData is always valid");
236
237        let mut ctx = SESSION.create_execution_ctx();
238        let valid_slice = rle_array
239            .slice(0..3)
240            .unwrap()
241            .execute::<PrimitiveArray>(&mut ctx)
242            .unwrap();
243        // TODO(joe): replace with compute null count
244        assert!(valid_slice.all_valid(&mut ctx).unwrap());
245
246        let mixed_slice = rle_array.slice(1..5).unwrap();
247        assert!(!mixed_slice.all_valid(&mut ctx).unwrap());
248    }
249
250    #[test]
251    fn test_all_invalid() {
252        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
253        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
254
255        // Pad indices to 1024 chunk.
256        let indices_pattern = [0u16, 1, 2, 0, 1];
257        let validity_pattern = [true, true, false, false, false];
258
259        let indices_with_validity = PrimitiveArray::new(
260            indices_pattern
261                .iter()
262                .cycle()
263                .take(1024)
264                .copied()
265                .collect::<Buffer<u16>>(),
266            Validity::from_iter(validity_pattern.iter().cycle().take(1024).copied()),
267        )
268        .into_array();
269
270        let rle_array = RLE::try_new(values, indices_with_validity, values_idx_offsets, 0, 5)
271            .vortex_expect("RLEData is always valid");
272
273        // TODO(joe): replace with compute null count
274        let invalid_slice = rle_array
275            .slice(2..5)
276            .unwrap()
277            .execute::<Canonical>(&mut LEGACY_SESSION.create_execution_ctx())
278            .unwrap()
279            .into_primitive();
280        let mut ctx = SESSION.create_execution_ctx();
281        assert!(invalid_slice.all_invalid(&mut ctx).unwrap());
282
283        let mixed_slice = rle_array.slice(1..4).unwrap();
284        assert!(!mixed_slice.all_invalid(&mut ctx).unwrap());
285    }
286
287    #[test]
288    fn test_validity_mask() {
289        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
290        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
291
292        // Pad indices to 1024 chunk.
293        let indices_pattern = [0u16, 1, 2, 0];
294        let validity_pattern = [true, false, true, false];
295
296        let indices_with_validity = PrimitiveArray::new(
297            indices_pattern
298                .iter()
299                .cycle()
300                .take(1024)
301                .copied()
302                .collect::<Buffer<u16>>(),
303            Validity::from_iter(validity_pattern.iter().cycle().take(1024).copied()),
304        )
305        .into_array();
306
307        let rle_array = RLE::try_new(values, indices_with_validity, values_idx_offsets, 0, 4)
308            .vortex_expect("RLEData is always valid");
309
310        let sliced_array = rle_array.slice(1..4).unwrap();
311        let validity_mask = sliced_array
312            .validity()
313            .unwrap()
314            .execute_mask(
315                sliced_array.len(),
316                &mut LEGACY_SESSION.create_execution_ctx(),
317            )
318            .unwrap();
319
320        let mut ctx = LEGACY_SESSION.create_execution_ctx();
321        let expected_mask = Validity::from_iter([false, true, false])
322            .execute_mask(3, &mut ctx)
323            .unwrap();
324        assert_eq!(validity_mask.len(), expected_mask.len());
325        assert_eq!(validity_mask, expected_mask);
326        assert_eq!(validity_mask.len(), expected_mask.len());
327        assert_eq!(validity_mask, expected_mask);
328    }
329
330    #[test]
331    fn test_try_new_empty() {
332        let values = PrimitiveArray::from_iter(Vec::<u32>::new()).into_array();
333        let indices = PrimitiveArray::from_iter(Vec::<u16>::new()).into_array();
334        let values_idx_offsets = PrimitiveArray::from_iter(Vec::<u64>::new()).into_array();
335        let rle_array = RLE::try_new(
336            values,
337            indices.clone(),
338            values_idx_offsets,
339            0,
340            indices.len(),
341        )
342        .vortex_expect("RLEData is always valid");
343
344        assert_eq!(rle_array.len(), 0);
345        assert_eq!(rle_array.values().len(), 0);
346    }
347
348    #[test]
349    fn test_multi_chunk_two_chunks() {
350        let mut ctx = LEGACY_SESSION.create_execution_ctx();
351        let values = PrimitiveArray::from_iter([10u32, 20, 30, 40]).into_array();
352        let indices = PrimitiveArray::from_iter([0u16, 1].repeat(1024)).into_array();
353        let values_idx_offsets = PrimitiveArray::from_iter([0u64, 2]).into_array();
354        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 2048)
355            .vortex_expect("RLEData is always valid");
356
357        assert_eq!(rle_array.len(), 2048);
358        assert_eq!(rle_array.values().len(), 4);
359
360        assert_eq!(rle_array.values_idx_offset(0, &mut ctx), 0);
361        assert_eq!(rle_array.values_idx_offset(1, &mut ctx), 2);
362    }
363
364    #[test]
365    fn test_rle_serialization() -> VortexResult<()> {
366        let mut exec_ctx = SESSION.create_execution_ctx();
367        let primitive = PrimitiveArray::from_iter((0..2048).map(|i| (i / 100) as u32));
368        let rle_array = RLEData::encode(primitive.as_view(), &mut exec_ctx)?;
369        assert_eq!(rle_array.len(), 2048);
370
371        let original_data = rle_array
372            .as_array()
373            .clone()
374            .execute::<PrimitiveArray>(&mut exec_ctx)?;
375
376        let ctx = ArrayContext::empty();
377        let serialized =
378            rle_array
379                .into_array()
380                .serialize(&ctx, &SESSION, &SerializeOptions::default())?;
381
382        let mut concat = ByteBufferMut::empty();
383        for buf in serialized {
384            concat.extend_from_slice(buf.as_ref());
385        }
386        let concat = concat.freeze();
387
388        let parts = SerializedArray::try_from(concat)?;
389        let decoded = parts.decode(
390            &DType::Primitive(PType::U32, Nullability::NonNullable),
391            2048,
392            &ReadContext::new(ctx.to_ids()),
393            &SESSION,
394        )?;
395
396        let decoded_data = decoded.execute::<PrimitiveArray>(&mut exec_ctx)?;
397
398        assert_arrays_eq!(original_data, decoded_data);
399        Ok(())
400    }
401
402    #[test]
403    fn test_rle_serialization_slice() -> VortexResult<()> {
404        let mut exec_ctx = SESSION.create_execution_ctx();
405        let primitive = PrimitiveArray::from_iter((0..2048).map(|i| (i / 100) as u32));
406        let rle_array = RLEData::encode(primitive.as_view(), &mut exec_ctx)?;
407
408        let sliced = RLE::try_new(
409            rle_array.values().clone(),
410            rle_array.indices().clone(),
411            rle_array.values_idx_offsets().clone(),
412            100,
413            100,
414        )
415        .vortex_expect("RLEData is always valid");
416        assert_eq!(sliced.len(), 100);
417
418        let ctx = ArrayContext::empty();
419        let serialized =
420            sliced
421                .clone()
422                .into_array()
423                .serialize(&ctx, &SESSION, &SerializeOptions::default())?;
424
425        let mut concat = ByteBufferMut::empty();
426        for buf in serialized {
427            concat.extend_from_slice(buf.as_ref());
428        }
429        let concat = concat.freeze();
430
431        let parts = SerializedArray::try_from(concat)?;
432        let decoded = parts.decode(
433            sliced.dtype(),
434            sliced.len(),
435            &ReadContext::new(ctx.to_ids()),
436            &SESSION,
437        )?;
438
439        let original_data = sliced
440            .as_array()
441            .clone()
442            .execute::<PrimitiveArray>(&mut exec_ctx)?;
443        let decoded_data = decoded.execute::<PrimitiveArray>(&mut exec_ctx)?;
444
445        assert_arrays_eq!(original_data, decoded_data);
446        Ok(())
447    }
448
449    /// Regression test: re-encoding RLE indices with RLE must not corrupt
450    /// chunk-local index values via cross-chunk fill-forward.
451    ///
452    /// The scenario: an array spanning 2 chunks where chunk 0 has 2 distinct
453    /// non-null values (producing chunk-local indices 0 and 1) and chunk 1 is
454    /// entirely null. When fill_forward_nulls propagated the last valid index
455    /// (1) from chunk 0 into chunk 1 during re-encoding, decoding panicked
456    /// because chunk 1 only had 1 unique value and index 1 was out of bounds.
457    #[test]
458    fn test_recompress_indices_no_cross_chunk_leak() -> VortexResult<()> {
459        let mut ctx = SESSION.create_execution_ctx();
460        let len = FL_CHUNK_SIZE + 100;
461        let mut values: Vec<Option<i16>> = vec![None; len];
462        // Two distinct values in chunk 0 → indices 0 and 1.
463        values[0] = Some(10);
464        values[500] = Some(20);
465        // Chunk 1 (positions 1024..) is all-null.
466
467        let original = PrimitiveArray::from_option_iter(values);
468        let rle = RLEData::encode(original.as_view(), &mut ctx)?;
469
470        // Simulate cascading compression: narrow u16->u8 then re-encode with RLE,
471        // matching the path taken by the BtrBlocks compressor.
472        let indices_prim = rle
473            .indices()
474            .clone()
475            .execute::<PrimitiveArray>(&mut ctx)?
476            .narrow(&mut ctx)?;
477        let re_encoded = RLEData::encode(indices_prim.as_view(), &mut ctx)?;
478
479        // Reconstruct the outer RLE with re-encoded indices.
480        // SAFETY: we only replace the indices child; all other invariants hold.
481        let reconstructed = unsafe {
482            RLE::new_unchecked(
483                rle.values().clone(),
484                re_encoded.into_array(),
485                rle.values_idx_offsets().clone(),
486                rle.offset(),
487                rle.len(),
488            )
489        };
490
491        // Decompress — panicked before the fill_forward_nulls chunk-boundary fix.
492        let decoded = reconstructed
493            .as_array()
494            .clone()
495            .execute::<PrimitiveArray>(&mut ctx)?;
496        assert_arrays_eq!(decoded, original);
497        Ok(())
498    }
499}