vortex_runend/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use arrow_buffer::BooleanBufferBuilder;
5use itertools::Itertools;
6use vortex_array::arrays::{BoolArray, BooleanBuffer, ConstantArray, PrimitiveArray};
7use vortex_array::validity::Validity;
8use vortex_array::vtable::ValidityHelper;
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, buffer};
11use vortex_dtype::{
12    NativePType, Nullability, match_each_native_ptype, match_each_unsigned_integer_ptype,
13};
14use vortex_error::VortexResult;
15use vortex_mask::Mask;
16use vortex_scalar::Scalar;
17
18use crate::iter::trimmed_ends_iter;
19
20/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
21pub fn runend_encode(array: &PrimitiveArray) -> VortexResult<(PrimitiveArray, ArrayRef)> {
22    let validity = match array.validity() {
23        Validity::NonNullable => None,
24        Validity::AllValid => None,
25        Validity::AllInvalid => {
26            // We can trivially return an all-null REE array
27            return Ok((
28                PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
29                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
30            ));
31        }
32        Validity::Array(a) => Some(a.to_bool()?.boolean_buffer().clone()),
33    };
34
35    let (ends, values) = match validity {
36        None => {
37            match_each_native_ptype!(array.ptype(), |P| {
38                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
39                (
40                    PrimitiveArray::new(ends, Validity::NonNullable),
41                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
42                )
43            })
44        }
45        Some(validity) => {
46            match_each_native_ptype!(array.ptype(), |P| {
47                let (ends, values) =
48                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
49                (
50                    PrimitiveArray::new(ends, Validity::NonNullable),
51                    values.into_array(),
52                )
53            })
54        }
55    };
56
57    let ends = ends.downcast()?.to_primitive()?;
58
59    Ok((ends, values))
60}
61
62fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
63    let mut ends = BufferMut::empty();
64    let mut values = BufferMut::empty();
65
66    if elements.is_empty() {
67        return (ends.freeze(), values.freeze());
68    }
69
70    // Run-end encode the values
71    let mut prev = elements[0];
72    let mut end = 1;
73    for &e in elements.iter().skip(1) {
74        if e != prev {
75            ends.push(end);
76            values.push(prev);
77        }
78        prev = e;
79        end += 1;
80    }
81    ends.push(end);
82    values.push(prev);
83
84    (ends.freeze(), values.freeze())
85}
86
87fn runend_encode_nullable_primitive<T: NativePType>(
88    elements: &[T],
89    element_validity: BooleanBuffer,
90) -> (Buffer<u64>, PrimitiveArray) {
91    let mut ends = BufferMut::empty();
92    let mut values = BufferMut::empty();
93    let mut validity = BooleanBufferBuilder::new(values.capacity());
94
95    if elements.is_empty() {
96        return (
97            ends.freeze(),
98            PrimitiveArray::new(
99                values,
100                Validity::Array(BoolArray::from(validity.finish()).into_array()),
101            ),
102        );
103    }
104
105    // Run-end encode the values
106    let mut prev = element_validity.value(0).then(|| elements[0]);
107    let mut end = 1;
108    for e in elements
109        .iter()
110        .zip(element_validity.iter())
111        .map(|(&e, is_valid)| is_valid.then_some(e))
112        .skip(1)
113    {
114        if e != prev {
115            ends.push(end);
116            match prev {
117                None => {
118                    validity.append(false);
119                    values.push(T::default());
120                }
121                Some(p) => {
122                    validity.append(true);
123                    values.push(p);
124                }
125            }
126        }
127        prev = e;
128        end += 1;
129    }
130    ends.push(end);
131
132    match prev {
133        None => {
134            validity.append(false);
135            values.push(T::default());
136        }
137        Some(p) => {
138            validity.append(true);
139            values.push(p);
140        }
141    }
142
143    (
144        ends.freeze(),
145        PrimitiveArray::new(values, Validity::from(validity.finish())),
146    )
147}
148
149pub fn runend_decode_primitive(
150    ends: PrimitiveArray,
151    values: PrimitiveArray,
152    offset: usize,
153    length: usize,
154) -> VortexResult<PrimitiveArray> {
155    match_each_native_ptype!(values.ptype(), |P| {
156        match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
157            runend_decode_typed_primitive(
158                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
159                values.as_slice::<P>(),
160                values.validity_mask()?,
161                values.dtype().nullability(),
162                length,
163            )
164        })
165    })
166}
167
168pub fn runend_decode_bools(
169    ends: PrimitiveArray,
170    values: BoolArray,
171    offset: usize,
172    length: usize,
173) -> VortexResult<BoolArray> {
174    match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
175        runend_decode_typed_bool(
176            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
177            values.boolean_buffer().clone(),
178            values.validity_mask()?,
179            values.dtype().nullability(),
180            length,
181        )
182    })
183}
184
185pub fn runend_decode_typed_primitive<T: NativePType>(
186    run_ends: impl Iterator<Item = usize>,
187    values: &[T],
188    values_validity: Mask,
189    values_nullability: Nullability,
190    length: usize,
191) -> VortexResult<PrimitiveArray> {
192    Ok(match values_validity {
193        Mask::AllTrue(_) => {
194            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
195            for (end, value) in run_ends.zip_eq(values) {
196                assert!(end <= length, "Runend end must be less than overall length");
197                // SAFETY:
198                // We preallocate enough capacity because we know the total length
199                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
200            }
201            PrimitiveArray::new(decoded, values_nullability.into())
202        }
203        Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
204        Mask::Values(mask) => {
205            let mut decoded = BufferMut::with_capacity(length);
206            let mut decoded_validity = BooleanBufferBuilder::new(length);
207            for (end, value) in run_ends.zip_eq(
208                values
209                    .iter()
210                    .zip(mask.boolean_buffer().iter())
211                    .map(|(&v, is_valid)| is_valid.then_some(v)),
212            ) {
213                assert!(end <= length, "Runend end must be less than overall length");
214                match value {
215                    None => {
216                        decoded_validity.append_n(end - decoded.len(), false);
217                        // SAFETY:
218                        // We preallocate enough capacity because we know the total length
219                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
220                    }
221                    Some(value) => {
222                        decoded_validity.append_n(end - decoded.len(), true);
223                        // SAFETY:
224                        // We preallocate enough capacity because we know the total length
225                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
226                    }
227                }
228            }
229            PrimitiveArray::new(decoded, Validity::from(decoded_validity.finish()))
230        }
231    })
232}
233
234pub fn runend_decode_typed_bool(
235    run_ends: impl Iterator<Item = usize>,
236    values: BooleanBuffer,
237    values_validity: Mask,
238    values_nullability: Nullability,
239    length: usize,
240) -> VortexResult<BoolArray> {
241    Ok(match values_validity {
242        Mask::AllTrue(_) => {
243            let mut decoded = BooleanBufferBuilder::new(length);
244            for (end, value) in run_ends.zip_eq(values.iter()) {
245                decoded.append_n(end - decoded.len(), value);
246            }
247            BoolArray::new(decoded.finish(), values_nullability.into())
248        }
249        Mask::AllFalse(_) => BoolArray::new(BooleanBuffer::new_unset(length), Validity::AllInvalid),
250        Mask::Values(mask) => {
251            let mut decoded = BooleanBufferBuilder::new(length);
252            let mut decoded_validity = BooleanBufferBuilder::new(length);
253            for (end, value) in run_ends.zip_eq(
254                values
255                    .iter()
256                    .zip(mask.boolean_buffer().iter())
257                    .map(|(v, is_valid)| is_valid.then_some(v)),
258            ) {
259                match value {
260                    None => {
261                        decoded_validity.append_n(end - decoded.len(), false);
262                        decoded.append_n(end - decoded.len(), false);
263                    }
264                    Some(value) => {
265                        decoded_validity.append_n(end - decoded.len(), true);
266                        decoded.append_n(end - decoded.len(), value);
267                    }
268                }
269            }
270            BoolArray::new(decoded.finish(), Validity::from(decoded_validity.finish()))
271        }
272    })
273}
274
#[cfg(test)]
mod test {
    use arrow_buffer::BooleanBuffer;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::compress::{runend_decode_primitive, runend_encode};

