vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::Canonical;
12use vortex_array::DeserializeMetadata;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::ToCanonical;
18use vortex_array::arrays::PrimitiveVTable;
19use vortex_array::search_sorted::SearchSorted;
20use vortex_array::search_sorted::SearchSortedSide;
21use vortex_array::serde::ArrayChildren;
22use vortex_array::stats::ArrayStats;
23use vortex_array::stats::StatsSetRef;
24use vortex_array::vtable;
25use vortex_array::vtable::ArrayId;
26use vortex_array::vtable::ArrayVTable;
27use vortex_array::vtable::ArrayVTableExt;
28use vortex_array::vtable::BaseArrayVTable;
29use vortex_array::vtable::CanonicalVTable;
30use vortex_array::vtable::NotSupported;
31use vortex_array::vtable::VTable;
32use vortex_array::vtable::ValidityVTable;
33use vortex_buffer::BufferHandle;
34use vortex_dtype::DType;
35use vortex_dtype::Nullability;
36use vortex_dtype::PType;
37use vortex_error::VortexExpect as _;
38use vortex_error::VortexResult;
39use vortex_error::vortex_bail;
40use vortex_error::vortex_ensure;
41use vortex_error::vortex_panic;
42use vortex_mask::Mask;
43use vortex_scalar::PValue;
44
45use crate::compress::runend_decode_bools;
46use crate::compress::runend_decode_primitive;
47use crate::compress::runend_encode;
48
49vtable!(RunEnd);
50
51#[derive(Clone, prost::Message)]
52pub struct RunEndMetadata {
53    #[prost(enumeration = "PType", tag = "1")]
54    pub ends_ptype: i32,
55    #[prost(uint64, tag = "2")]
56    pub num_runs: u64,
57    #[prost(uint64, tag = "3")]
58    pub offset: u64,
59}
60
61impl VTable for RunEndVTable {
62    type Array = RunEndArray;
63
64    type Metadata = ProstMetadata<RunEndMetadata>;
65
66    type ArrayVTable = Self;
67    type CanonicalVTable = Self;
68    type OperationsVTable = Self;
69    type ValidityVTable = Self;
70    type VisitorVTable = Self;
71    type ComputeVTable = NotSupported;
72    type EncodeVTable = Self;
73
74    fn id(&self) -> ArrayId {
75        ArrayId::new_ref("vortex.runend")
76    }
77
78    fn encoding(_array: &Self::Array) -> ArrayVTable {
79        RunEndVTable.as_vtable()
80    }
81
82    fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
83        Ok(ProstMetadata(RunEndMetadata {
84            ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
85                as i32,
86            num_runs: array.ends().len() as u64,
87            offset: array.offset() as u64,
88        }))
89    }
90
91    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
92        Ok(Some(metadata.serialize()))
93    }
94
95    fn deserialize(buffer: &[u8]) -> VortexResult<Self::Metadata> {
96        let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(buffer)?;
97        Ok(ProstMetadata(inner))
98    }
99
100    fn build(
101        &self,
102        dtype: &DType,
103        len: usize,
104        metadata: &Self::Metadata,
105        _buffers: &[BufferHandle],
106        children: &dyn ArrayChildren,
107    ) -> VortexResult<RunEndArray> {
108        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
109        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
110        let ends = children.get(0, &ends_dtype, runs)?;
111
112        let values = children.get(1, dtype, runs)?;
113
114        RunEndArray::try_new_offset_length(
115            ends,
116            values,
117            usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
118            len,
119        )
120    }
121}
122
123#[derive(Clone, Debug)]
124pub struct RunEndArray {
125    ends: ArrayRef,
126    values: ArrayRef,
127    offset: usize,
128    length: usize,
129    stats_set: ArrayStats,
130}
131
132#[derive(Debug)]
133pub struct RunEndVTable;
134
135impl RunEndArray {
136    fn validate(
137        ends: &dyn Array,
138        values: &dyn Array,
139        offset: usize,
140        length: usize,
141    ) -> VortexResult<()> {
142        // DType validation
143        vortex_ensure!(
144            ends.dtype().is_unsigned_int(),
145            "run ends must be unsigned integers, was {}",
146            ends.dtype(),
147        );
148        vortex_ensure!(
149            values.dtype().is_primitive() || values.dtype().is_boolean(),
150            "RunEnd array can only have Bool or Primitive values, {} given",
151            values.dtype()
152        );
153
154        vortex_ensure!(
155            ends.len() == values.len(),
156            "run ends len != run values len, {} != {}",
157            ends.len(),
158            values.len()
159        );
160
161        // Handle empty run-ends
162        if ends.is_empty() {
163            vortex_ensure!(
164                offset == 0,
165                "non-zero offset provided for empty RunEndArray"
166            );
167            return Ok(());
168        }
169
170        // Verify that the ends are strictly monotonic.
171        if let Some(is_sorted) = ends.statistics().compute_is_strict_sorted() {
172            vortex_ensure!(is_sorted, "run ends must be monotonic");
173        } else {
174            // TODO(aduffy): fallback to slow scalar scanning if computefn impl not available?
175            vortex_bail!("run ends must report IsStrictSorted");
176        }
177
178        // Validate the offset and length are valid for the given ends and values
179        if offset != 0 && length != 0 {
180            let first_run_end: usize = ends.scalar_at(0).as_ref().try_into()?;
181            if first_run_end <= offset {
182                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
183            }
184        }
185
186        Ok(())
187    }
188}
189
190impl RunEndArray {
191    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
192    ///
193    /// Panics if any of the validation conditions described in [`RunEndArray::try_new`] is
194    /// not satisfied.
195    ///
196    /// # Examples
197    ///
198    /// ```
199    /// # use vortex_array::arrays::BoolArray;
200    /// # use vortex_array::IntoArray;
201    /// # use vortex_buffer::buffer;
202    /// # use vortex_runend::RunEndArray;
203    /// let ends = buffer![2u8, 3u8].into_array();
204    /// let values = BoolArray::from_iter([false, true]).into_array();
205    /// let run_end = RunEndArray::new(ends, values);
206    ///
207    /// // Array encodes
208    /// assert_eq!(run_end.scalar_at(0), false.into());
209    /// assert_eq!(run_end.scalar_at(1), false.into());
210    /// assert_eq!(run_end.scalar_at(2), true.into());
211    /// ```
212    pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
213        Self::try_new(ends, values).vortex_expect("RunEndArray new")
214    }
215
216    /// Build a new `RunEndArray` from components.
217    ///
218    /// # Validation
219    ///
220    /// The `ends` must be non-nullable unsigned integers. The values may be `Bool` or `Primitive`
221    /// types.
222    ///
223    /// All `ends` must be strictly monotonic. A run cannot be empty, therefore there can be no
224    /// duplicates in the `ends`.
225    ///
226    /// # Examples
227    ///
228    /// ```
229    /// # use vortex_array::arrays::{BoolArray, VarBinViewArray};
230    /// # use vortex_array::IntoArray;
231    /// # use vortex_buffer::buffer;
232    /// # use vortex_runend::RunEndArray;
233    ///
234    /// // Error to provide duplicate ends!
235    /// let result = RunEndArray::try_new(
236    ///     buffer![1u8, 1u8, 2u8].into_array(),
237    ///     buffer![100i32, 200i32, 300i32].into_array(),
238    /// );
239    /// assert!(result.is_err());
240    ///
241    /// // Error to provide incorrectly-typed values!
242    /// let result = RunEndArray::try_new(
243    ///     buffer![1u8, 2u8].into_array(),
244    ///     VarBinViewArray::from_iter_str(["bad", "dtype"]).into_array(),
245    /// );
246    /// assert!(result.is_err());
247    ///
248    /// // This array is happy
249    /// let result = RunEndArray::try_new(
250    ///     buffer![1u8, 2u8].into_array(),
251    ///     BoolArray::from_iter([false, true]).into_array(),
252    /// );
253    ///
254    /// assert!(result.is_ok());
255    /// ```
256    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
257        let length: usize = if ends.is_empty() {
258            0
259        } else {
260            ends.scalar_at(ends.len() - 1).as_ref().try_into()?
261        };
262
263        Self::try_new_offset_length(ends, values, 0, length)
264    }
265
266    /// Construct a new sliced `RunEndArray` with the provided offset and length.
267    ///
268    /// This performs all the same validation as [`RunEndArray::try_new`].
269    pub fn try_new_offset_length(
270        ends: ArrayRef,
271        values: ArrayRef,
272        offset: usize,
273        length: usize,
274    ) -> VortexResult<Self> {
275        Self::validate(&ends, &values, offset, length)?;
276
277        Ok(Self {
278            ends,
279            values,
280            offset,
281            length,
282            stats_set: Default::default(),
283        })
284    }
285
286    /// Build a new `RunEndArray` without validation.
287    ///
288    /// # Safety
289    ///
290    /// The caller must ensure that all the validation performed in [`RunEndArray::try_new`] is
291    /// satisfied before calling this function.
292    ///
293    /// See [`RunEndArray::try_new`] for the preconditions needed to build a new array.
294    pub unsafe fn new_unchecked(
295        ends: ArrayRef,
296        values: ArrayRef,
297        offset: usize,
298        length: usize,
299    ) -> Self {
300        Self {
301            ends,
302            values,
303            offset,
304            length,
305            stats_set: Default::default(),
306        }
307    }
308
309    /// Convert the given logical index to an index into the `values` array
310    pub fn find_physical_index(&self, index: usize) -> usize {
311        self.ends()
312            .as_primitive_typed()
313            .search_sorted(
314                &PValue::from(index + self.offset()),
315                SearchSortedSide::Right,
316            )
317            .to_ends_index(self.ends().len())
318    }
319
320    /// Run the array through run-end encoding.
321    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
322        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
323            let (ends, values) = runend_encode(parray);
324            // SAFETY: runend_encode handles this
325            unsafe {
326                Ok(Self::new_unchecked(
327                    ends.into_array(),
328                    values,
329                    0,
330                    array.len(),
331                ))
332            }
333        } else {
334            vortex_bail!("REE can only encode primitive arrays")
335        }
336    }
337
338    /// The offset that the `ends` is relative to.
339    ///
340    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
341    #[inline]
342    pub fn offset(&self) -> usize {
343        self.offset
344    }
345
346    /// The encoded "ends" of value runs.
347    ///
348    /// The `i`-th element indicates that there is a run of the same value, beginning
349    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
350    #[inline]
351    pub fn ends(&self) -> &ArrayRef {
352        &self.ends
353    }
354
355    /// The scalar values.
356    ///
357    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
358    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
359    #[inline]
360    pub fn values(&self) -> &ArrayRef {
361        &self.values
362    }
363}
364
365impl BaseArrayVTable<RunEndVTable> for RunEndVTable {
366    fn len(array: &RunEndArray) -> usize {
367        array.length
368    }
369
370    fn dtype(array: &RunEndArray) -> &DType {
371        array.values.dtype()
372    }
373
374    fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
375        array.stats_set.to_ref(array.as_ref())
376    }
377
378    fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
379        array.ends.array_hash(state, precision);
380        array.values.array_hash(state, precision);
381        array.offset.hash(state);
382        array.length.hash(state);
383    }
384
385    fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
386        array.ends.array_eq(&other.ends, precision)
387            && array.values.array_eq(&other.values, precision)
388            && array.offset == other.offset
389            && array.length == other.length
390    }
391}
392
393impl ValidityVTable<RunEndVTable> for RunEndVTable {
394    fn is_valid(array: &RunEndArray, index: usize) -> bool {
395        let physical_idx = array.find_physical_index(index);
396        array.values().is_valid(physical_idx)
397    }
398
399    fn all_valid(array: &RunEndArray) -> bool {
400        array.values().all_valid()
401    }
402
403    fn all_invalid(array: &RunEndArray) -> bool {
404        array.values().all_invalid()
405    }
406
407    fn validity_mask(array: &RunEndArray) -> Mask {
408        match array.values().validity_mask() {
409            Mask::AllTrue(_) => Mask::AllTrue(array.len()),
410            Mask::AllFalse(_) => Mask::AllFalse(array.len()),
411            Mask::Values(values) => {
412                // SAFETY: we preserve ends from an existing validated RunEndArray.
413                //  Validity is checked on construction to have the correct len.
414                let ree_validity = unsafe {
415                    RunEndArray::new_unchecked(
416                        array.ends().clone(),
417                        values.into_array(),
418                        array.offset(),
419                        array.len(),
420                    )
421                    .into_array()
422                };
423                Mask::from_buffer(ree_validity.to_bool().bit_buffer().clone())
424            }
425        }
426    }
427}
428
429impl CanonicalVTable<RunEndVTable> for RunEndVTable {
430    fn canonicalize(array: &RunEndArray) -> Canonical {
431        let pends = array.ends().to_primitive();
432        match array.dtype() {
433            DType::Bool(_) => {
434                let bools = array.values().to_bool();
435                Canonical::Bool(runend_decode_bools(
436                    pends,
437                    bools,
438                    array.offset(),
439                    array.len(),
440                ))
441            }
442            DType::Primitive(..) => {
443                let pvalues = array.values().to_primitive();
444                Canonical::Primitive(runend_decode_primitive(
445                    pends,
446                    pvalues,
447                    array.offset(),
448                    array.len(),
449                ))
450            }
451            _ => vortex_panic!("Only Primitive and Bool values are supported"),
452        }
453    }
454}
455
456#[cfg(test)]
457mod tests {
458    use vortex_array::IntoArray;
459    use vortex_array::assert_arrays_eq;
460    use vortex_buffer::buffer;
461    use vortex_dtype::DType;
462    use vortex_dtype::Nullability;
463    use vortex_dtype::PType;
464
465    use crate::RunEndArray;
466
467    #[test]
468    fn test_runend_constructor() {
469        let arr = RunEndArray::new(
470            buffer![2u32, 5, 10].into_array(),
471            buffer![1i32, 2, 3].into_array(),
472        );
473        assert_eq!(arr.len(), 10);
474        assert_eq!(
475            arr.dtype(),
476            &DType::Primitive(PType::I32, Nullability::NonNullable)
477        );
478
479        // 0, 1 => 1
480        // 2, 3, 4 => 2
481        // 5, 6, 7, 8, 9 => 3
482        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
483        assert_arrays_eq!(arr.to_array(), expected);
484    }
485}