Skip to main content

vortex_runend/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::IntoArray;
7use vortex_array::ToCanonical;
8use vortex_array::arrays::BoolArray;
9use vortex_array::arrays::ConstantArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::VarBinViewArray;
12use vortex_array::buffer::BufferHandle;
13use vortex_array::dtype::NativePType;
14use vortex_array::dtype::Nullability;
15use vortex_array::expr::stats::Precision;
16use vortex_array::expr::stats::Stat;
17use vortex_array::match_each_native_ptype;
18use vortex_array::match_each_unsigned_integer_ptype;
19use vortex_array::scalar::Scalar;
20use vortex_array::validity::Validity;
21use vortex_array::vtable::ValidityHelper;
22use vortex_buffer::BitBuffer;
23use vortex_buffer::BitBufferMut;
24use vortex_buffer::Buffer;
25use vortex_buffer::BufferMut;
26use vortex_buffer::buffer;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_mask::Mask;
30
31use crate::iter::trimmed_ends_iter;
32
33/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
34pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) {
35    let validity = match array.validity() {
36        Validity::NonNullable => None,
37        Validity::AllValid => None,
38        Validity::AllInvalid => {
39            // We can trivially return an all-null REE array
40            let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
41            ends.statistics()
42                .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
43            return (
44                ends,
45                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
46            );
47        }
48        Validity::Array(a) => Some(a.to_bool().to_bit_buffer()),
49    };
50
51    let (ends, values) = match validity {
52        None => {
53            match_each_native_ptype!(array.ptype(), |P| {
54                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
55                (
56                    PrimitiveArray::new(ends, Validity::NonNullable),
57                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
58                )
59            })
60        }
61        Some(validity) => {
62            match_each_native_ptype!(array.ptype(), |P| {
63                let (ends, values) =
64                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
65                (
66                    PrimitiveArray::new(ends, Validity::NonNullable),
67                    values.into_array(),
68                )
69            })
70        }
71    };
72
73    let ends = ends
74        .narrow()
75        .vortex_expect("Ends must succeed downcasting")
76        .to_primitive();
77
78    ends.statistics()
79        .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
80
81    (ends, values)
82}
83
84fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
85    let mut ends = BufferMut::empty();
86    let mut values = BufferMut::empty();
87
88    if elements.is_empty() {
89        return (ends.freeze(), values.freeze());
90    }
91
92    // Run-end encode the values
93    let mut prev = elements[0];
94    let mut end = 1;
95    for &e in elements.iter().skip(1) {
96        if e != prev {
97            ends.push(end);
98            values.push(prev);
99        }
100        prev = e;
101        end += 1;
102    }
103    ends.push(end);
104    values.push(prev);
105
106    (ends.freeze(), values.freeze())
107}
108
109fn runend_encode_nullable_primitive<T: NativePType>(
110    elements: &[T],
111    element_validity: BitBuffer,
112) -> (Buffer<u64>, PrimitiveArray) {
113    let mut ends = BufferMut::empty();
114    let mut values = BufferMut::empty();
115    let mut validity = BitBufferMut::with_capacity(values.capacity());
116
117    if elements.is_empty() {
118        return (
119            ends.freeze(),
120            PrimitiveArray::new(
121                values,
122                Validity::Array(BoolArray::from(validity.freeze()).into_array()),
123            ),
124        );
125    }
126
127    // Run-end encode the values
128    let mut prev = element_validity.value(0).then(|| elements[0]);
129    let mut end = 1;
130    for e in elements
131        .iter()
132        .zip(element_validity.iter())
133        .map(|(&e, is_valid)| is_valid.then_some(e))
134        .skip(1)
135    {
136        if e != prev {
137            ends.push(end);
138            match prev {
139                None => {
140                    validity.append(false);
141                    values.push(T::default());
142                }
143                Some(p) => {
144                    validity.append(true);
145                    values.push(p);
146                }
147            }
148        }
149        prev = e;
150        end += 1;
151    }
152    ends.push(end);
153
154    match prev {
155        None => {
156            validity.append(false);
157            values.push(T::default());
158        }
159        Some(p) => {
160            validity.append(true);
161            values.push(p);
162        }
163    }
164
165    (
166        ends.freeze(),
167        PrimitiveArray::new(values, Validity::from(validity.freeze())),
168    )
169}
170
171pub fn runend_decode_primitive(
172    ends: PrimitiveArray,
173    values: PrimitiveArray,
174    offset: usize,
175    length: usize,
176) -> VortexResult<PrimitiveArray> {
177    let validity_mask = values.validity_mask()?;
178    Ok(match_each_native_ptype!(values.ptype(), |P| {
179        match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
180            runend_decode_typed_primitive(
181                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
182                values.as_slice::<P>(),
183                validity_mask,
184                values.dtype().nullability(),
185                length,
186            )
187        })
188    }))
189}
190
191pub fn runend_decode_bools(
192    ends: PrimitiveArray,
193    values: BoolArray,
194    offset: usize,
195    length: usize,
196) -> VortexResult<BoolArray> {
197    let validity_mask = values.validity_mask()?;
198    Ok(match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
199        runend_decode_typed_bool(
200            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
201            &values.to_bit_buffer(),
202            validity_mask,
203            values.dtype().nullability(),
204            length,
205        )
206    }))
207}
208
209/// Decode a run-end encoded slice of values into a flat `Buffer<T>` and `Validity`.
210///
211/// This is the core decode loop shared by primitive and varbinview run-end decoding.
212fn runend_decode_slice<T: Copy + Default>(
213    run_ends: impl Iterator<Item = usize>,
214    values: &[T],
215    values_validity: Mask,
216    values_nullability: Nullability,
217    length: usize,
218) -> (Buffer<T>, Validity) {
219    match values_validity {
220        Mask::AllTrue(_) => {
221            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
222            for (end, value) in run_ends.zip_eq(values) {
223                assert!(
224                    end >= decoded.len(),
225                    "Runend ends must be monotonic, got {end} after {}",
226                    decoded.len()
227                );
228                assert!(end <= length, "Runend end must be less than overall length");
229                // SAFETY:
230                // We preallocate enough capacity because we know the total length
231                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
232            }
233            (decoded.into(), values_nullability.into())
234        }
235        Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
236        Mask::Values(mask) => {
237            let mut decoded = BufferMut::with_capacity(length);
238            let mut decoded_validity = BitBufferMut::with_capacity(length);
239            for (end, value) in run_ends.zip_eq(
240                values
241                    .iter()
242                    .zip(mask.bit_buffer().iter())
243                    .map(|(&v, is_valid)| is_valid.then_some(v)),
244            ) {
245                assert!(
246                    end >= decoded.len(),
247                    "Runend ends must be monotonic, got {end} after {}",
248                    decoded.len()
249                );
250                assert!(end <= length, "Runend end must be less than overall length");
251                match value {
252                    None => {
253                        decoded_validity.append_n(false, end - decoded.len());
254                        // SAFETY:
255                        // We preallocate enough capacity because we know the total length
256                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
257                    }
258                    Some(value) => {
259                        decoded_validity.append_n(true, end - decoded.len());
260                        // SAFETY:
261                        // We preallocate enough capacity because we know the total length
262                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
263                    }
264                }
265            }
266            (decoded.into(), Validity::from(decoded_validity.freeze()))
267        }
268    }
269}
270
271pub fn runend_decode_typed_primitive<T: NativePType>(
272    run_ends: impl Iterator<Item = usize>,
273    values: &[T],
274    values_validity: Mask,
275    values_nullability: Nullability,
276    length: usize,
277) -> PrimitiveArray {
278    let (decoded, validity) = runend_decode_slice(
279        run_ends,
280        values,
281        values_validity,
282        values_nullability,
283        length,
284    );
285    PrimitiveArray::new(decoded, validity)
286}
287
288pub fn runend_decode_typed_bool(
289    run_ends: impl Iterator<Item = usize>,
290    values: &BitBuffer,
291    values_validity: Mask,
292    values_nullability: Nullability,
293    length: usize,
294) -> BoolArray {
295    match values_validity {
296        Mask::AllTrue(_) => {
297            let mut decoded = BitBufferMut::with_capacity(length);
298            for (end, value) in run_ends.zip_eq(values.iter()) {
299                decoded.append_n(value, end - decoded.len());
300            }
301            BoolArray::new(decoded.freeze(), values_nullability.into())
302        }
303        Mask::AllFalse(_) => BoolArray::new(BitBuffer::new_unset(length), Validity::AllInvalid),
304        Mask::Values(mask) => {
305            let mut decoded = BitBufferMut::with_capacity(length);
306            let mut decoded_validity = BitBufferMut::with_capacity(length);
307            for (end, value) in run_ends.zip_eq(
308                values
309                    .iter()
310                    .zip(mask.bit_buffer().iter())
311                    .map(|(v, is_valid)| is_valid.then_some(v)),
312            ) {
313                match value {
314                    None => {
315                        decoded_validity.append_n(false, end - decoded.len());
316                        decoded.append_n(false, end - decoded.len());
317                    }
318                    Some(value) => {
319                        decoded_validity.append_n(true, end - decoded.len());
320                        decoded.append_n(value, end - decoded.len());
321                    }
322                }
323            }
324            BoolArray::new(decoded.freeze(), Validity::from(decoded_validity.freeze()))
325        }
326    }
327}
328
329/// Decode a run-end encoded VarBinView array by expanding views directly.
330pub fn runend_decode_varbinview(
331    ends: PrimitiveArray,
332    values: VarBinViewArray,
333    offset: usize,
334    length: usize,
335) -> VortexResult<VarBinViewArray> {
336    let validity_mask = values.validity_mask()?;
337    let views = values.views();
338
339    let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
340        runend_decode_slice(
341            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
342            views,
343            validity_mask,
344            values.dtype().nullability(),
345            length,
346        )
347    });
348
349    let parts = values.into_parts();
350    let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
351
352    // SAFETY: we are expanding views from a valid VarBinViewArray with the same
353    // buffers, so all buffer indices and offsets remain valid.
354    Ok(unsafe {
355        VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
356    })
357}
358
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// A non-nullable input collapses into three runs with `u8` ends.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (actual_ends, actual_values) = runend_encode(&input);
        let actual_values = actual_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]);
        let want_values = PrimitiveArray::from_iter(vec![1i32, 2, 3]);
        assert_arrays_eq!(actual_ends, want_ends);
        assert_arrays_eq!(actual_values, want_values);
    }

    /// Null elements form their own runs, splitting otherwise equal values.
    #[test]
    fn encode_nullable() {
        let element_validity = BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(element_validity),
        );
        let (actual_ends, actual_values) = runend_encode(&input);
        let actual_values = actual_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]);
        let want_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(actual_ends, want_ends);
        assert_arrays_eq!(actual_values, want_values);
    }

    /// An all-null input yields one `u64` end and a single null value.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );
        let (actual_ends, actual_values) = runend_encode(&input);
        let actual_values = actual_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![5u64]);
        let want_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);
        assert_arrays_eq!(actual_ends, want_ends);
        assert_arrays_eq!(actual_values, want_values);
    }

    /// Decoding three runs with no offset reproduces the flat array.
    #[test]
    fn decode() -> VortexResult<()> {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let actual = runend_decode_primitive(run_ends, run_values, 0, 10)?;

        let want = PrimitiveArray::from_iter(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(actual, want);
        Ok(())
    }
}