vortex_runend/
compress.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::IntoArray;
7use vortex_array::ToCanonical;
8use vortex_array::arrays::BoolArray;
9use vortex_array::arrays::ConstantArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::validity::Validity;
12use vortex_array::vtable::ValidityHelper;
13use vortex_buffer::BitBuffer;
14use vortex_buffer::BitBufferMut;
15use vortex_buffer::Buffer;
16use vortex_buffer::BufferMut;
17use vortex_buffer::buffer;
18use vortex_dtype::NativePType;
19use vortex_dtype::Nullability;
20use vortex_dtype::match_each_native_ptype;
21use vortex_dtype::match_each_unsigned_integer_ptype;
22use vortex_error::VortexExpect;
23use vortex_mask::Mask;
24use vortex_scalar::Scalar;
25
26use crate::iter::trimmed_ends_iter;
27
28/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
29pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) {
30    let validity = match array.validity() {
31        Validity::NonNullable => None,
32        Validity::AllValid => None,
33        Validity::AllInvalid => {
34            // We can trivially return an all-null REE array
35            return (
36                PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
37                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
38            );
39        }
40        Validity::Array(a) => Some(a.to_bool().bit_buffer().clone()),
41    };
42
43    let (ends, values) = match validity {
44        None => {
45            match_each_native_ptype!(array.ptype(), |P| {
46                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
47                (
48                    PrimitiveArray::new(ends, Validity::NonNullable),
49                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
50                )
51            })
52        }
53        Some(validity) => {
54            match_each_native_ptype!(array.ptype(), |P| {
55                let (ends, values) =
56                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
57                (
58                    PrimitiveArray::new(ends, Validity::NonNullable),
59                    values.into_array(),
60                )
61            })
62        }
63    };
64
65    let ends = ends
66        .narrow()
67        .vortex_expect("Ends must succeed downcasting")
68        .to_primitive();
69
70    (ends, values)
71}
72
73fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
74    let mut ends = BufferMut::empty();
75    let mut values = BufferMut::empty();
76
77    if elements.is_empty() {
78        return (ends.freeze(), values.freeze());
79    }
80
81    // Run-end encode the values
82    let mut prev = elements[0];
83    let mut end = 1;
84    for &e in elements.iter().skip(1) {
85        if e != prev {
86            ends.push(end);
87            values.push(prev);
88        }
89        prev = e;
90        end += 1;
91    }
92    ends.push(end);
93    values.push(prev);
94
95    (ends.freeze(), values.freeze())
96}
97
98fn runend_encode_nullable_primitive<T: NativePType>(
99    elements: &[T],
100    element_validity: BitBuffer,
101) -> (Buffer<u64>, PrimitiveArray) {
102    let mut ends = BufferMut::empty();
103    let mut values = BufferMut::empty();
104    let mut validity = BitBufferMut::with_capacity(values.capacity());
105
106    if elements.is_empty() {
107        return (
108            ends.freeze(),
109            PrimitiveArray::new(
110                values,
111                Validity::Array(BoolArray::from(validity.freeze()).into_array()),
112            ),
113        );
114    }
115
116    // Run-end encode the values
117    let mut prev = element_validity.value(0).then(|| elements[0]);
118    let mut end = 1;
119    for e in elements
120        .iter()
121        .zip(element_validity.iter())
122        .map(|(&e, is_valid)| is_valid.then_some(e))
123        .skip(1)
124    {
125        if e != prev {
126            ends.push(end);
127            match prev {
128                None => {
129                    validity.append(false);
130                    values.push(T::default());
131                }
132                Some(p) => {
133                    validity.append(true);
134                    values.push(p);
135                }
136            }
137        }
138        prev = e;
139        end += 1;
140    }
141    ends.push(end);
142
143    match prev {
144        None => {
145            validity.append(false);
146            values.push(T::default());
147        }
148        Some(p) => {
149            validity.append(true);
150            values.push(p);
151        }
152    }
153
154    (
155        ends.freeze(),
156        PrimitiveArray::new(values, Validity::from(validity.freeze())),
157    )
158}
159
160pub fn runend_decode_primitive(
161    ends: PrimitiveArray,
162    values: PrimitiveArray,
163    offset: usize,
164    length: usize,
165) -> PrimitiveArray {
166    match_each_native_ptype!(values.ptype(), |P| {
167        match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
168            runend_decode_typed_primitive(
169                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
170                values.as_slice::<P>(),
171                values.validity_mask(),
172                values.dtype().nullability(),
173                length,
174            )
175        })
176    })
177}
178
179pub fn runend_decode_bools(
180    ends: PrimitiveArray,
181    values: BoolArray,
182    offset: usize,
183    length: usize,
184) -> BoolArray {
185    match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
186        runend_decode_typed_bool(
187            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
188            values.bit_buffer(),
189            values.validity_mask(),
190            values.dtype().nullability(),
191            length,
192        )
193    })
194}
195
196pub fn runend_decode_typed_primitive<T: NativePType>(
197    run_ends: impl Iterator<Item = usize>,
198    values: &[T],
199    values_validity: Mask,
200    values_nullability: Nullability,
201    length: usize,
202) -> PrimitiveArray {
203    match values_validity {
204        Mask::AllTrue(_) => {
205            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
206            for (end, value) in run_ends.zip_eq(values) {
207                assert!(end <= length, "Runend end must be less than overall length");
208                // SAFETY:
209                // We preallocate enough capacity because we know the total length
210                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
211            }
212            PrimitiveArray::new(decoded, values_nullability.into())
213        }
214        Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
215        Mask::Values(mask) => {
216            let mut decoded = BufferMut::with_capacity(length);
217            let mut decoded_validity = BitBufferMut::with_capacity(length);
218            for (end, value) in run_ends.zip_eq(
219                values
220                    .iter()
221                    .zip(mask.bit_buffer().iter())
222                    .map(|(&v, is_valid)| is_valid.then_some(v)),
223            ) {
224                assert!(end <= length, "Runend end must be less than overall length");
225                match value {
226                    None => {
227                        decoded_validity.append_n(false, end - decoded.len());
228                        // SAFETY:
229                        // We preallocate enough capacity because we know the total length
230                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
231                    }
232                    Some(value) => {
233                        decoded_validity.append_n(true, end - decoded.len());
234                        // SAFETY:
235                        // We preallocate enough capacity because we know the total length
236                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
237                    }
238                }
239            }
240            PrimitiveArray::new(decoded, Validity::from(decoded_validity.freeze()))
241        }
242    }
243}
244
245pub fn runend_decode_typed_bool(
246    run_ends: impl Iterator<Item = usize>,
247    values: &BitBuffer,
248    values_validity: Mask,
249    values_nullability: Nullability,
250    length: usize,
251) -> BoolArray {
252    match values_validity {
253        Mask::AllTrue(_) => {
254            let mut decoded = BitBufferMut::with_capacity(length);
255            for (end, value) in run_ends.zip_eq(values.iter()) {
256                decoded.append_n(value, end - decoded.len());
257            }
258            BoolArray::from_bit_buffer(decoded.freeze(), values_nullability.into())
259        }
260        Mask::AllFalse(_) => {
261            BoolArray::from_bit_buffer(BitBuffer::new_unset(length), Validity::AllInvalid)
262        }
263        Mask::Values(mask) => {
264            let mut decoded = BitBufferMut::with_capacity(length);
265            let mut decoded_validity = BitBufferMut::with_capacity(length);
266            for (end, value) in run_ends.zip_eq(
267                values
268                    .iter()
269                    .zip(mask.bit_buffer().iter())
270                    .map(|(v, is_valid)| is_valid.then_some(v)),
271            ) {
272                match value {
273                    None => {
274                        decoded_validity.append_n(false, end - decoded.len());
275                        decoded.append_n(false, end - decoded.len());
276                    }
277                    Some(value) => {
278                        decoded_validity.append_n(true, end - decoded.len());
279                        decoded.append_n(value, end - decoded.len());
280                    }
281                }
282            }
283            BoolArray::from_bit_buffer(decoded.freeze(), Validity::from(decoded_validity.freeze()))
284        }
285    }
286}
287
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// Three runs of a non-nullable array; the ends come back narrowed to `u8`.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values = PrimitiveArray::from_iter(vec![1i32, 2, 3]);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Nulls split runs of equal values, and adjacent nulls collapse into a single null run.
    #[test]
    fn encode_nullable() {
        let validity_bits = BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(validity_bits),
        );
        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(run_values, want_values);
    }

    /// An all-null input becomes one null run whose single end stays `u64`.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );
        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![5u64]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Decoding with zero offset and the full length reproduces the original elements.
    #[test]
    fn decode() {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let output = runend_decode_primitive(run_ends, run_values, 0, 10);

        let want = PrimitiveArray::from_iter(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(output, want);
    }
}