Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::Arc;
7
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::DynArray;
13use vortex_array::ExecutionCtx;
14use vortex_array::ExecutionResult;
15use vortex_array::IntoArray;
16use vortex_array::Precision;
17use vortex_array::ProstMetadata;
18use vortex_array::SerializeMetadata;
19use vortex_array::arrays::Primitive;
20use vortex_array::arrays::VarBinViewArray;
21use vortex_array::buffer::BufferHandle;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::Nullability;
24use vortex_array::dtype::PType;
25use vortex_array::scalar::PValue;
26use vortex_array::search_sorted::SearchSorted;
27use vortex_array::search_sorted::SearchSortedSide;
28use vortex_array::serde::ArrayChildren;
29use vortex_array::stats::ArrayStats;
30use vortex_array::stats::StatsSetRef;
31use vortex_array::validity::Validity;
32use vortex_array::vtable;
33use vortex_array::vtable::ArrayId;
34use vortex_array::vtable::VTable;
35use vortex_array::vtable::ValidityVTable;
36use vortex_error::VortexExpect as _;
37use vortex_error::VortexResult;
38use vortex_error::vortex_bail;
39use vortex_error::vortex_ensure;
40use vortex_error::vortex_panic;
41use vortex_session::VortexSession;
42
43use crate::compress::runend_decode_primitive;
44use crate::compress::runend_decode_varbinview;
45use crate::compress::runend_encode;
46use crate::decompress_bool::runend_decode_bools;
47use crate::kernel::PARENT_KERNELS;
48use crate::rules::RULES;
49
50vtable!(RunEnd);
51
/// Protobuf-encoded metadata stored alongside a serialized run-end array.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// `PType` of the `ends` child, stored as the enum's `i32` value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of the `ends` child when the metadata was written.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Logical offset of the array when the metadata was written.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
61
62impl VTable for RunEnd {
63    type Array = RunEndArray;
64
65    type Metadata = ProstMetadata<RunEndMetadata>;
66    type OperationsVTable = Self;
67    type ValidityVTable = Self;
68
69    fn vtable(_array: &Self::Array) -> &Self {
70        &RunEnd
71    }
72
73    fn id(&self) -> ArrayId {
74        Self::ID
75    }
76
77    fn len(array: &RunEndArray) -> usize {
78        array.length
79    }
80
81    fn dtype(array: &RunEndArray) -> &DType {
82        array.values.dtype()
83    }
84
85    fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
86        array.stats_set.to_ref(array.as_ref())
87    }
88
89    fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
90        array.ends.array_hash(state, precision);
91        array.values.array_hash(state, precision);
92        array.offset.hash(state);
93        array.length.hash(state);
94    }
95
96    fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
97        array.ends.array_eq(&other.ends, precision)
98            && array.values.array_eq(&other.values, precision)
99            && array.offset == other.offset
100            && array.length == other.length
101    }
102
103    fn nbuffers(_array: &RunEndArray) -> usize {
104        0
105    }
106
107    fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
108        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
109    }
110
111    fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
112        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
113    }
114
115    fn nchildren(_array: &RunEndArray) -> usize {
116        2
117    }
118
119    fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
120        match idx {
121            0 => array.ends().clone(),
122            1 => array.values().clone(),
123            _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
124        }
125    }
126
127    fn child_name(_array: &RunEndArray, idx: usize) -> String {
128        match idx {
129            0 => "ends".to_string(),
130            1 => "values".to_string(),
131            _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
132        }
133    }
134
135    fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
136        Ok(ProstMetadata(RunEndMetadata {
137            ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
138                as i32,
139            num_runs: array.ends().len() as u64,
140            offset: array.offset() as u64,
141        }))
142    }
143
144    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
145        Ok(Some(metadata.serialize()))
146    }
147
148    fn deserialize(
149        bytes: &[u8],
150        _dtype: &DType,
151        _len: usize,
152        _buffers: &[BufferHandle],
153        _session: &VortexSession,
154    ) -> VortexResult<Self::Metadata> {
155        let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
156        Ok(ProstMetadata(inner))
157    }
158
159    fn build(
160        dtype: &DType,
161        len: usize,
162        metadata: &Self::Metadata,
163        _buffers: &[BufferHandle],
164        children: &dyn ArrayChildren,
165    ) -> VortexResult<RunEndArray> {
166        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
167        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
168        let ends = children.get(0, &ends_dtype, runs)?;
169
170        let values = children.get(1, dtype, runs)?;
171
172        RunEndArray::try_new_offset_length(
173            ends,
174            values,
175            usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
176            len,
177        )
178    }
179
180    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
181        vortex_ensure!(
182            children.len() == 2,
183            "RunEndArray expects 2 children, got {}",
184            children.len()
185        );
186
187        let mut children_iter = children.into_iter();
188        array.ends = children_iter.next().vortex_expect("ends child");
189        array.values = children_iter.next().vortex_expect("values child");
190
191        Ok(())
192    }
193
194    fn reduce_parent(
195        array: &Self::Array,
196        parent: &ArrayRef,
197        child_idx: usize,
198    ) -> VortexResult<Option<ArrayRef>> {
199        RULES.evaluate(array, parent, child_idx)
200    }
201
202    fn execute_parent(
203        array: &Self::Array,
204        parent: &ArrayRef,
205        child_idx: usize,
206        ctx: &mut ExecutionCtx,
207    ) -> VortexResult<Option<ArrayRef>> {
208        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
209    }
210
211    fn execute(array: Arc<Self::Array>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
212        run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
213    }
214}
215
/// A run-end encoded array: `values[i]` is repeated for the whole of the `i`-th run, and
/// `ends[i]` records where that run stops.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// End position of each run; validated to be unsigned integers, one entry per value.
    ends: ArrayRef,
    /// One value per run; its dtype is reported as the dtype of the whole array.
    values: ArrayRef,
    /// Logical offset added to an index before it is looked up in `ends`.
    offset: usize,
    /// Logical length reported for the array.
    length: usize,
    /// Statistics attached to this array.
    stats_set: ArrayStats,
}
224
/// The children of a [`RunEndArray`], as returned by [`RunEndArray::into_parts`].
///
/// The offset and length of the source array are not carried over.
pub struct RunEndArrayParts {
    /// The run ends.
    pub ends: ArrayRef,
    /// The per-run values.
    pub values: ArrayRef,
}
229
/// Marker type for the run-end encoding; it carries the encoding's vtable implementations.
#[derive(Clone, Debug)]
pub struct RunEnd;
232
impl RunEnd {
    /// Identifier of the run-end encoding, returned by the vtable's `id` method.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
236
237impl RunEndArray {
238    fn validate(
239        ends: &ArrayRef,
240        values: &ArrayRef,
241        offset: usize,
242        length: usize,
243    ) -> VortexResult<()> {
244        // DType validation
245        vortex_ensure!(
246            ends.dtype().is_unsigned_int(),
247            "run ends must be unsigned integers, was {}",
248            ends.dtype(),
249        );
250        vortex_ensure!(
251            ends.len() == values.len(),
252            "run ends len != run values len, {} != {}",
253            ends.len(),
254            values.len()
255        );
256
257        // Handle empty run-ends
258        if ends.is_empty() {
259            vortex_ensure!(
260                offset == 0,
261                "non-zero offset provided for empty RunEndArray"
262            );
263            return Ok(());
264        }
265
266        // Avoid building a non-empty array with zero logical length.
267        if length == 0 {
268            vortex_ensure!(
269                ends.is_empty(),
270                "run ends must be empty when length is zero"
271            );
272            return Ok(());
273        }
274
275        debug_assert!({
276            // Run ends must be strictly sorted for binary search to work correctly.
277            let pre_validation = ends.statistics().to_owned();
278
279            let is_sorted = ends
280                .statistics()
281                .compute_is_strict_sorted()
282                .unwrap_or(false);
283
284            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
285            // We don't want to run with different stats in debug mode and outside.
286            ends.statistics().inherit(pre_validation.iter());
287            is_sorted
288        });
289
290        // Skip host-only validation when ends are not host-resident.
291        if !ends.is_host() {
292            return Ok(());
293        }
294
295        // Validate the offset and length are valid for the given ends and values
296        if offset != 0 && length != 0 {
297            let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
298            if first_run_end <= offset {
299                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
300            }
301        }
302
303        let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
304        let min_required_end = offset + length;
305        if last_run_end < min_required_end {
306            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
307        }
308
309        Ok(())
310    }
311}
312
313impl RunEndArray {
314    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
315    ///
316    /// Panics if any of the validation conditions described in [`RunEndArray::try_new`] is
317    /// not satisfied.
318    ///
319    /// # Examples
320    ///
321    /// ```
322    /// # use vortex_array::arrays::BoolArray;
323    /// # use vortex_array::IntoArray;
324    /// # use vortex_buffer::buffer;
325    /// # use vortex_error::VortexResult;
326    /// # use vortex_runend::RunEndArray;
327    /// # fn main() -> VortexResult<()> {
328    /// let ends = buffer![2u8, 3u8].into_array();
329    /// let values = BoolArray::from_iter([false, true]).into_array();
330    /// let run_end = RunEndArray::new(ends, values);
331    ///
332    /// // Array encodes
333    /// assert_eq!(run_end.scalar_at(0)?, false.into());
334    /// assert_eq!(run_end.scalar_at(1)?, false.into());
335    /// assert_eq!(run_end.scalar_at(2)?, true.into());
336    /// # Ok(())
337    /// # }
338    /// ```
339    pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
340        Self::try_new(ends, values).vortex_expect("RunEndArray new")
341    }
342
343    /// Build a new `RunEndArray` from components.
344    ///
345    /// # Validation
346    ///
347    /// The `ends` must be non-nullable unsigned integers.
348    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
349        let length: usize = if ends.is_empty() {
350            0
351        } else {
352            usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
353        };
354
355        Self::try_new_offset_length(ends, values, 0, length)
356    }
357
358    /// Construct a new sliced `RunEndArray` with the provided offset and length.
359    ///
360    /// This performs all the same validation as [`RunEndArray::try_new`].
361    pub fn try_new_offset_length(
362        ends: ArrayRef,
363        values: ArrayRef,
364        offset: usize,
365        length: usize,
366    ) -> VortexResult<Self> {
367        Self::validate(&ends, &values, offset, length)?;
368
369        Ok(Self {
370            ends,
371            values,
372            offset,
373            length,
374            stats_set: Default::default(),
375        })
376    }
377
378    /// Build a new `RunEndArray` without validation.
379    ///
380    /// # Safety
381    ///
382    /// The caller must ensure that all the validation performed in [`RunEndArray::try_new`] is
383    /// satisfied before calling this function.
384    ///
385    /// See [`RunEndArray::try_new`] for the preconditions needed to build a new array.
386    pub unsafe fn new_unchecked(
387        ends: ArrayRef,
388        values: ArrayRef,
389        offset: usize,
390        length: usize,
391    ) -> Self {
392        Self {
393            ends,
394            values,
395            offset,
396            length,
397            stats_set: Default::default(),
398        }
399    }
400
401    /// Convert the given logical index to an index into the `values` array
402    pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
403        Ok(self
404            .ends()
405            .as_primitive_typed()
406            .search_sorted(
407                &PValue::from(index + self.offset()),
408                SearchSortedSide::Right,
409            )?
410            .to_ends_index(self.ends().len()))
411    }
412
413    /// Run the array through run-end encoding.
414    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
415        if let Some(parray) = array.as_opt::<Primitive>() {
416            let (ends, values) = runend_encode(parray);
417            // SAFETY: runend_encode handles this
418            unsafe {
419                Ok(Self::new_unchecked(
420                    ends.into_array(),
421                    values,
422                    0,
423                    array.len(),
424                ))
425            }
426        } else {
427            vortex_bail!("REE can only encode primitive arrays")
428        }
429    }
430
431    /// The offset that the `ends` is relative to.
432    ///
433    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
434    #[inline]
435    pub fn offset(&self) -> usize {
436        self.offset
437    }
438
439    /// The encoded "ends" of value runs.
440    ///
441    /// The `i`-th element indicates that there is a run of the same value, beginning
442    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
443    #[inline]
444    pub fn ends(&self) -> &ArrayRef {
445        &self.ends
446    }
447
448    /// The scalar values.
449    ///
450    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
451    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
452    #[inline]
453    pub fn values(&self) -> &ArrayRef {
454        &self.values
455    }
456
457    /// Split an `RunEndArray` into parts.
458    #[inline]
459    pub fn into_parts(self) -> RunEndArrayParts {
460        RunEndArrayParts {
461            ends: self.ends,
462            values: self.values,
463        }
464    }
465}
466
467impl ValidityVTable<RunEnd> for RunEnd {
468    fn validity(array: &RunEndArray) -> VortexResult<Validity> {
469        Ok(match array.values().validity()? {
470            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
471            Validity::AllInvalid => Validity::AllInvalid,
472            Validity::Array(values_validity) => Validity::Array(unsafe {
473                RunEndArray::new_unchecked(
474                    array.ends().clone(),
475                    values_validity,
476                    array.offset(),
477                    array.len(),
478                )
479                .into_array()
480            }),
481        })
482    }
483}
484
485pub(super) fn run_end_canonicalize(
486    array: &RunEndArray,
487    ctx: &mut ExecutionCtx,
488) -> VortexResult<ArrayRef> {
489    let pends = array.ends().clone().execute_as("ends", ctx)?;
490
491    Ok(match array.dtype() {
492        DType::Bool(_) => {
493            let bools = array.values().clone().execute_as("values", ctx)?;
494            runend_decode_bools(pends, bools, array.offset(), array.len())?
495        }
496        DType::Primitive(..) => {
497            let pvalues = array.values().clone().execute_as("values", ctx)?;
498            runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
499        }
500        DType::Utf8(_) | DType::Binary(_) => {
501            let values = array
502                .values()
503                .clone()
504                .execute_as::<VarBinViewArray>("values", ctx)?;
505            runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
506        }
507        _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
508    })
509}
510
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEndArray;

    // Every test uses the same run ends `[2, 5, 10]`, i.e. three runs covering the positions
    // 0..2, 2..5 and 5..10.

    /// Primitive values decode to each value repeated for the length of its run.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::new(ends, values);

        assert_eq!(arr.len(), 10);
        assert_eq!(
            arr.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        // positions 0..2  => 1
        // positions 2..5  => 2
        // positions 5..10 => 3
        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// UTF-8 values are supported and decode the same way.
    #[test]
    fn test_runend_utf8() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let arr = RunEndArray::new(ends, values);

        assert_eq!(arr.len(), 10);
        assert_eq!(arr.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expected =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// Dictionary-encoded values are accepted as the `values` child.
    #[test]
    fn test_runend_dict() {
        let codes = buffer![0u32, 1, 2].into_array();
        let strings = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict = DictArray::try_new(codes, strings).unwrap();

        let ends = buffer![2u32, 5, 10].into_array();
        let arr = RunEndArray::try_new(ends, dict.into_array()).unwrap();
        assert_eq!(arr.len(), 10);

        let expected =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }
}
571}