Skip to main content

vortex_runend/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::ArrayView;
7use vortex_array::IntoArray;
8use vortex_array::LEGACY_SESSION;
9use vortex_array::ToCanonical;
10use vortex_array::VortexSessionExecute;
11use vortex_array::arrays::BoolArray;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::Primitive;
14use vortex_array::arrays::PrimitiveArray;
15use vortex_array::arrays::VarBinViewArray;
16use vortex_array::arrays::bool::BoolArrayExt;
17use vortex_array::arrays::primitive::PrimitiveArrayExt;
18use vortex_array::buffer::BufferHandle;
19use vortex_array::dtype::NativePType;
20use vortex_array::dtype::Nullability;
21use vortex_array::expr::stats::Precision;
22use vortex_array::expr::stats::Stat;
23use vortex_array::match_each_native_ptype;
24use vortex_array::match_each_unsigned_integer_ptype;
25use vortex_array::scalar::Scalar;
26use vortex_array::validity::Validity;
27use vortex_buffer::BitBuffer;
28use vortex_buffer::BitBufferMut;
29use vortex_buffer::Buffer;
30use vortex_buffer::BufferMut;
31use vortex_buffer::buffer;
32use vortex_error::VortexExpect;
33use vortex_error::VortexResult;
34use vortex_mask::Mask;
35
36use crate::iter::trimmed_ends_iter;
37
38/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
39pub fn runend_encode(array: ArrayView<Primitive>) -> (PrimitiveArray, ArrayRef) {
40    let validity = match array
41        .validity()
42        .vortex_expect("run-end validity should be derivable")
43    {
44        Validity::NonNullable => None,
45        Validity::AllValid => None,
46        Validity::AllInvalid => {
47            // We can trivially return an all-null REE array
48            let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
49            ends.statistics()
50                .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
51            return (
52                ends,
53                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
54            );
55        }
56        Validity::Array(a) => Some(a.to_bool().to_bit_buffer()),
57    };
58
59    let (ends, values) = match validity {
60        None => {
61            match_each_native_ptype!(array.ptype(), |P| {
62                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
63                (
64                    PrimitiveArray::new(ends, Validity::NonNullable),
65                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
66                )
67            })
68        }
69        Some(validity) => {
70            match_each_native_ptype!(array.ptype(), |P| {
71                let (ends, values) =
72                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
73                (
74                    PrimitiveArray::new(ends, Validity::NonNullable),
75                    values.into_array(),
76                )
77            })
78        }
79    };
80
81    let ends = ends.narrow().vortex_expect("Ends must succeed downcasting");
82
83    ends.statistics()
84        .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
85
86    (ends, values)
87}
88
89fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
90    let mut ends = BufferMut::empty();
91    let mut values = BufferMut::empty();
92
93    if elements.is_empty() {
94        return (ends.freeze(), values.freeze());
95    }
96
97    // Run-end encode the values
98    let mut prev = elements[0];
99    let mut end = 1;
100    for &e in elements.iter().skip(1) {
101        if e != prev {
102            ends.push(end);
103            values.push(prev);
104        }
105        prev = e;
106        end += 1;
107    }
108    ends.push(end);
109    values.push(prev);
110
111    (ends.freeze(), values.freeze())
112}
113
114fn runend_encode_nullable_primitive<T: NativePType>(
115    elements: &[T],
116    element_validity: BitBuffer,
117) -> (Buffer<u64>, PrimitiveArray) {
118    let mut ends = BufferMut::empty();
119    let mut values = BufferMut::empty();
120    let mut validity = BitBufferMut::with_capacity(values.capacity());
121
122    if elements.is_empty() {
123        return (
124            ends.freeze(),
125            PrimitiveArray::new(
126                values,
127                Validity::Array(BoolArray::from(validity.freeze()).into_array()),
128            ),
129        );
130    }
131
132    // Run-end encode the values
133    let mut prev = element_validity.value(0).then(|| elements[0]);
134    let mut end = 1;
135    for e in elements
136        .iter()
137        .zip(element_validity.iter())
138        .map(|(&e, is_valid)| is_valid.then_some(e))
139        .skip(1)
140    {
141        if e != prev {
142            ends.push(end);
143            match prev {
144                None => {
145                    validity.append(false);
146                    values.push(T::default());
147                }
148                Some(p) => {
149                    validity.append(true);
150                    values.push(p);
151                }
152            }
153        }
154        prev = e;
155        end += 1;
156    }
157    ends.push(end);
158
159    match prev {
160        None => {
161            validity.append(false);
162            values.push(T::default());
163        }
164        Some(p) => {
165            validity.append(true);
166            values.push(p);
167        }
168    }
169
170    (
171        ends.freeze(),
172        PrimitiveArray::new(values, Validity::from(validity.freeze())),
173    )
174}
175
176pub fn runend_decode_primitive(
177    ends: PrimitiveArray,
178    values: PrimitiveArray,
179    offset: usize,
180    length: usize,
181) -> VortexResult<PrimitiveArray> {
182    let validity_mask = values.as_ref().validity()?.to_mask(
183        values.as_ref().len(),
184        &mut LEGACY_SESSION.create_execution_ctx(),
185    )?;
186    Ok(match_each_native_ptype!(values.ptype(), |P| {
187        match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
188            runend_decode_typed_primitive(
189                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
190                values.as_slice::<P>(),
191                validity_mask,
192                values.dtype().nullability(),
193                length,
194            )
195        })
196    }))
197}
198
199/// Decode a run-end encoded slice of values into a flat `Buffer<T>` and `Validity`.
200///
201/// This is the core decode loop shared by primitive and varbinview run-end decoding.
202fn runend_decode_slice<T: Copy + Default>(
203    run_ends: impl Iterator<Item = usize>,
204    values: &[T],
205    values_validity: Mask,
206    values_nullability: Nullability,
207    length: usize,
208) -> (Buffer<T>, Validity) {
209    match values_validity {
210        Mask::AllTrue(_) => {
211            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
212            for (end, value) in run_ends.zip_eq(values) {
213                assert!(
214                    end >= decoded.len(),
215                    "Runend ends must be monotonic, got {end} after {}",
216                    decoded.len()
217                );
218                assert!(end <= length, "Runend end must be less than overall length");
219                // SAFETY:
220                // We preallocate enough capacity because we know the total length
221                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
222            }
223            (decoded.into(), values_nullability.into())
224        }
225        Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
226        Mask::Values(mask) => {
227            let mut decoded = BufferMut::with_capacity(length);
228            let mut decoded_validity = BitBufferMut::with_capacity(length);
229            for (end, value) in run_ends.zip_eq(
230                values
231                    .iter()
232                    .zip(mask.bit_buffer().iter())
233                    .map(|(&v, is_valid)| is_valid.then_some(v)),
234            ) {
235                assert!(
236                    end >= decoded.len(),
237                    "Runend ends must be monotonic, got {end} after {}",
238                    decoded.len()
239                );
240                assert!(end <= length, "Runend end must be less than overall length");
241                match value {
242                    None => {
243                        decoded_validity.append_n(false, end - decoded.len());
244                        // SAFETY:
245                        // We preallocate enough capacity because we know the total length
246                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
247                    }
248                    Some(value) => {
249                        decoded_validity.append_n(true, end - decoded.len());
250                        // SAFETY:
251                        // We preallocate enough capacity because we know the total length
252                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
253                    }
254                }
255            }
256            (decoded.into(), Validity::from(decoded_validity.freeze()))
257        }
258    }
259}
260
261pub fn runend_decode_typed_primitive<T: NativePType>(
262    run_ends: impl Iterator<Item = usize>,
263    values: &[T],
264    values_validity: Mask,
265    values_nullability: Nullability,
266    length: usize,
267) -> PrimitiveArray {
268    let (decoded, validity) = runend_decode_slice(
269        run_ends,
270        values,
271        values_validity,
272        values_nullability,
273        length,
274    );
275    PrimitiveArray::new(decoded, validity)
276}
277
278/// Decode a run-end encoded VarBinView array by expanding views directly.
279pub fn runend_decode_varbinview(
280    ends: PrimitiveArray,
281    values: VarBinViewArray,
282    offset: usize,
283    length: usize,
284) -> VortexResult<VarBinViewArray> {
285    let validity_mask = values.as_ref().validity()?.to_mask(
286        values.as_ref().len(),
287        &mut LEGACY_SESSION.create_execution_ctx(),
288    )?;
289    let views = values.views();
290
291    let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
292        runend_decode_slice(
293            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
294            views,
295            validity_mask,
296            values.dtype().nullability(),
297            length,
298        )
299    });
300
301    let parts = values.into_data_parts();
302    let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
303
304    // SAFETY: we are expanding views from a valid VarBinViewArray with the same
305    // buffers, so all buffer indices and offsets remain valid.
306    Ok(unsafe {
307        VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
308    })
309}
310
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    #[test]
    fn encode() {
        let arr = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (ends, values) = runend_encode(arr.as_view());
        let values = values.to_primitive();

        let expected_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]);
        assert_arrays_eq!(ends, expected_ends);
        let expected_values = PrimitiveArray::from_iter(vec![1i32, 2, 3]);
        assert_arrays_eq!(values, expected_values);
    }

    #[test]
    fn encode_nullable() {
        let arr = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(BitBuffer::from(vec![
                true, true, false, false, true, true, true, true, false, false,
            ])),
        );
        let (ends, values) = runend_encode(arr.as_view());
        let values = values.to_primitive();

        let expected_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]);
        assert_arrays_eq!(ends, expected_ends);
        let expected_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(values, expected_values);
    }

    #[test]
    fn encode_all_null() {
        let arr = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );
        let (ends, values) = runend_encode(arr.as_view());
        let values = values.to_primitive();

        let expected_ends = PrimitiveArray::from_iter(vec![5u64]);
        assert_arrays_eq!(ends, expected_ends);
        let expected_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);
        assert_arrays_eq!(values, expected_values);
    }

    #[test]
    fn decode() -> VortexResult<()> {
        let ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let decoded = runend_decode_primitive(ends, values, 0, 10)?;

        let expected = PrimitiveArray::from_iter(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }

    /// Exercises the mixed-validity (`Mask::Values`) decode path: a null run in the middle
    /// must expand to null positions while its neighbours keep their values.
    #[test]
    fn decode_nullable() -> VortexResult<()> {
        let ends = PrimitiveArray::from_iter([2u32, 4, 5]);
        let values = PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(3)]);
        let decoded = runend_decode_primitive(ends, values, 0, 5)?;

        let expected =
            PrimitiveArray::from_option_iter(vec![Some(1i32), Some(1), None, None, Some(3)]);
        assert_arrays_eq!(decoded, expected);
        Ok(())
    }
}