vortex_runend/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use arrow_buffer::BooleanBufferBuilder;
5use itertools::Itertools;
6use vortex_array::arrays::{BoolArray, BooleanBuffer, ConstantArray, PrimitiveArray};
7use vortex_array::validity::Validity;
8use vortex_array::vtable::ValidityHelper;
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, buffer};
11use vortex_dtype::{
12    NativePType, Nullability, match_each_native_ptype, match_each_unsigned_integer_ptype,
13};
14use vortex_error::VortexExpect;
15use vortex_mask::Mask;
16use vortex_scalar::Scalar;
17
18use crate::iter::trimmed_ends_iter;
19
20/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
21pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) {
22    let validity = match array.validity() {
23        Validity::NonNullable => None,
24        Validity::AllValid => None,
25        Validity::AllInvalid => {
26            // We can trivially return an all-null REE array
27            return (
28                PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
29                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
30            );
31        }
32        Validity::Array(a) => Some(a.to_bool().boolean_buffer().clone()),
33    };
34
35    let (ends, values) = match validity {
36        None => {
37            match_each_native_ptype!(array.ptype(), |P| {
38                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
39                (
40                    PrimitiveArray::new(ends, Validity::NonNullable),
41                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
42                )
43            })
44        }
45        Some(validity) => {
46            match_each_native_ptype!(array.ptype(), |P| {
47                let (ends, values) =
48                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
49                (
50                    PrimitiveArray::new(ends, Validity::NonNullable),
51                    values.into_array(),
52                )
53            })
54        }
55    };
56
57    let ends = ends
58        .downcast()
59        .vortex_expect("Ends must succeed downcasting")
60        .to_primitive();
61
62    (ends, values)
63}
64
65fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
66    let mut ends = BufferMut::empty();
67    let mut values = BufferMut::empty();
68
69    if elements.is_empty() {
70        return (ends.freeze(), values.freeze());
71    }
72
73    // Run-end encode the values
74    let mut prev = elements[0];
75    let mut end = 1;
76    for &e in elements.iter().skip(1) {
77        if e != prev {
78            ends.push(end);
79            values.push(prev);
80        }
81        prev = e;
82        end += 1;
83    }
84    ends.push(end);
85    values.push(prev);
86
87    (ends.freeze(), values.freeze())
88}
89
90fn runend_encode_nullable_primitive<T: NativePType>(
91    elements: &[T],
92    element_validity: BooleanBuffer,
93) -> (Buffer<u64>, PrimitiveArray) {
94    let mut ends = BufferMut::empty();
95    let mut values = BufferMut::empty();
96    let mut validity = BooleanBufferBuilder::new(values.capacity());
97
98    if elements.is_empty() {
99        return (
100            ends.freeze(),
101            PrimitiveArray::new(
102                values,
103                Validity::Array(BoolArray::from(validity.finish()).into_array()),
104            ),
105        );
106    }
107
108    // Run-end encode the values
109    let mut prev = element_validity.value(0).then(|| elements[0]);
110    let mut end = 1;
111    for e in elements
112        .iter()
113        .zip(element_validity.iter())
114        .map(|(&e, is_valid)| is_valid.then_some(e))
115        .skip(1)
116    {
117        if e != prev {
118            ends.push(end);
119            match prev {
120                None => {
121                    validity.append(false);
122                    values.push(T::default());
123                }
124                Some(p) => {
125                    validity.append(true);
126                    values.push(p);
127                }
128            }
129        }
130        prev = e;
131        end += 1;
132    }
133    ends.push(end);
134
135    match prev {
136        None => {
137            validity.append(false);
138            values.push(T::default());
139        }
140        Some(p) => {
141            validity.append(true);
142            values.push(p);
143        }
144    }
145
146    (
147        ends.freeze(),
148        PrimitiveArray::new(values, Validity::from(validity.finish())),
149    )
150}
151
152pub fn runend_decode_primitive(
153    ends: PrimitiveArray,
154    values: PrimitiveArray,
155    offset: usize,
156    length: usize,
157) -> PrimitiveArray {
158    match_each_native_ptype!(values.ptype(), |P| {
159        match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
160            runend_decode_typed_primitive(
161                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
162                values.as_slice::<P>(),
163                values.validity_mask(),
164                values.dtype().nullability(),
165                length,
166            )
167        })
168    })
169}
170
171pub fn runend_decode_bools(
172    ends: PrimitiveArray,
173    values: BoolArray,
174    offset: usize,
175    length: usize,
176) -> BoolArray {
177    match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
178        runend_decode_typed_bool(
179            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
180            values.boolean_buffer().clone(),
181            values.validity_mask(),
182            values.dtype().nullability(),
183            length,
184        )
185    })
186}
187
188pub fn runend_decode_typed_primitive<T: NativePType>(
189    run_ends: impl Iterator<Item = usize>,
190    values: &[T],
191    values_validity: Mask,
192    values_nullability: Nullability,
193    length: usize,
194) -> PrimitiveArray {
195    match values_validity {
196        Mask::AllTrue(_) => {
197            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
198            for (end, value) in run_ends.zip_eq(values) {
199                assert!(end <= length, "Runend end must be less than overall length");
200                // SAFETY:
201                // We preallocate enough capacity because we know the total length
202                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
203            }
204            PrimitiveArray::new(decoded, values_nullability.into())
205        }
206        Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
207        Mask::Values(mask) => {
208            let mut decoded = BufferMut::with_capacity(length);
209            let mut decoded_validity = BooleanBufferBuilder::new(length);
210            for (end, value) in run_ends.zip_eq(
211                values
212                    .iter()
213                    .zip(mask.boolean_buffer().iter())
214                    .map(|(&v, is_valid)| is_valid.then_some(v)),
215            ) {
216                assert!(end <= length, "Runend end must be less than overall length");
217                match value {
218                    None => {
219                        decoded_validity.append_n(end - decoded.len(), false);
220                        // SAFETY:
221                        // We preallocate enough capacity because we know the total length
222                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
223                    }
224                    Some(value) => {
225                        decoded_validity.append_n(end - decoded.len(), true);
226                        // SAFETY:
227                        // We preallocate enough capacity because we know the total length
228                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
229                    }
230                }
231            }
232            PrimitiveArray::new(decoded, Validity::from(decoded_validity.finish()))
233        }
234    }
235}
236
237pub fn runend_decode_typed_bool(
238    run_ends: impl Iterator<Item = usize>,
239    values: BooleanBuffer,
240    values_validity: Mask,
241    values_nullability: Nullability,
242    length: usize,
243) -> BoolArray {
244    match values_validity {
245        Mask::AllTrue(_) => {
246            let mut decoded = BooleanBufferBuilder::new(length);
247            for (end, value) in run_ends.zip_eq(values.iter()) {
248                decoded.append_n(end - decoded.len(), value);
249            }
250            BoolArray::new(decoded.finish(), values_nullability.into())
251        }
252        Mask::AllFalse(_) => BoolArray::new(BooleanBuffer::new_unset(length), Validity::AllInvalid),
253        Mask::Values(mask) => {
254            let mut decoded = BooleanBufferBuilder::new(length);
255            let mut decoded_validity = BooleanBufferBuilder::new(length);
256            for (end, value) in run_ends.zip_eq(
257                values
258                    .iter()
259                    .zip(mask.boolean_buffer().iter())
260                    .map(|(v, is_valid)| is_valid.then_some(v)),
261            ) {
262                match value {
263                    None => {
264                        decoded_validity.append_n(end - decoded.len(), false);
265                        decoded.append_n(end - decoded.len(), false);
266                    }
267                    Some(value) => {
268                        decoded_validity.append_n(end - decoded.len(), true);
269                        decoded.append_n(end - decoded.len(), value);
270                    }
271                }
272            }
273            BoolArray::new(decoded.finish(), Validity::from(decoded_validity.finish()))
274        }
275    }
276}
277
#[cfg(test)]
mod test {
    use arrow_buffer::BooleanBuffer;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::compress::{runend_decode_primitive, runend_encode};

    /// Non-nullable input: three runs, with ends narrowed to `u8`.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        assert_eq!(run_ends.as_slice::<u8>(), vec![2, 5, 10]);
        assert_eq!(run_values.as_slice::<i32>(), vec![1, 2, 3]);
    }

    /// Nullable input: null stretches become their own runs holding a default value.
    #[test]
    fn encode_nullable() {
        let validity = Validity::from(BooleanBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]));
        let input = PrimitiveArray::new(buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3], validity);
        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        assert_eq!(run_ends.as_slice::<u8>(), vec![2, 4, 5, 8, 10]);
        assert_eq!(run_values.as_slice::<i32>(), vec![1, 0, 2, 3, 0]);
    }

    /// All-null input collapses to a single run with `u64` ends.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BooleanBuffer::new_unset(5)),
        );
        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        assert_eq!(run_ends.as_slice::<u64>(), vec![5]);
        assert_eq!(run_values.as_slice::<i32>(), vec![0]);
    }

    /// Decoding expands each value across its run.
    #[test]
    fn decode() {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let expanded = runend_decode_primitive(run_ends, run_values, 0, 10);

        let expected = vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3];
        assert_eq!(expanded.as_slice::<i32>(), expected);
    }
}