vortex_runend/
compress.rs

1use arrow_buffer::BooleanBufferBuilder;
2use itertools::Itertools;
3use vortex_array::arrays::{BoolArray, BooleanBuffer, ConstantArray, PrimitiveArray};
4use vortex_array::compress::downscale_integer_array;
5use vortex_array::validity::Validity;
6use vortex_array::vtable::ValidityHelper;
7use vortex_array::{ArrayRef, IntoArray, ToCanonical};
8use vortex_buffer::{Buffer, BufferMut, buffer};
9use vortex_dtype::{NativePType, Nullability, match_each_integer_ptype, match_each_native_ptype};
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12use vortex_scalar::Scalar;
13
14use crate::iter::trimmed_ends_iter;
15
16/// Run-end encode a `PrimitiveArray`, returning a tuple of `(ends, values)`.
17pub fn runend_encode(array: &PrimitiveArray) -> VortexResult<(PrimitiveArray, ArrayRef)> {
18    let validity = match array.validity() {
19        Validity::NonNullable => None,
20        Validity::AllValid => None,
21        Validity::AllInvalid => {
22            // We can trivially return an all-null REE array
23            return Ok((
24                PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
25                ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
26            ));
27        }
28        Validity::Array(a) => Some(a.to_bool()?.boolean_buffer().clone()),
29    };
30
31    let (ends, values) = match validity {
32        None => {
33            match_each_native_ptype!(array.ptype(), |P| {
34                let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
35                (
36                    PrimitiveArray::new(ends, Validity::NonNullable),
37                    PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
38                )
39            })
40        }
41        Some(validity) => {
42            match_each_native_ptype!(array.ptype(), |P| {
43                let (ends, values) =
44                    runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
45                (
46                    PrimitiveArray::new(ends, Validity::NonNullable),
47                    values.into_array(),
48                )
49            })
50        }
51    };
52
53    let ends = downscale_integer_array(ends.to_array())?.to_primitive()?;
54
55    Ok((ends, values))
56}
57
58fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
59    let mut ends = BufferMut::empty();
60    let mut values = BufferMut::empty();
61
62    if elements.is_empty() {
63        return (ends.freeze(), values.freeze());
64    }
65
66    // Run-end encode the values
67    let mut prev = elements[0];
68    let mut end = 1;
69    for &e in elements.iter().skip(1) {
70        if e != prev {
71            ends.push(end);
72            values.push(prev);
73        }
74        prev = e;
75        end += 1;
76    }
77    ends.push(end);
78    values.push(prev);
79
80    (ends.freeze(), values.freeze())
81}
82
83fn runend_encode_nullable_primitive<T: NativePType>(
84    elements: &[T],
85    element_validity: BooleanBuffer,
86) -> (Buffer<u64>, PrimitiveArray) {
87    let mut ends = BufferMut::empty();
88    let mut values = BufferMut::empty();
89    let mut validity = BooleanBufferBuilder::new(values.capacity());
90
91    if elements.is_empty() {
92        return (
93            ends.freeze(),
94            PrimitiveArray::new(
95                values,
96                Validity::Array(BoolArray::from(validity.finish()).into_array()),
97            ),
98        );
99    }
100
101    // Run-end encode the values
102    let mut prev = element_validity.value(0).then(|| elements[0]);
103    let mut end = 1;
104    for e in elements
105        .iter()
106        .zip(element_validity.iter())
107        .map(|(&e, is_valid)| is_valid.then_some(e))
108        .skip(1)
109    {
110        if e != prev {
111            ends.push(end);
112            match prev {
113                None => {
114                    validity.append(false);
115                    values.push(T::default());
116                }
117                Some(p) => {
118                    validity.append(true);
119                    values.push(p);
120                }
121            }
122        }
123        prev = e;
124        end += 1;
125    }
126    ends.push(end);
127
128    match prev {
129        None => {
130            validity.append(false);
131            values.push(T::default());
132        }
133        Some(p) => {
134            validity.append(true);
135            values.push(p);
136        }
137    }
138
139    (
140        ends.freeze(),
141        PrimitiveArray::new(values, Validity::from(validity.finish())),
142    )
143}
144
145pub fn runend_decode_primitive(
146    ends: PrimitiveArray,
147    values: PrimitiveArray,
148    offset: usize,
149    length: usize,
150) -> VortexResult<PrimitiveArray> {
151    match_each_native_ptype!(values.ptype(), |P| {
152        match_each_integer_ptype!(ends.ptype(), |E| {
153            runend_decode_typed_primitive(
154                trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
155                values.as_slice::<P>(),
156                values.validity_mask()?,
157                values.dtype().nullability(),
158                length,
159            )
160        })
161    })
162}
163
164pub fn runend_decode_bools(
165    ends: PrimitiveArray,
166    values: BoolArray,
167    offset: usize,
168    length: usize,
169) -> VortexResult<BoolArray> {
170    match_each_integer_ptype!(ends.ptype(), |E| {
171        runend_decode_typed_bool(
172            trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
173            values.boolean_buffer().clone(),
174            values.validity_mask()?,
175            values.dtype().nullability(),
176            length,
177        )
178    })
179}
180
181pub fn runend_decode_typed_primitive<T: NativePType>(
182    run_ends: impl Iterator<Item = usize>,
183    values: &[T],
184    values_validity: Mask,
185    values_nullability: Nullability,
186    length: usize,
187) -> VortexResult<PrimitiveArray> {
188    Ok(match values_validity {
189        Mask::AllTrue(_) => {
190            let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
191            for (end, value) in run_ends.zip_eq(values) {
192                assert!(end <= length, "Runend end must be less than overall length");
193                // SAFETY:
194                // We preallocate enough capacity because we know the total length
195                unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
196            }
197            PrimitiveArray::new(decoded, values_nullability.into())
198        }
199        Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
200        Mask::Values(mask) => {
201            let mut decoded = BufferMut::with_capacity(length);
202            let mut decoded_validity = BooleanBufferBuilder::new(length);
203            for (end, value) in run_ends.zip_eq(
204                values
205                    .iter()
206                    .zip(mask.boolean_buffer().iter())
207                    .map(|(&v, is_valid)| is_valid.then_some(v)),
208            ) {
209                assert!(end <= length, "Runend end must be less than overall length");
210                match value {
211                    None => {
212                        decoded_validity.append_n(end - decoded.len(), false);
213                        // SAFETY:
214                        // We preallocate enough capacity because we know the total length
215                        unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
216                    }
217                    Some(value) => {
218                        decoded_validity.append_n(end - decoded.len(), true);
219                        // SAFETY:
220                        // We preallocate enough capacity because we know the total length
221                        unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
222                    }
223                }
224            }
225            PrimitiveArray::new(decoded, Validity::from(decoded_validity.finish()))
226        }
227    })
228}
229
230pub fn runend_decode_typed_bool(
231    run_ends: impl Iterator<Item = usize>,
232    values: BooleanBuffer,
233    values_validity: Mask,
234    values_nullability: Nullability,
235    length: usize,
236) -> VortexResult<BoolArray> {
237    Ok(match values_validity {
238        Mask::AllTrue(_) => {
239            let mut decoded = BooleanBufferBuilder::new(length);
240            for (end, value) in run_ends.zip_eq(values.iter()) {
241                decoded.append_n(end - decoded.len(), value);
242            }
243            BoolArray::new(decoded.finish(), values_nullability.into())
244        }
245        Mask::AllFalse(_) => BoolArray::new(BooleanBuffer::new_unset(length), Validity::AllInvalid),
246        Mask::Values(mask) => {
247            let mut decoded = BooleanBufferBuilder::new(length);
248            let mut decoded_validity = BooleanBufferBuilder::new(length);
249            for (end, value) in run_ends.zip_eq(
250                values
251                    .iter()
252                    .zip(mask.boolean_buffer().iter())
253                    .map(|(v, is_valid)| is_valid.then_some(v)),
254            ) {
255                match value {
256                    None => {
257                        decoded_validity.append_n(end - decoded.len(), false);
258                        decoded.append_n(end - decoded.len(), false);
259                    }
260                    Some(value) => {
261                        decoded_validity.append_n(end - decoded.len(), true);
262                        decoded.append_n(end - decoded.len(), value);
263                    }
264                }
265            }
266            BoolArray::new(decoded.finish(), Validity::from(decoded_validity.finish()))
267        }
268    })
269}
270
#[cfg(test)]
mod test {
    use arrow_buffer::BooleanBuffer;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::compress::{runend_decode_primitive, runend_encode};

    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (run_ends, run_values) = runend_encode(&input).unwrap();
        let run_values = run_values.to_primitive().unwrap();

