Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::ExecutionCtx;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::arrays::PrimitiveVTable;
18use vortex_array::buffer::BufferHandle;
19use vortex_array::dtype::DType;
20use vortex_array::dtype::Nullability;
21use vortex_array::dtype::PType;
22use vortex_array::scalar::PValue;
23use vortex_array::search_sorted::SearchSorted;
24use vortex_array::search_sorted::SearchSortedSide;
25use vortex_array::serde::ArrayChildren;
26use vortex_array::stats::ArrayStats;
27use vortex_array::stats::StatsSetRef;
28use vortex_array::validity::Validity;
29use vortex_array::vtable;
30use vortex_array::vtable::ArrayId;
31use vortex_array::vtable::VTable;
32use vortex_array::vtable::ValidityVTable;
33use vortex_error::VortexExpect as _;
34use vortex_error::VortexResult;
35use vortex_error::vortex_bail;
36use vortex_error::vortex_ensure;
37use vortex_error::vortex_panic;
38use vortex_session::VortexSession;
39
40use crate::compress::runend_decode_bools;
41use crate::compress::runend_decode_primitive;
42use crate::compress::runend_encode;
43use crate::kernel::PARENT_KERNELS;
44use crate::rules::RULES;
45
46vtable!(RunEnd);
47
48#[derive(Clone, prost::Message)]
49pub struct RunEndMetadata {
50    #[prost(enumeration = "PType", tag = "1")]
51    pub ends_ptype: i32,
52    #[prost(uint64, tag = "2")]
53    pub num_runs: u64,
54    #[prost(uint64, tag = "3")]
55    pub offset: u64,
56}
57
58impl VTable for RunEndVTable {
59    type Array = RunEndArray;
60
61    type Metadata = ProstMetadata<RunEndMetadata>;
62    type OperationsVTable = Self;
63    type ValidityVTable = Self;
64
65    fn id(_array: &Self::Array) -> ArrayId {
66        Self::ID
67    }
68
69    fn len(array: &RunEndArray) -> usize {
70        array.length
71    }
72
73    fn dtype(array: &RunEndArray) -> &DType {
74        array.values.dtype()
75    }
76
77    fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
78        array.stats_set.to_ref(array.as_ref())
79    }
80
81    fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
82        array.ends.array_hash(state, precision);
83        array.values.array_hash(state, precision);
84        array.offset.hash(state);
85        array.length.hash(state);
86    }
87
88    fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
89        array.ends.array_eq(&other.ends, precision)
90            && array.values.array_eq(&other.values, precision)
91            && array.offset == other.offset
92            && array.length == other.length
93    }
94
95    fn nbuffers(_array: &RunEndArray) -> usize {
96        0
97    }
98
99    fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
100        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
101    }
102
103    fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
104        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
105    }
106
107    fn nchildren(_array: &RunEndArray) -> usize {
108        2
109    }
110
111    fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
112        match idx {
113            0 => array.ends().clone(),
114            1 => array.values().clone(),
115            _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
116        }
117    }
118
119    fn child_name(_array: &RunEndArray, idx: usize) -> String {
120        match idx {
121            0 => "ends".to_string(),
122            1 => "values".to_string(),
123            _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
124        }
125    }
126
127    fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
128        Ok(ProstMetadata(RunEndMetadata {
129            ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
130                as i32,
131            num_runs: array.ends().len() as u64,
132            offset: array.offset() as u64,
133        }))
134    }
135
136    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
137        Ok(Some(metadata.serialize()))
138    }
139
140    fn deserialize(
141        bytes: &[u8],
142        _dtype: &DType,
143        _len: usize,
144        _buffers: &[BufferHandle],
145        _session: &VortexSession,
146    ) -> VortexResult<Self::Metadata> {
147        let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
148        Ok(ProstMetadata(inner))
149    }
150
151    fn build(
152        dtype: &DType,
153        len: usize,
154        metadata: &Self::Metadata,
155        _buffers: &[BufferHandle],
156        children: &dyn ArrayChildren,
157    ) -> VortexResult<RunEndArray> {
158        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
159        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
160        let ends = children.get(0, &ends_dtype, runs)?;
161
162        let values = children.get(1, dtype, runs)?;
163
164        RunEndArray::try_new_offset_length(
165            ends,
166            values,
167            usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
168            len,
169        )
170    }
171
172    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
173        vortex_ensure!(
174            children.len() == 2,
175            "RunEndArray expects 2 children, got {}",
176            children.len()
177        );
178
179        let mut children_iter = children.into_iter();
180        array.ends = children_iter.next().vortex_expect("ends child");
181        array.values = children_iter.next().vortex_expect("values child");
182
183        Ok(())
184    }
185
186    fn reduce_parent(
187        array: &Self::Array,
188        parent: &ArrayRef,
189        child_idx: usize,
190    ) -> VortexResult<Option<ArrayRef>> {
191        RULES.evaluate(array, parent, child_idx)
192    }
193
194    fn execute_parent(
195        array: &Self::Array,
196        parent: &ArrayRef,
197        child_idx: usize,
198        ctx: &mut ExecutionCtx,
199    ) -> VortexResult<Option<ArrayRef>> {
200        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
201    }
202
203    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
204        run_end_canonicalize(array, ctx)
205    }
206}
207
208#[derive(Clone, Debug)]
209pub struct RunEndArray {
210    ends: ArrayRef,
211    values: ArrayRef,
212    offset: usize,
213    length: usize,
214    stats_set: ArrayStats,
215}
216
217pub struct RunEndArrayParts {
218    pub ends: ArrayRef,
219    pub values: ArrayRef,
220}
221
222#[derive(Debug)]
223pub struct RunEndVTable;
224
225impl RunEndVTable {
226    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
227}
228
229impl RunEndArray {
230    fn validate(
231        ends: &ArrayRef,
232        values: &ArrayRef,
233        offset: usize,
234        length: usize,
235    ) -> VortexResult<()> {
236        // DType validation
237        vortex_ensure!(
238            ends.dtype().is_unsigned_int(),
239            "run ends must be unsigned integers, was {}",
240            ends.dtype(),
241        );
242        vortex_ensure!(
243            values.dtype().is_primitive() || values.dtype().is_boolean(),
244            "RunEnd array can only have Bool or Primitive values, {} given",
245            values.dtype()
246        );
247
248        vortex_ensure!(
249            ends.len() == values.len(),
250            "run ends len != run values len, {} != {}",
251            ends.len(),
252            values.len()
253        );
254
255        // Handle empty run-ends
256        if ends.is_empty() {
257            vortex_ensure!(
258                offset == 0,
259                "non-zero offset provided for empty RunEndArray"
260            );
261            return Ok(());
262        }
263
264        // Avoid building a non-empty array with zero logical length.
265        if length == 0 {
266            vortex_ensure!(
267                ends.is_empty(),
268                "run ends must be empty when length is zero"
269            );
270            return Ok(());
271        }
272
273        debug_assert!({
274            // Run ends must be strictly sorted for binary search to work correctly.
275            let pre_validation = ends.statistics().to_owned();
276
277            let is_sorted = ends
278                .statistics()
279                .compute_is_strict_sorted()
280                .unwrap_or(false);
281
282            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
283            // We don't want to run with different stats in debug mode and outside.
284            ends.statistics().inherit(pre_validation.iter());
285            is_sorted
286        });
287
288        // Skip host-only validation when ends are not host-resident.
289        if !ends.is_host() {
290            return Ok(());
291        }
292
293        // Validate the offset and length are valid for the given ends and values
294        if offset != 0 && length != 0 {
295            let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
296            if first_run_end <= offset {
297                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
298            }
299        }
300
301        let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
302        let min_required_end = offset + length;
303        if last_run_end < min_required_end {
304            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
305        }
306
307        Ok(())
308    }
309}
310
311impl RunEndArray {
312    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
313    ///
314    /// Panics if any of the validation conditions described in [`RunEndArray::try_new`] is
315    /// not satisfied.
316    ///
317    /// # Examples
318    ///
319    /// ```
320    /// # use vortex_array::arrays::BoolArray;
321    /// # use vortex_array::IntoArray;
322    /// # use vortex_buffer::buffer;
323    /// # use vortex_error::VortexResult;
324    /// # use vortex_runend::RunEndArray;
325    /// # fn main() -> VortexResult<()> {
326    /// let ends = buffer![2u8, 3u8].into_array();
327    /// let values = BoolArray::from_iter([false, true]).into_array();
328    /// let run_end = RunEndArray::new(ends, values);
329    ///
330    /// // Array encodes
331    /// assert_eq!(run_end.scalar_at(0)?, false.into());
332    /// assert_eq!(run_end.scalar_at(1)?, false.into());
333    /// assert_eq!(run_end.scalar_at(2)?, true.into());
334    /// # Ok(())
335    /// # }
336    /// ```
337    pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
338        Self::try_new(ends, values).vortex_expect("RunEndArray new")
339    }
340
341    /// Build a new `RunEndArray` from components.
342    ///
343    /// # Validation
344    ///
345    /// The `ends` must be non-nullable unsigned integers. The values may be `Bool` or `Primitive`
346    /// types.
347    ///
348    /// # Examples
349    ///
350    /// ```
351    /// # use vortex_array::arrays::{BoolArray, VarBinViewArray};
352    /// # use vortex_array::IntoArray;
353    /// # use vortex_buffer::buffer;
354    /// # use vortex_runend::RunEndArray;
355    ///
356    /// // Error to provide incorrectly-typed values!
357    /// let result = RunEndArray::try_new(
358    ///     buffer![1u8, 2u8].into_array(),
359    ///     VarBinViewArray::from_iter_str(["bad", "dtype"]).into_array(),
360    /// );
361    /// assert!(result.is_err());
362    ///
363    /// // This array is happy
364    /// let result = RunEndArray::try_new(
365    ///     buffer![1u8, 2u8].into_array(),
366    ///     BoolArray::from_iter([false, true]).into_array(),
367    /// );
368    ///
369    /// assert!(result.is_ok());
370    /// ```
371    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
372        let length: usize = if ends.is_empty() {
373            0
374        } else {
375            usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
376        };
377
378        Self::try_new_offset_length(ends, values, 0, length)
379    }
380
381    /// Construct a new sliced `RunEndArray` with the provided offset and length.
382    ///
383    /// This performs all the same validation as [`RunEndArray::try_new`].
384    pub fn try_new_offset_length(
385        ends: ArrayRef,
386        values: ArrayRef,
387        offset: usize,
388        length: usize,
389    ) -> VortexResult<Self> {
390        Self::validate(&ends, &values, offset, length)?;
391
392        Ok(Self {
393            ends,
394            values,
395            offset,
396            length,
397            stats_set: Default::default(),
398        })
399    }
400
401    /// Build a new `RunEndArray` without validation.
402    ///
403    /// # Safety
404    ///
405    /// The caller must ensure that all the validation performed in [`RunEndArray::try_new`] is
406    /// satisfied before calling this function.
407    ///
408    /// See [`RunEndArray::try_new`] for the preconditions needed to build a new array.
409    pub unsafe fn new_unchecked(
410        ends: ArrayRef,
411        values: ArrayRef,
412        offset: usize,
413        length: usize,
414    ) -> Self {
415        Self {
416            ends,
417            values,
418            offset,
419            length,
420            stats_set: Default::default(),
421        }
422    }
423
424    /// Convert the given logical index to an index into the `values` array
425    pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
426        Ok(self
427            .ends()
428            .as_primitive_typed()
429            .search_sorted(
430                &PValue::from(index + self.offset()),
431                SearchSortedSide::Right,
432            )?
433            .to_ends_index(self.ends().len()))
434    }
435
436    /// Run the array through run-end encoding.
437    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
438        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
439            let (ends, values) = runend_encode(parray);
440            // SAFETY: runend_encode handles this
441            unsafe {
442                Ok(Self::new_unchecked(
443                    ends.into_array(),
444                    values,
445                    0,
446                    array.len(),
447                ))
448            }
449        } else {
450            vortex_bail!("REE can only encode primitive arrays")
451        }
452    }
453
454    /// The offset that the `ends` is relative to.
455    ///
456    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
457    #[inline]
458    pub fn offset(&self) -> usize {
459        self.offset
460    }
461
462    /// The encoded "ends" of value runs.
463    ///
464    /// The `i`-th element indicates that there is a run of the same value, beginning
465    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
466    #[inline]
467    pub fn ends(&self) -> &ArrayRef {
468        &self.ends
469    }
470
471    /// The scalar values.
472    ///
473    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
474    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
475    #[inline]
476    pub fn values(&self) -> &ArrayRef {
477        &self.values
478    }
479
480    /// Split an `RunEndArray` into parts.
481    #[inline]
482    pub fn into_parts(self) -> RunEndArrayParts {
483        RunEndArrayParts {
484            ends: self.ends,
485            values: self.values,
486        }
487    }
488}
489
490impl ValidityVTable<RunEndVTable> for RunEndVTable {
491    fn validity(array: &RunEndArray) -> VortexResult<Validity> {
492        Ok(match array.values().validity()? {
493            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
494            Validity::AllInvalid => Validity::AllInvalid,
495            Validity::Array(values_validity) => Validity::Array(unsafe {
496                RunEndArray::new_unchecked(
497                    array.ends().clone(),
498                    values_validity,
499                    array.offset(),
500                    array.len(),
501                )
502                .into_array()
503            }),
504        })
505    }
506}
507
508pub(super) fn run_end_canonicalize(
509    array: &RunEndArray,
510    ctx: &mut ExecutionCtx,
511) -> VortexResult<ArrayRef> {
512    let pends = array.ends().clone().execute_as("ends", ctx)?;
513    Ok(match array.dtype() {
514        DType::Bool(_) => {
515            let bools = array.values().clone().execute_as("values", ctx)?;
516            runend_decode_bools(pends, bools, array.offset(), array.len())?.into_array()
517        }
518        DType::Primitive(..) => {
519            let pvalues = array.values().clone().execute_as("values", ctx)?;
520            runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
521        }
522        _ => vortex_panic!("Only Primitive and Bool values are supported"),
523    })
524}
525
526#[cfg(test)]
527mod tests {
528    use vortex_array::IntoArray;
529    use vortex_array::assert_arrays_eq;
530    use vortex_array::dtype::DType;
531    use vortex_array::dtype::Nullability;
532    use vortex_array::dtype::PType;
533    use vortex_buffer::buffer;
534
535    use crate::RunEndArray;
536
537    #[test]
538    fn test_runend_constructor() {
539        let arr = RunEndArray::new(
540            buffer![2u32, 5, 10].into_array(),
541            buffer![1i32, 2, 3].into_array(),
542        );
543        assert_eq!(arr.len(), 10);
544        assert_eq!(
545            arr.dtype(),
546            &DType::Primitive(PType::I32, Nullability::NonNullable)
547        );
548
549        // 0, 1 => 1
550        // 2, 3, 4 => 2
551        // 5, 6, 7, 8, 9 => 3
552        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
553        assert_arrays_eq!(arr.to_array(), expected);
554    }
555}