Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::Arc;
7
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::DynArray;
13use vortex_array::ExecutionCtx;
14use vortex_array::ExecutionResult;
15use vortex_array::IntoArray;
16use vortex_array::Precision;
17use vortex_array::ProstMetadata;
18use vortex_array::SerializeMetadata;
19use vortex_array::arrays::Primitive;
20use vortex_array::arrays::VarBinViewArray;
21use vortex_array::buffer::BufferHandle;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::Nullability;
24use vortex_array::dtype::PType;
25use vortex_array::scalar::PValue;
26use vortex_array::search_sorted::SearchSorted;
27use vortex_array::search_sorted::SearchSortedSide;
28use vortex_array::serde::ArrayChildren;
29use vortex_array::stats::ArrayStats;
30use vortex_array::stats::StatsSetRef;
31use vortex_array::validity::Validity;
32use vortex_array::vtable;
33use vortex_array::vtable::Array;
34use vortex_array::vtable::ArrayId;
35use vortex_array::vtable::VTable;
36use vortex_array::vtable::ValidityVTable;
37use vortex_error::VortexExpect as _;
38use vortex_error::VortexResult;
39use vortex_error::vortex_bail;
40use vortex_error::vortex_ensure;
41use vortex_error::vortex_panic;
42use vortex_session::VortexSession;
43
44use crate::compress::runend_decode_primitive;
45use crate::compress::runend_decode_varbinview;
46use crate::compress::runend_encode;
47use crate::decompress_bool::runend_decode_bools;
48use crate::kernel::PARENT_KERNELS;
49use crate::rules::RULES;
50
// Invoke the `vortex_array::vtable!` macro for the `RunEnd` encoding defined below; see that
// macro for the items it generates.
vtable!(RunEnd);
52
/// Serialized (prost-encoded) metadata for a [`RunEndArray`].
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as its `PType` discriminant.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of both the `ends` and the `values` children.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// The array's offset into the run-end coordinate space (generally non-zero only after
    /// slicing).
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
62
63impl VTable for RunEnd {
64    type Array = RunEndArray;
65
66    type Metadata = ProstMetadata<RunEndMetadata>;
67    type OperationsVTable = Self;
68    type ValidityVTable = Self;
69
70    fn vtable(_array: &Self::Array) -> &Self {
71        &RunEnd
72    }
73
74    fn id(&self) -> ArrayId {
75        Self::ID
76    }
77
78    fn len(array: &RunEndArray) -> usize {
79        array.length
80    }
81
82    fn dtype(array: &RunEndArray) -> &DType {
83        array.values.dtype()
84    }
85
86    fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
87        array.stats_set.to_ref(array.as_ref())
88    }
89
90    fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
91        array.ends.array_hash(state, precision);
92        array.values.array_hash(state, precision);
93        array.offset.hash(state);
94        array.length.hash(state);
95    }
96
97    fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
98        array.ends.array_eq(&other.ends, precision)
99            && array.values.array_eq(&other.values, precision)
100            && array.offset == other.offset
101            && array.length == other.length
102    }
103
104    fn nbuffers(_array: &RunEndArray) -> usize {
105        0
106    }
107
108    fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
109        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
110    }
111
112    fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
113        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
114    }
115
116    fn nchildren(_array: &RunEndArray) -> usize {
117        2
118    }
119
120    fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
121        match idx {
122            0 => array.ends().clone(),
123            1 => array.values().clone(),
124            _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
125        }
126    }
127
128    fn child_name(_array: &RunEndArray, idx: usize) -> String {
129        match idx {
130            0 => "ends".to_string(),
131            1 => "values".to_string(),
132            _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
133        }
134    }
135
136    fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
137        Ok(ProstMetadata(RunEndMetadata {
138            ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
139                as i32,
140            num_runs: array.ends().len() as u64,
141            offset: array.offset() as u64,
142        }))
143    }
144
145    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
146        Ok(Some(metadata.serialize()))
147    }
148
149    fn deserialize(
150        bytes: &[u8],
151        _dtype: &DType,
152        _len: usize,
153        _buffers: &[BufferHandle],
154        _session: &VortexSession,
155    ) -> VortexResult<Self::Metadata> {
156        let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
157        Ok(ProstMetadata(inner))
158    }
159
160    fn build(
161        dtype: &DType,
162        len: usize,
163        metadata: &Self::Metadata,
164        _buffers: &[BufferHandle],
165        children: &dyn ArrayChildren,
166    ) -> VortexResult<RunEndArray> {
167        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
168        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
169        let ends = children.get(0, &ends_dtype, runs)?;
170
171        let values = children.get(1, dtype, runs)?;
172
173        RunEndArray::try_new_offset_length(
174            ends,
175            values,
176            usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
177            len,
178        )
179    }
180
181    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
182        vortex_ensure!(
183            children.len() == 2,
184            "RunEndArray expects 2 children, got {}",
185            children.len()
186        );
187
188        let mut children_iter = children.into_iter();
189        array.ends = children_iter.next().vortex_expect("ends child");
190        array.values = children_iter.next().vortex_expect("values child");
191
192        Ok(())
193    }
194
195    fn reduce_parent(
196        array: &Array<Self>,
197        parent: &ArrayRef,
198        child_idx: usize,
199    ) -> VortexResult<Option<ArrayRef>> {
200        RULES.evaluate(array, parent, child_idx)
201    }
202
203    fn execute_parent(
204        array: &Array<Self>,
205        parent: &ArrayRef,
206        child_idx: usize,
207        ctx: &mut ExecutionCtx,
208    ) -> VortexResult<Option<ArrayRef>> {
209        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
210    }
211
212    fn execute(array: Arc<Array<Self>>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
213        run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
214    }
215}
216
/// A run-end encoded array: `values[i]` is repeated for every position of the `i`-th run,
/// whose exclusive end position is `ends[i]`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Exclusive end position of each run; must be unsigned integers (checked in `validate`).
    ends: ArrayRef,
    /// One value per run; its dtype is the dtype of the whole array.
    values: ArrayRef,
    /// Logical start within the run-end coordinate space; generally non-zero only after slicing.
    offset: usize,
    /// Number of logical elements exposed by this array.
    length: usize,
    /// Statistics set for this array.
    stats_set: ArrayStats,
}
225
/// The children of a [`RunEndArray`], as returned by [`RunEndArray::into_parts`].
///
/// The parts do not carry the array's offset or logical length.
pub struct RunEndArrayParts {
    /// Exclusive end position of each run.
    pub ends: ArrayRef,
    /// The value of each run.
    pub values: ArrayRef,
}
230
/// The run-end encoding (`vortex.runend`); its arrays are [`RunEndArray`]s.
#[derive(Clone, Debug)]
pub struct RunEnd;
233
impl RunEnd {
    /// Identifier of the run-end encoding, as returned by `VTable::id`.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
237
238impl RunEndArray {
239    fn validate(
240        ends: &ArrayRef,
241        values: &ArrayRef,
242        offset: usize,
243        length: usize,
244    ) -> VortexResult<()> {
245        // DType validation
246        vortex_ensure!(
247            ends.dtype().is_unsigned_int(),
248            "run ends must be unsigned integers, was {}",
249            ends.dtype(),
250        );
251        vortex_ensure!(
252            ends.len() == values.len(),
253            "run ends len != run values len, {} != {}",
254            ends.len(),
255            values.len()
256        );
257
258        // Handle empty run-ends
259        if ends.is_empty() {
260            vortex_ensure!(
261                offset == 0,
262                "non-zero offset provided for empty RunEndArray"
263            );
264            return Ok(());
265        }
266
267        // Avoid building a non-empty array with zero logical length.
268        if length == 0 {
269            vortex_ensure!(
270                ends.is_empty(),
271                "run ends must be empty when length is zero"
272            );
273            return Ok(());
274        }
275
276        debug_assert!({
277            // Run ends must be strictly sorted for binary search to work correctly.
278            let pre_validation = ends.statistics().to_owned();
279
280            let is_sorted = ends
281                .statistics()
282                .compute_is_strict_sorted()
283                .unwrap_or(false);
284
285            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
286            // We don't want to run with different stats in debug mode and outside.
287            ends.statistics().inherit(pre_validation.iter());
288            is_sorted
289        });
290
291        // Skip host-only validation when ends are not host-resident.
292        if !ends.is_host() {
293            return Ok(());
294        }
295
296        // Validate the offset and length are valid for the given ends and values
297        if offset != 0 && length != 0 {
298            let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
299            if first_run_end <= offset {
300                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
301            }
302        }
303
304        let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
305        let min_required_end = offset + length;
306        if last_run_end < min_required_end {
307            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
308        }
309
310        Ok(())
311    }
312}
313
314impl RunEndArray {
315    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
316    ///
317    /// Panics if any of the validation conditions described in [`RunEndArray::try_new`] is
318    /// not satisfied.
319    ///
320    /// # Examples
321    ///
322    /// ```
323    /// # use vortex_array::arrays::BoolArray;
324    /// # use vortex_array::IntoArray;
325    /// # use vortex_buffer::buffer;
326    /// # use vortex_error::VortexResult;
327    /// # use vortex_runend::RunEndArray;
328    /// # fn main() -> VortexResult<()> {
329    /// let ends = buffer![2u8, 3u8].into_array();
330    /// let values = BoolArray::from_iter([false, true]).into_array();
331    /// let run_end = RunEndArray::new(ends, values);
332    ///
333    /// // Array encodes
334    /// assert_eq!(run_end.scalar_at(0)?, false.into());
335    /// assert_eq!(run_end.scalar_at(1)?, false.into());
336    /// assert_eq!(run_end.scalar_at(2)?, true.into());
337    /// # Ok(())
338    /// # }
339    /// ```
340    pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
341        Self::try_new(ends, values).vortex_expect("RunEndArray new")
342    }
343
344    /// Build a new `RunEndArray` from components.
345    ///
346    /// # Validation
347    ///
348    /// The `ends` must be non-nullable unsigned integers.
349    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
350        let length: usize = if ends.is_empty() {
351            0
352        } else {
353            usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
354        };
355
356        Self::try_new_offset_length(ends, values, 0, length)
357    }
358
359    /// Construct a new sliced `RunEndArray` with the provided offset and length.
360    ///
361    /// This performs all the same validation as [`RunEndArray::try_new`].
362    pub fn try_new_offset_length(
363        ends: ArrayRef,
364        values: ArrayRef,
365        offset: usize,
366        length: usize,
367    ) -> VortexResult<Self> {
368        Self::validate(&ends, &values, offset, length)?;
369
370        Ok(Self {
371            ends,
372            values,
373            offset,
374            length,
375            stats_set: Default::default(),
376        })
377    }
378
379    /// Build a new `RunEndArray` without validation.
380    ///
381    /// # Safety
382    ///
383    /// The caller must ensure that all the validation performed in [`RunEndArray::try_new`] is
384    /// satisfied before calling this function.
385    ///
386    /// See [`RunEndArray::try_new`] for the preconditions needed to build a new array.
387    pub unsafe fn new_unchecked(
388        ends: ArrayRef,
389        values: ArrayRef,
390        offset: usize,
391        length: usize,
392    ) -> Self {
393        Self {
394            ends,
395            values,
396            offset,
397            length,
398            stats_set: Default::default(),
399        }
400    }
401
402    /// Convert the given logical index to an index into the `values` array
403    pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
404        Ok(self
405            .ends()
406            .as_primitive_typed()
407            .search_sorted(
408                &PValue::from(index + self.offset()),
409                SearchSortedSide::Right,
410            )?
411            .to_ends_index(self.ends().len()))
412    }
413
414    /// Run the array through run-end encoding.
415    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
416        if let Some(parray) = array.as_opt::<Primitive>() {
417            let (ends, values) = runend_encode(parray);
418            // SAFETY: runend_encode handles this
419            unsafe {
420                Ok(Self::new_unchecked(
421                    ends.into_array(),
422                    values,
423                    0,
424                    array.len(),
425                ))
426            }
427        } else {
428            vortex_bail!("REE can only encode primitive arrays")
429        }
430    }
431
432    /// The offset that the `ends` is relative to.
433    ///
434    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
435    #[inline]
436    pub fn offset(&self) -> usize {
437        self.offset
438    }
439
440    /// The encoded "ends" of value runs.
441    ///
442    /// The `i`-th element indicates that there is a run of the same value, beginning
443    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
444    #[inline]
445    pub fn ends(&self) -> &ArrayRef {
446        &self.ends
447    }
448
449    /// The scalar values.
450    ///
451    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
452    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
453    #[inline]
454    pub fn values(&self) -> &ArrayRef {
455        &self.values
456    }
457
458    /// Split an `RunEndArray` into parts.
459    #[inline]
460    pub fn into_parts(self) -> RunEndArrayParts {
461        RunEndArrayParts {
462            ends: self.ends,
463            values: self.values,
464        }
465    }
466}
467
468impl ValidityVTable<RunEnd> for RunEnd {
469    fn validity(array: &RunEndArray) -> VortexResult<Validity> {
470        Ok(match array.values().validity()? {
471            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
472            Validity::AllInvalid => Validity::AllInvalid,
473            Validity::Array(values_validity) => Validity::Array(unsafe {
474                RunEndArray::new_unchecked(
475                    array.ends().clone(),
476                    values_validity,
477                    array.offset(),
478                    array.len(),
479                )
480                .into_array()
481            }),
482        })
483    }
484}
485
486pub(super) fn run_end_canonicalize(
487    array: &RunEndArray,
488    ctx: &mut ExecutionCtx,
489) -> VortexResult<ArrayRef> {
490    let pends = array.ends().clone().execute_as("ends", ctx)?;
491
492    Ok(match array.dtype() {
493        DType::Bool(_) => {
494            let bools = array.values().clone().execute_as("values", ctx)?;
495            runend_decode_bools(pends, bools, array.offset(), array.len())?
496        }
497        DType::Primitive(..) => {
498            let pvalues = array.values().clone().execute_as("values", ctx)?;
499            runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
500        }
501        DType::Utf8(_) | DType::Binary(_) => {
502            let values = array
503                .values()
504                .clone()
505                .execute_as::<VarBinViewArray>("values", ctx)?;
506            runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
507        }
508        _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
509    })
510}
511
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEndArray;

    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let runend = RunEndArray::new(ends, values);

        assert_eq!(runend.len(), 10);
        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(runend.dtype(), &expected_dtype);

        // Runs: positions 0..2 => 1, 2..5 => 2, 5..10 => 3.
        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(runend.into_array(), expected);
    }

    #[test]
    fn test_runend_utf8() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let runend = RunEndArray::new(ends, values);

        assert_eq!(runend.len(), 10);
        assert_eq!(runend.dtype(), &DType::Utf8(Nullability::NonNullable));

        let decoded_strings = ["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"];
        let expected = VarBinViewArray::from_iter_str(decoded_strings).into_array();
        assert_arrays_eq!(runend.into_array(), expected);
    }

    #[test]
    fn test_runend_dict() {
        let dictionary = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let codes = buffer![0u32, 1, 2].into_array();
        let dict = DictArray::try_new(codes, dictionary).unwrap();

        let ends = buffer![2u32, 5, 10].into_array();
        let runend = RunEndArray::try_new(ends, dict.into_array()).unwrap();
        assert_eq!(runend.len(), 10);

        let decoded_strings = ["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"];
        let expected = VarBinViewArray::from_iter_str(decoded_strings).into_array();
        assert_arrays_eq!(runend.into_array(), expected);
    }
}