vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::Canonical;
12use vortex_array::DeserializeMetadata;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::ToCanonical;
18use vortex_array::arrays::PrimitiveVTable;
19use vortex_array::buffer::BufferHandle;
20use vortex_array::search_sorted::SearchSorted;
21use vortex_array::search_sorted::SearchSortedSide;
22use vortex_array::serde::ArrayChildren;
23use vortex_array::stats::ArrayStats;
24use vortex_array::stats::StatsSetRef;
25use vortex_array::validity::Validity;
26use vortex_array::vtable;
27use vortex_array::vtable::ArrayId;
28use vortex_array::vtable::ArrayVTable;
29use vortex_array::vtable::ArrayVTableExt;
30use vortex_array::vtable::BaseArrayVTable;
31use vortex_array::vtable::CanonicalVTable;
32use vortex_array::vtable::NotSupported;
33use vortex_array::vtable::VTable;
34use vortex_array::vtable::ValidityVTable;
35use vortex_dtype::DType;
36use vortex_dtype::Nullability;
37use vortex_dtype::PType;
38use vortex_error::VortexExpect as _;
39use vortex_error::VortexResult;
40use vortex_error::vortex_bail;
41use vortex_error::vortex_ensure;
42use vortex_error::vortex_panic;
43use vortex_mask::Mask;
44use vortex_scalar::PValue;
45
46use crate::compress::runend_decode_bools;
47use crate::compress::runend_decode_primitive;
48use crate::compress::runend_encode;
49use crate::rules::RULES;
50
// Invokes the `vtable!` macro (imported from `vortex_array`) for the `RunEnd` encoding.
// NOTE(review): the expansion is not visible in this file; presumably it generates the glue
// that ties `RunEndArray` and `RunEndVTable` together -- confirm against the macro definition.
vtable!(RunEnd);
52
/// Protobuf-encoded metadata describing a run-end encoded array.
///
/// Produced by `RunEndVTable::metadata` and consumed by `RunEndVTable::build`.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `i32` value of a `PType`.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of the `ends` child (and of the `values` child).
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// The array's logical offset, as reported by `RunEndArray::offset`.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
62
63impl VTable for RunEndVTable {
64    type Array = RunEndArray;
65
66    type Metadata = ProstMetadata<RunEndMetadata>;
67
68    type ArrayVTable = Self;
69    type CanonicalVTable = Self;
70    type OperationsVTable = Self;
71    type ValidityVTable = Self;
72    type VisitorVTable = Self;
73    type ComputeVTable = NotSupported;
74    type EncodeVTable = Self;
75
76    fn id(&self) -> ArrayId {
77        ArrayId::new_ref("vortex.runend")
78    }
79
80    fn encoding(_array: &Self::Array) -> ArrayVTable {
81        RunEndVTable.as_vtable()
82    }
83
84    fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
85        Ok(ProstMetadata(RunEndMetadata {
86            ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
87                as i32,
88            num_runs: array.ends().len() as u64,
89            offset: array.offset() as u64,
90        }))
91    }
92
93    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
94        Ok(Some(metadata.serialize()))
95    }
96
97    fn deserialize(buffer: &[u8]) -> VortexResult<Self::Metadata> {
98        let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(buffer)?;
99        Ok(ProstMetadata(inner))
100    }
101
102    fn build(
103        &self,
104        dtype: &DType,
105        len: usize,
106        metadata: &Self::Metadata,
107        _buffers: &[BufferHandle],
108        children: &dyn ArrayChildren,
109    ) -> VortexResult<RunEndArray> {
110        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
111        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
112        let ends = children.get(0, &ends_dtype, runs)?;
113
114        let values = children.get(1, dtype, runs)?;
115
116        RunEndArray::try_new_offset_length(
117            ends,
118            values,
119            usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
120            len,
121        )
122    }
123
124    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
125        vortex_ensure!(
126            children.len() == 2,
127            "RunEndArray expects 2 children, got {}",
128            children.len()
129        );
130
131        let mut children_iter = children.into_iter();
132        array.ends = children_iter.next().vortex_expect("ends child");
133        array.values = children_iter.next().vortex_expect("values child");
134
135        Ok(())
136    }
137
138    fn reduce_parent(
139        array: &Self::Array,
140        parent: &ArrayRef,
141        child_idx: usize,
142    ) -> VortexResult<Option<ArrayRef>> {
143        RULES.evaluate(array, parent, child_idx)
144    }
145}
146
/// An array stored as runs: `values[i]` is repeated up to the position recorded in `ends[i]`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Run ends; the checked constructors require unsigned integers in strictly sorted order.
    ends: ArrayRef,
    /// One value per run; the checked constructors require `Bool` or `Primitive` values and
    /// the same length as `ends`.
    values: ArrayRef,
    /// Amount added to a logical index before it is looked up in `ends`.
    offset: usize,
    /// Logical length reported for this array.
    length: usize,
    /// Statistics storage for this array.
    stats_set: ArrayStats,
}
155
/// Unit type on which the vtable traits of the run-end encoding (id `"vortex.runend"`) are
/// implemented.
#[derive(Debug)]
pub struct RunEndVTable;
158
159impl RunEndArray {
160    fn validate(
161        ends: &dyn Array,
162        values: &dyn Array,
163        offset: usize,
164        length: usize,
165    ) -> VortexResult<()> {
166        // DType validation
167        vortex_ensure!(
168            ends.dtype().is_unsigned_int(),
169            "run ends must be unsigned integers, was {}",
170            ends.dtype(),
171        );
172        vortex_ensure!(
173            values.dtype().is_primitive() || values.dtype().is_boolean(),
174            "RunEnd array can only have Bool or Primitive values, {} given",
175            values.dtype()
176        );
177
178        vortex_ensure!(
179            ends.len() == values.len(),
180            "run ends len != run values len, {} != {}",
181            ends.len(),
182            values.len()
183        );
184
185        // Handle empty run-ends
186        if ends.is_empty() {
187            vortex_ensure!(
188                offset == 0,
189                "non-zero offset provided for empty RunEndArray"
190            );
191            return Ok(());
192        }
193
194        // Verify that the ends are strictly monotonic.
195        if let Some(is_sorted) = ends.statistics().compute_is_strict_sorted() {
196            vortex_ensure!(is_sorted, "run ends must be monotonic");
197        } else {
198            // TODO(aduffy): fallback to slow scalar scanning if computefn impl not available?
199            vortex_bail!("run ends must report IsStrictSorted");
200        }
201
202        // Validate the offset and length are valid for the given ends and values
203        if offset != 0 && length != 0 {
204            let first_run_end: usize = ends.scalar_at(0).as_ref().try_into()?;
205            if first_run_end <= offset {
206                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
207            }
208        }
209
210        Ok(())
211    }
212}
213
214impl RunEndArray {
215    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
216    ///
217    /// Panics if any of the validation conditions described in [`RunEndArray::try_new`] is
218    /// not satisfied.
219    ///
220    /// # Examples
221    ///
222    /// ```
223    /// # use vortex_array::arrays::BoolArray;
224    /// # use vortex_array::IntoArray;
225    /// # use vortex_buffer::buffer;
226    /// # use vortex_runend::RunEndArray;
227    /// let ends = buffer![2u8, 3u8].into_array();
228    /// let values = BoolArray::from_iter([false, true]).into_array();
229    /// let run_end = RunEndArray::new(ends, values);
230    ///
231    /// // Array encodes
232    /// assert_eq!(run_end.scalar_at(0), false.into());
233    /// assert_eq!(run_end.scalar_at(1), false.into());
234    /// assert_eq!(run_end.scalar_at(2), true.into());
235    /// ```
236    pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
237        Self::try_new(ends, values).vortex_expect("RunEndArray new")
238    }
239
240    /// Build a new `RunEndArray` from components.
241    ///
242    /// # Validation
243    ///
244    /// The `ends` must be non-nullable unsigned integers. The values may be `Bool` or `Primitive`
245    /// types.
246    ///
247    /// All `ends` must be strictly monotonic. A run cannot be empty, therefore there can be no
248    /// duplicates in the `ends`.
249    ///
250    /// # Examples
251    ///
252    /// ```
253    /// # use vortex_array::arrays::{BoolArray, VarBinViewArray};
254    /// # use vortex_array::IntoArray;
255    /// # use vortex_buffer::buffer;
256    /// # use vortex_runend::RunEndArray;
257    ///
258    /// // Error to provide duplicate ends!
259    /// let result = RunEndArray::try_new(
260    ///     buffer![1u8, 1u8, 2u8].into_array(),
261    ///     buffer![100i32, 200i32, 300i32].into_array(),
262    /// );
263    /// assert!(result.is_err());
264    ///
265    /// // Error to provide incorrectly-typed values!
266    /// let result = RunEndArray::try_new(
267    ///     buffer![1u8, 2u8].into_array(),
268    ///     VarBinViewArray::from_iter_str(["bad", "dtype"]).into_array(),
269    /// );
270    /// assert!(result.is_err());
271    ///
272    /// // This array is happy
273    /// let result = RunEndArray::try_new(
274    ///     buffer![1u8, 2u8].into_array(),
275    ///     BoolArray::from_iter([false, true]).into_array(),
276    /// );
277    ///
278    /// assert!(result.is_ok());
279    /// ```
280    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
281        let length: usize = if ends.is_empty() {
282            0
283        } else {
284            ends.scalar_at(ends.len() - 1).as_ref().try_into()?
285        };
286
287        Self::try_new_offset_length(ends, values, 0, length)
288    }
289
290    /// Construct a new sliced `RunEndArray` with the provided offset and length.
291    ///
292    /// This performs all the same validation as [`RunEndArray::try_new`].
293    pub fn try_new_offset_length(
294        ends: ArrayRef,
295        values: ArrayRef,
296        offset: usize,
297        length: usize,
298    ) -> VortexResult<Self> {
299        Self::validate(&ends, &values, offset, length)?;
300
301        Ok(Self {
302            ends,
303            values,
304            offset,
305            length,
306            stats_set: Default::default(),
307        })
308    }
309
310    /// Build a new `RunEndArray` without validation.
311    ///
312    /// # Safety
313    ///
314    /// The caller must ensure that all the validation performed in [`RunEndArray::try_new`] is
315    /// satisfied before calling this function.
316    ///
317    /// See [`RunEndArray::try_new`] for the preconditions needed to build a new array.
318    pub unsafe fn new_unchecked(
319        ends: ArrayRef,
320        values: ArrayRef,
321        offset: usize,
322        length: usize,
323    ) -> Self {
324        Self {
325            ends,
326            values,
327            offset,
328            length,
329            stats_set: Default::default(),
330        }
331    }
332
333    /// Convert the given logical index to an index into the `values` array
334    pub fn find_physical_index(&self, index: usize) -> usize {
335        self.ends()
336            .as_primitive_typed()
337            .search_sorted(
338                &PValue::from(index + self.offset()),
339                SearchSortedSide::Right,
340            )
341            .to_ends_index(self.ends().len())
342    }
343
344    /// Run the array through run-end encoding.
345    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
346        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
347            let (ends, values) = runend_encode(parray);
348            // SAFETY: runend_encode handles this
349            unsafe {
350                Ok(Self::new_unchecked(
351                    ends.into_array(),
352                    values,
353                    0,
354                    array.len(),
355                ))
356            }
357        } else {
358            vortex_bail!("REE can only encode primitive arrays")
359        }
360    }
361
362    /// The offset that the `ends` is relative to.
363    ///
364    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
365    #[inline]
366    pub fn offset(&self) -> usize {
367        self.offset
368    }
369
370    /// The encoded "ends" of value runs.
371    ///
372    /// The `i`-th element indicates that there is a run of the same value, beginning
373    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
374    #[inline]
375    pub fn ends(&self) -> &ArrayRef {
376        &self.ends
377    }
378
379    /// The scalar values.
380    ///
381    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
382    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
383    #[inline]
384    pub fn values(&self) -> &ArrayRef {
385        &self.values
386    }
387}
388
389impl BaseArrayVTable<RunEndVTable> for RunEndVTable {
390    fn len(array: &RunEndArray) -> usize {
391        array.length
392    }
393
394    fn dtype(array: &RunEndArray) -> &DType {
395        array.values.dtype()
396    }
397
398    fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
399        array.stats_set.to_ref(array.as_ref())
400    }
401
402    fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
403        array.ends.array_hash(state, precision);
404        array.values.array_hash(state, precision);
405        array.offset.hash(state);
406        array.length.hash(state);
407    }
408
409    fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
410        array.ends.array_eq(&other.ends, precision)
411            && array.values.array_eq(&other.values, precision)
412            && array.offset == other.offset
413            && array.length == other.length
414    }
415}
416
417impl ValidityVTable<RunEndVTable> for RunEndVTable {
418    fn is_valid(array: &RunEndArray, index: usize) -> bool {
419        let physical_idx = array.find_physical_index(index);
420        array.values().is_valid(physical_idx)
421    }
422
423    fn all_valid(array: &RunEndArray) -> bool {
424        array.values().all_valid()
425    }
426
427    fn all_invalid(array: &RunEndArray) -> bool {
428        array.values().all_invalid()
429    }
430
431    fn validity(array: &RunEndArray) -> VortexResult<Validity> {
432        Ok(match array.values().validity()? {
433            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
434            Validity::AllInvalid => Validity::AllInvalid,
435            Validity::Array(values_validity) => Validity::Array(unsafe {
436                RunEndArray::new_unchecked(
437                    array.ends().clone(),
438                    values_validity,
439                    array.offset(),
440                    array.len(),
441                )
442                .into_array()
443            }),
444        })
445    }
446
447    fn validity_mask(array: &RunEndArray) -> Mask {
448        match array.values().validity_mask() {
449            Mask::AllTrue(_) => Mask::AllTrue(array.len()),
450            Mask::AllFalse(_) => Mask::AllFalse(array.len()),
451            Mask::Values(values) => {
452                // SAFETY: we preserve ends from an existing validated RunEndArray.
453                //  Validity is checked on construction to have the correct len.
454                let ree_validity = unsafe {
455                    RunEndArray::new_unchecked(
456                        array.ends().clone(),
457                        values.into_array(),
458                        array.offset(),
459                        array.len(),
460                    )
461                    .into_array()
462                };
463                Mask::from_buffer(ree_validity.to_bool().bit_buffer().clone())
464            }
465        }
466    }
467}
468
469impl CanonicalVTable<RunEndVTable> for RunEndVTable {
470    fn canonicalize(array: &RunEndArray) -> Canonical {
471        let pends = array.ends().to_primitive();
472        match array.dtype() {
473            DType::Bool(_) => {
474                let bools = array.values().to_bool();
475                Canonical::Bool(runend_decode_bools(
476                    pends,
477                    bools,
478                    array.offset(),
479                    array.len(),
480                ))
481            }
482            DType::Primitive(..) => {
483                let pvalues = array.values().to_primitive();
484                Canonical::Primitive(runend_decode_primitive(
485                    pends,
486                    pvalues,
487                    array.offset(),
488                    array.len(),
489                ))
490            }
491            _ => vortex_panic!("Only Primitive and Bool values are supported"),
492        }
493    }
494}
495
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::assert_arrays_eq;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;

    use crate::RunEndArray;

    /// Three runs ending at 2, 5 and 10 decode to ten elements of the values' dtype.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let ree = RunEndArray::new(ends, values);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(ree.len(), 10);
        assert_eq!(ree.dtype(), &expected_dtype);

        // Positions 0..2 hold 1, positions 2..5 hold 2, positions 5..10 hold 3.
        let decoded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(ree.to_array(), decoded);
    }
}
525}