    #[test]
    fn encode() {
        let arr = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (ends, values) = runend_encode(&arr).unwrap();
        let values = values.to_primitive().unwrap();

        assert_eq!(ends.as_slice::<u8>(), vec![2, 5, 10]);
        assert_eq!(values.as_slice::<i32>(), vec![1, 2, 3]);
    }

    #[test]
    fn encode_nullable() {
        let arr = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(BooleanBuffer::from(vec![
                true, true, false, false, true, true, true, true, false, false,
            ])),
        );
        let (ends, values) = runend_encode(&arr).unwrap();
        let values = values.to_primitive().unwrap();

        assert_eq!(ends.as_slice::<u8>(), vec![2, 4, 5, 8, 10]);
        assert_eq!(values.as_slice::<i32>(), vec![1, 0, 2, 3, 0]);
    }

    #[test]
    fn encode_all_null() {
        let arr = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BooleanBuffer::new_unset(5)),
        );
        let (ends, values) = runend_encode(&arr).unwrap();
        let values = values.to_primitive().unwrap();

        assert_eq!(ends.as_slice::<u64>(), vec![5]);
        assert_eq!(values.as_slice::<i32>(), vec![0]);
    }

    #[test]
    fn decode() {
        let ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let decoded = runend_decode_primitive(ends, values, 0, 10).unwrap();

        assert_eq!(
            decoded.as_slice::<i32>(),
            vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]
        );
    }

    #[test]
    fn roundtrip() {
        let arr = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (ends, values) = runend_encode(&arr).unwrap();
        let decoded =
            runend_decode_primitive(ends, values.to_primitive().unwrap(), 0, 10).unwrap();

        assert_eq!(decoded.as_slice::<i32>(), arr.as_slice::<i32>());
    }

    #[test]
    fn roundtrip_nullable() {
        let arr = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(BooleanBuffer::from(vec![
                true, true, false, false, true, true, true, true, false, false,
            ])),
        );
        let (ends, values) = runend_encode(&arr).unwrap();
        let decoded =
            runend_decode_primitive(ends, values.to_primitive().unwrap(), 0, 10).unwrap();

        // Null positions decode to the default value; valid runs keep their original values.
        assert_eq!(
            decoded.as_slice::<i32>(),
            vec![1i32, 1, 0, 0, 2, 3, 3, 3, 0, 0]
        );
    }

    #[test]
    fn roundtrip_all_null() {
        let arr = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::from(BooleanBuffer::new_unset(5)),
        );
        let (ends, values) = runend_encode(&arr).unwrap();
        let decoded =
            runend_decode_primitive(ends, values.to_primitive().unwrap(), 0, 5).unwrap();

        assert_eq!(decoded.as_slice::<i32>(), vec![0i32, 0, 0, 0, 0]);
    }
}