Skip to main content

vortex_runend/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::ArrayView;
7use vortex_array::IntoArray;
8use vortex_array::ToCanonical;
9use vortex_array::arrays::BoolArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Primitive;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::VarBinViewArray;
14use vortex_array::arrays::bool::BoolArrayExt;
15use vortex_array::arrays::primitive::PrimitiveArrayExt;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::dtype::NativePType;
18use vortex_array::dtype::Nullability;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::match_each_native_ptype;
22use vortex_array::match_each_unsigned_integer_ptype;
23use vortex_array::scalar::Scalar;
24use vortex_array::validity::Validity;
25use vortex_buffer::BitBuffer;
26use vortex_buffer::BitBufferMut;
27use vortex_buffer::Buffer;
28use vortex_buffer::BufferMut;
29use vortex_buffer::buffer;
30use vortex_error::VortexExpect;
31use vortex_error::VortexResult;
32use vortex_mask::Mask;
33
34use crate::iter::trimmed_ends_iter;
35
36/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
37pub fn runend_encode(array: ArrayView<Primitive>) -> (PrimitiveArray, ArrayRef) {
38    let validity = match array
39        .validity()
40        .vortex_expect("run-end validity should be derivable")
41    {
42        Validity::NonNullable => None,
43        Validity::AllValid => None,
44        Validity::AllInvalid => {
45            // We can trivially return an all-null REE array
46            let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
47            ends.statistics()
48                .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
49            return (
50                ends,
51                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
52            );
53        }
54        Validity::Array(a) => Some(a.to_bool().to_bit_buffer()),
55    };
56
57    let (ends, values) = match validity {
58        None => {
59            match_each_native_ptype!(array.ptype(), |P| {
60                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
61                (
62                    PrimitiveArray::new(ends, Validity::NonNullable),
63                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
64                )
65            })
66        }
67        Some(validity) => {
68            match_each_native_ptype!(array.ptype(), |P| {
69                let (ends, values) =
70                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
71                (
72                    PrimitiveArray::new(ends, Validity::NonNullable),
73                    values.into_array(),
74                )
75            })
76        }
77    };
78
79    let ends = ends.narrow().vortex_expect("Ends must succeed downcasting");
80
81    ends.statistics()
82        .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
83
84    (ends, values)
85}
86
87fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
88    let mut ends = BufferMut::empty();
89    let mut values = BufferMut::empty();
90
91    if elements.is_empty() {
92        return (ends.freeze(), values.freeze());
93    }
94
95    // Run-end encode the values
96    let mut prev = elements[0];
97    let mut end = 1;
98    for &e in elements.iter().skip(1) {
99        if e != prev {
100            ends.push(end);
101            values.push(prev);
102        }
103        prev = e;
104        end += 1;
105    }
106    ends.push(end);
107    values.push(prev);
108
109    (ends.freeze(), values.freeze())
110}
111
112fn runend_encode_nullable_primitive<T: NativePType>(
113    elements: &[T],
114    element_validity: BitBuffer,
115) -> (Buffer<u64>, PrimitiveArray) {
116    let mut ends = BufferMut::empty();
117    let mut values = BufferMut::empty();
118    let mut validity = BitBufferMut::with_capacity(values.capacity());
119
120    if elements.is_empty() {
121        return (
122            ends.freeze(),
123            PrimitiveArray::new(
124                values,
125                Validity::Array(BoolArray::from(validity.freeze()).into_array()),
126            ),
127        );
128    }
129
130    // Run-end encode the values
131    let mut prev = element_validity.value(0).then(|| elements[0]);
132    let mut end = 1;
133    for e in elements
134        .iter()
135        .zip(element_validity.iter())
136        .map(|(&e, is_valid)| is_valid.then_some(e))
137        .skip(1)
138    {
139        if e != prev {
140            ends.push(end);
141            match prev {
142                None => {
143                    validity.append(false);
144                    values.push(T::default());
145                }
146                Some(p) => {
147                    validity.append(true);
148                    values.push(p);
149                }
150            }
151        }
152        prev = e;
153        end += 1;
154    }
155    ends.push(end);
156
157    match prev {
158        None => {
159            validity.append(false);
160            values.push(T::default());
161        }
162        Some(p) => {
163            validity.append(true);
164            values.push(p);
165        }
166    }
167
168    (
169        ends.freeze(),
170        PrimitiveArray::new(values, Validity::from(validity.freeze())),
171    )
172}
173
174pub fn runend_decode_primitive(
175    ends: PrimitiveArray,
176    values: PrimitiveArray,
177    offset: usize,
178    length: usize,
179) -> VortexResult<PrimitiveArray> {
180    let validity_mask = values.validity_mask()?;
181    Ok(match_each_native_ptype!(values.ptype(), |P| {
182        match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
183            runend_decode_typed_primitive(
184                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
185                values.as_slice::<P>(),
186                validity_mask,
187                values.dtype().nullability(),
188                length,
189            )
190        })
191    }))
192}
193
194/// Decode a run-end encoded slice of values into a flat `Buffer<T>` and `Validity`.
195///
196/// This is the core decode loop shared by primitive and varbinview run-end decoding.
197fn runend_decode_slice<T: Copy + Default>(
198    run_ends: impl Iterator<Item = usize>,
199    values: &[T],
200    values_validity: Mask,
201    values_nullability: Nullability,
202    length: usize,
203) -> (Buffer<T>, Validity) {
204    match values_validity {
205        Mask::AllTrue(_) => {
206            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
207            for (end, value) in run_ends.zip_eq(values) {
208                assert!(
209                    end >= decoded.len(),
210                    "Runend ends must be monotonic, got {end} after {}",
211                    decoded.len()
212                );
213                assert!(end <= length, "Runend end must be less than overall length");
214                // SAFETY:
215                // We preallocate enough capacity because we know the total length
216                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
217            }
218            (decoded.into(), values_nullability.into())
219        }
220        Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
221        Mask::Values(mask) => {
222            let mut decoded = BufferMut::with_capacity(length);
223            let mut decoded_validity = BitBufferMut::with_capacity(length);
224            for (end, value) in run_ends.zip_eq(
225                values
226                    .iter()
227                    .zip(mask.bit_buffer().iter())
228                    .map(|(&v, is_valid)| is_valid.then_some(v)),
229            ) {
230                assert!(
231                    end >= decoded.len(),
232                    "Runend ends must be monotonic, got {end} after {}",
233                    decoded.len()
234                );
235                assert!(end <= length, "Runend end must be less than overall length");
236                match value {
237                    None => {
238                        decoded_validity.append_n(false, end - decoded.len());
239                        // SAFETY:
240                        // We preallocate enough capacity because we know the total length
241                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
242                    }
243                    Some(value) => {
244                        decoded_validity.append_n(true, end - decoded.len());
245                        // SAFETY:
246                        // We preallocate enough capacity because we know the total length
247                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
248                    }
249                }
250            }
251            (decoded.into(), Validity::from(decoded_validity.freeze()))
252        }
253    }
254}
255
256pub fn runend_decode_typed_primitive<T: NativePType>(
257    run_ends: impl Iterator<Item = usize>,
258    values: &[T],
259    values_validity: Mask,
260    values_nullability: Nullability,
261    length: usize,
262) -> PrimitiveArray {
263    let (decoded, validity) = runend_decode_slice(
264        run_ends,
265        values,
266        values_validity,
267        values_nullability,
268        length,
269    );
270    PrimitiveArray::new(decoded, validity)
271}
272
273/// Decode a run-end encoded VarBinView array by expanding views directly.
274pub fn runend_decode_varbinview(
275    ends: PrimitiveArray,
276    values: VarBinViewArray,
277    offset: usize,
278    length: usize,
279) -> VortexResult<VarBinViewArray> {
280    let validity_mask = values.validity_mask()?;
281    let views = values.views();
282
283    let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
284        runend_decode_slice(
285            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
286            views,
287            validity_mask,
288            values.dtype().nullability(),
289            length,
290        )
291    });
292
293    let parts = values.into_data_parts();
294    let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
295
296    // SAFETY: we are expanding views from a valid VarBinViewArray with the same
297    // buffers, so all buffer indices and offsets remain valid.
298    Ok(unsafe {
299        VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
300    })
301}
302
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// Non-nullable input: one run per group of equal neighbours, ends narrowed to `u8`.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (run_ends, run_values) = runend_encode(input.as_view());
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter([2u8, 5, 10]);
        let want_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        assert_arrays_eq!(run_ends, want_ends);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Mixed validity: null positions form their own runs regardless of the stored values.
    #[test]
    fn encode_nullable() {
        let validity_bits = BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(validity_bits),
        );
        let (run_ends, run_values) = runend_encode(input.as_view());
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter([2u8, 4, 5, 8, 10]);
        let want_values =
            PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(run_ends, want_ends);
        assert_arrays_eq!(run_values, want_values);
    }

    /// All-null input: a single null run whose end stays `u64`.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );
        let (run_ends, run_values) = runend_encode(input.as_view());
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter([5u64]);
        let want_values = PrimitiveArray::from_option_iter([Option::<i32>::None]);
        assert_arrays_eq!(run_ends, want_ends);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Decoding expands every run back to its flat representation.
    #[test]
    fn decode() -> VortexResult<()> {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let flat = runend_decode_primitive(run_ends, run_values, 0, 10)?;

        let want = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(flat, want);
        Ok(())
    }
}
371}