Skip to main content

vortex_runend/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::ArrayView;
7use vortex_array::ExecutionCtx;
8use vortex_array::IntoArray;
9use vortex_array::arrays::BoolArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Primitive;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::VarBinViewArray;
14use vortex_array::arrays::bool::BoolArrayExt;
15use vortex_array::arrays::primitive::PrimitiveArrayExt;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::dtype::NativePType;
18use vortex_array::dtype::Nullability;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::match_each_native_ptype;
22use vortex_array::match_each_unsigned_integer_ptype;
23use vortex_array::scalar::Scalar;
24use vortex_array::validity::Validity;
25use vortex_buffer::BitBuffer;
26use vortex_buffer::BitBufferMut;
27use vortex_buffer::Buffer;
28use vortex_buffer::BufferMut;
29use vortex_buffer::buffer;
30use vortex_error::VortexExpect;
31use vortex_error::VortexResult;
32use vortex_mask::Mask;
33
34use crate::iter::trimmed_ends_iter;
35
36/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
37pub fn runend_encode(
38    array: ArrayView<Primitive>,
39    ctx: &mut ExecutionCtx,
40) -> (PrimitiveArray, ArrayRef) {
41    let validity = match array
42        .validity()
43        .vortex_expect("run-end validity should be derivable")
44    {
45        Validity::NonNullable => None,
46        Validity::AllValid => None,
47        Validity::AllInvalid => {
48            // We can trivially return an all-null REE array
49            let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
50            ends.statistics()
51                .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
52            return (
53                ends,
54                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
55            );
56        }
57        Validity::Array(a) => {
58            let bool_array = a
59                .execute::<BoolArray>(ctx)
60                .vortex_expect("validity array must be convertible to bool");
61            Some(bool_array.to_bit_buffer())
62        }
63    };
64
65    let (ends, values) = match validity {
66        None => {
67            match_each_native_ptype!(array.ptype(), |P| {
68                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
69                (
70                    PrimitiveArray::new(ends, Validity::NonNullable),
71                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
72                )
73            })
74        }
75        Some(validity) => {
76            match_each_native_ptype!(array.ptype(), |P| {
77                let (ends, values) =
78                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
79                (
80                    PrimitiveArray::new(ends, Validity::NonNullable),
81                    values.into_array(),
82                )
83            })
84        }
85    };
86
87    let ends = ends
88        .narrow(ctx)
89        .vortex_expect("Ends must succeed downcasting");
90
91    ends.statistics()
92        .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
93
94    (ends, values)
95}
96
97fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
98    let mut ends = BufferMut::empty();
99    let mut values = BufferMut::empty();
100
101    if elements.is_empty() {
102        return (ends.freeze(), values.freeze());
103    }
104
105    // Run-end encode the values
106    let mut prev = elements[0];
107    let mut end = 1;
108    for &e in elements.iter().skip(1) {
109        if e != prev {
110            ends.push(end);
111            values.push(prev);
112        }
113        prev = e;
114        end += 1;
115    }
116    ends.push(end);
117    values.push(prev);
118
119    (ends.freeze(), values.freeze())
120}
121
122fn runend_encode_nullable_primitive<T: NativePType>(
123    elements: &[T],
124    element_validity: BitBuffer,
125) -> (Buffer<u64>, PrimitiveArray) {
126    let mut ends = BufferMut::empty();
127    let mut values = BufferMut::empty();
128    let mut validity = BitBufferMut::with_capacity(values.capacity());
129
130    if elements.is_empty() {
131        return (
132            ends.freeze(),
133            PrimitiveArray::new(
134                values,
135                Validity::Array(BoolArray::from(validity.freeze()).into_array()),
136            ),
137        );
138    }
139
140    // Run-end encode the values
141    let mut prev = element_validity.value(0).then(|| elements[0]);
142    let mut end = 1;
143    for e in elements
144        .iter()
145        .zip(element_validity.iter())
146        .map(|(&e, is_valid)| is_valid.then_some(e))
147        .skip(1)
148    {
149        if e != prev {
150            ends.push(end);
151            match prev {
152                None => {
153                    validity.append(false);
154                    values.push(T::default());
155                }
156                Some(p) => {
157                    validity.append(true);
158                    values.push(p);
159                }
160            }
161        }
162        prev = e;
163        end += 1;
164    }
165    ends.push(end);
166
167    match prev {
168        None => {
169            validity.append(false);
170            values.push(T::default());
171        }
172        Some(p) => {
173            validity.append(true);
174            values.push(p);
175        }
176    }
177
178    (
179        ends.freeze(),
180        PrimitiveArray::new(values, Validity::from(validity.freeze())),
181    )
182}
183
184pub fn runend_decode_primitive(
185    ends: PrimitiveArray,
186    values: PrimitiveArray,
187    offset: usize,
188    length: usize,
189    ctx: &mut ExecutionCtx,
190) -> VortexResult<PrimitiveArray> {
191    let validity_mask = values
192        .as_ref()
193        .validity()?
194        .execute_mask(values.as_ref().len(), ctx)?;
195    Ok(match_each_native_ptype!(values.ptype(), |P| {
196        match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
197            runend_decode_typed_primitive(
198                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
199                values.as_slice::<P>(),
200                validity_mask,
201                values.dtype().nullability(),
202                length,
203            )
204        })
205    }))
206}
207
208/// Decode a run-end encoded slice of values into a flat `Buffer<T>` and `Validity`.
209///
210/// This is the core decode loop shared by primitive and varbinview run-end decoding.
211fn runend_decode_slice<T: Copy + Default>(
212    run_ends: impl Iterator<Item = usize>,
213    values: &[T],
214    values_validity: Mask,
215    values_nullability: Nullability,
216    length: usize,
217) -> (Buffer<T>, Validity) {
218    match values_validity {
219        Mask::AllTrue(_) => {
220            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
221            for (end, value) in run_ends.zip_eq(values) {
222                assert!(
223                    end >= decoded.len(),
224                    "Runend ends must be monotonic, got {end} after {}",
225                    decoded.len()
226                );
227                assert!(end <= length, "Runend end must be less than overall length");
228                // SAFETY:
229                // We preallocate enough capacity because we know the total length
230                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
231            }
232            (decoded.into(), values_nullability.into())
233        }
234        Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
235        Mask::Values(mask) => {
236            let mut decoded = BufferMut::with_capacity(length);
237            let mut decoded_validity = BitBufferMut::with_capacity(length);
238            for (end, value) in run_ends.zip_eq(
239                values
240                    .iter()
241                    .zip(mask.bit_buffer().iter())
242                    .map(|(&v, is_valid)| is_valid.then_some(v)),
243            ) {
244                assert!(
245                    end >= decoded.len(),
246                    "Runend ends must be monotonic, got {end} after {}",
247                    decoded.len()
248                );
249                assert!(end <= length, "Runend end must be less than overall length");
250                match value {
251                    None => {
252                        decoded_validity.append_n(false, end - decoded.len());
253                        // SAFETY:
254                        // We preallocate enough capacity because we know the total length
255                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
256                    }
257                    Some(value) => {
258                        decoded_validity.append_n(true, end - decoded.len());
259                        // SAFETY:
260                        // We preallocate enough capacity because we know the total length
261                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
262                    }
263                }
264            }
265            (decoded.into(), Validity::from(decoded_validity.freeze()))
266        }
267    }
268}
269
270pub fn runend_decode_typed_primitive<T: NativePType>(
271    run_ends: impl Iterator<Item = usize>,
272    values: &[T],
273    values_validity: Mask,
274    values_nullability: Nullability,
275    length: usize,
276) -> PrimitiveArray {
277    let (decoded, validity) = runend_decode_slice(
278        run_ends,
279        values,
280        values_validity,
281        values_nullability,
282        length,
283    );
284    PrimitiveArray::new(decoded, validity)
285}
286
287/// Decode a run-end encoded VarBinView array by expanding views directly.
288pub fn runend_decode_varbinview(
289    ends: PrimitiveArray,
290    values: VarBinViewArray,
291    offset: usize,
292    length: usize,
293    ctx: &mut ExecutionCtx,
294) -> VortexResult<VarBinViewArray> {
295    let validity_mask = values
296        .as_ref()
297        .validity()?
298        .execute_mask(values.as_ref().len(), ctx)?;
299    let views = values.views();
300
301    let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
302        runend_decode_slice(
303            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
304            views,
305            validity_mask,
306            values.dtype().nullability(),
307            length,
308        )
309    });
310
311    let parts = values.into_data_parts();
312    let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
313
314    // SAFETY: we are expanding views from a valid VarBinViewArray with the same
315    // buffers, so all buffer indices and offsets remain valid.
316    Ok(unsafe {
317        VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
318    })
319}
320
#[cfg(test)]
mod tests {
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// Adjacent equal values collapse into runs, and the ends are narrowed to `u8`.
    #[test]
    fn encode() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let (run_ends, run_values) = runend_encode(input.as_view(), &mut ctx);
        let run_values = run_values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter([2u8, 5, 10]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        assert_arrays_eq!(run_values, want_values);
        Ok(())
    }

    /// Nulls form their own runs regardless of the physical values beneath them.
    #[test]
    fn encode_nullable() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let bits = BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(bits),
        );

        let (run_ends, run_values) = runend_encode(input.as_view(), &mut ctx);
        let run_values = run_values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter([2u8, 4, 5, 8, 10]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values =
            PrimitiveArray::from_option_iter([Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(run_values, want_values);
        Ok(())
    }

    /// An all-null input becomes a single `u64` end and a single null value.
    #[test]
    fn encode_all_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );

        let (run_ends, run_values) = runend_encode(input.as_view(), &mut ctx);
        let run_values = run_values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter([5u64]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values = PrimitiveArray::from_option_iter([None::<i32>]);
        assert_arrays_eq!(run_values, want_values);
        Ok(())
    }

    /// Decoding expands every run back to its original span.
    #[test]
    fn decode() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);

        let decoded = runend_decode_primitive(run_ends, run_values, 0, 10, &mut ctx)?;

        let want = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(decoded, want);
        Ok(())
    }
}