        // The run ends fit in a byte, so they come back downscaled to `u8`.
        assert_eq!(run_ends.as_slice::<u8>(), vec![2, 5, 10]);
        assert_eq!(run_values.as_slice::<i32>(), vec![1, 2, 3]);
    }

    #[test]
    fn encode_nullable() {
        let validity_bits = BooleanBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(validity_bits),
        );
        let (run_ends, run_values) = runend_encode(&input).unwrap();
        let run_values = run_values.to_primitive().unwrap();

        // Null runs split otherwise-equal neighbours and are stored as `i32::default()`.
        assert_eq!(run_ends.as_slice::<u8>(), vec![2, 4, 5, 8, 10]);
        assert_eq!(run_values.as_slice::<i32>(), vec![1, 0, 2, 3, 0]);
    }

    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BooleanBuffer::new_unset(5)),
        );
        let (run_ends, run_values) = runend_encode(&input).unwrap();
        let run_values = run_values.to_primitive().unwrap();

        // The all-null shortcut emits a single `u64` run end without downscaling.
        assert_eq!(run_ends.as_slice::<u64>(), vec![5]);
        assert_eq!(run_values.as_slice::<i32>(), vec![0]);
    }

    #[test]
    fn decode() {
        let run_ends = PrimitiveArray::from_iter([2, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let expanded = runend_decode_primitive(run_ends, run_values, 0, 10).unwrap();

        let expected = vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3];
        assert_eq!(expanded.as_slice::<i32>(), expected);
    }
}