Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::ArrayEq;
8use vortex_array::ArrayHash;
9use vortex_array::ArrayRef;
10use vortex_array::DeserializeMetadata;
11use vortex_array::DynArray;
12use vortex_array::ExecutionCtx;
13use vortex_array::ExecutionStep;
14use vortex_array::IntoArray;
15use vortex_array::Precision;
16use vortex_array::ProstMetadata;
17use vortex_array::SerializeMetadata;
18use vortex_array::arrays::PrimitiveVTable;
19use vortex_array::arrays::VarBinViewArray;
20use vortex_array::buffer::BufferHandle;
21use vortex_array::dtype::DType;
22use vortex_array::dtype::Nullability;
23use vortex_array::dtype::PType;
24use vortex_array::scalar::PValue;
25use vortex_array::search_sorted::SearchSorted;
26use vortex_array::search_sorted::SearchSortedSide;
27use vortex_array::serde::ArrayChildren;
28use vortex_array::stats::ArrayStats;
29use vortex_array::stats::StatsSetRef;
30use vortex_array::validity::Validity;
31use vortex_array::vtable;
32use vortex_array::vtable::ArrayId;
33use vortex_array::vtable::VTable;
34use vortex_array::vtable::ValidityVTable;
35use vortex_error::VortexExpect as _;
36use vortex_error::VortexResult;
37use vortex_error::vortex_bail;
38use vortex_error::vortex_ensure;
39use vortex_error::vortex_panic;
40use vortex_session::VortexSession;
41
42use crate::compress::runend_decode_bools;
43use crate::compress::runend_decode_primitive;
44use crate::compress::runend_decode_varbinview;
45use crate::compress::runend_encode;
46use crate::kernel::PARENT_KERNELS;
47use crate::rules::RULES;
48
// Invoke the `vtable!` macro for the RunEnd encoding.
// NOTE(review): presumably this generates the glue between `RunEndVTable` and `RunEndArray`
// (e.g. the `into_array`/`as_ref` conversions used below) — confirm against `vortex_array::vtable!`.
vtable!(RunEnd);
50
/// Serialized metadata for a [`RunEndArray`].
///
/// Together with the two children (`ends`, `values`) and the array's dtype/length, this is
/// everything needed to rebuild the array in [`VTable::build`].
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enum discriminant.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of both the `ends` and the `values` children.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Logical offset of the array into the coordinate space of `ends` (non-zero after slicing).
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
60
61impl VTable for RunEndVTable {
62    type Array = RunEndArray;
63
64    type Metadata = ProstMetadata<RunEndMetadata>;
65    type OperationsVTable = Self;
66    type ValidityVTable = Self;
67
68    fn id(_array: &Self::Array) -> ArrayId {
69        Self::ID
70    }
71
72    fn len(array: &RunEndArray) -> usize {
73        array.length
74    }
75
76    fn dtype(array: &RunEndArray) -> &DType {
77        array.values.dtype()
78    }
79
80    fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
81        array.stats_set.to_ref(array.as_ref())
82    }
83
84    fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
85        array.ends.array_hash(state, precision);
86        array.values.array_hash(state, precision);
87        array.offset.hash(state);
88        array.length.hash(state);
89    }
90
91    fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
92        array.ends.array_eq(&other.ends, precision)
93            && array.values.array_eq(&other.values, precision)
94            && array.offset == other.offset
95            && array.length == other.length
96    }
97
98    fn nbuffers(_array: &RunEndArray) -> usize {
99        0
100    }
101
102    fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
103        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
104    }
105
106    fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
107        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
108    }
109
110    fn nchildren(_array: &RunEndArray) -> usize {
111        2
112    }
113
114    fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
115        match idx {
116            0 => array.ends().clone(),
117            1 => array.values().clone(),
118            _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
119        }
120    }
121
122    fn child_name(_array: &RunEndArray, idx: usize) -> String {
123        match idx {
124            0 => "ends".to_string(),
125            1 => "values".to_string(),
126            _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
127        }
128    }
129
130    fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
131        Ok(ProstMetadata(RunEndMetadata {
132            ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
133                as i32,
134            num_runs: array.ends().len() as u64,
135            offset: array.offset() as u64,
136        }))
137    }
138
139    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
140        Ok(Some(metadata.serialize()))
141    }
142
143    fn deserialize(
144        bytes: &[u8],
145        _dtype: &DType,
146        _len: usize,
147        _buffers: &[BufferHandle],
148        _session: &VortexSession,
149    ) -> VortexResult<Self::Metadata> {
150        let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
151        Ok(ProstMetadata(inner))
152    }
153
154    fn build(
155        dtype: &DType,
156        len: usize,
157        metadata: &Self::Metadata,
158        _buffers: &[BufferHandle],
159        children: &dyn ArrayChildren,
160    ) -> VortexResult<RunEndArray> {
161        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
162        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
163        let ends = children.get(0, &ends_dtype, runs)?;
164
165        let values = children.get(1, dtype, runs)?;
166
167        RunEndArray::try_new_offset_length(
168            ends,
169            values,
170            usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
171            len,
172        )
173    }
174
175    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
176        vortex_ensure!(
177            children.len() == 2,
178            "RunEndArray expects 2 children, got {}",
179            children.len()
180        );
181
182        let mut children_iter = children.into_iter();
183        array.ends = children_iter.next().vortex_expect("ends child");
184        array.values = children_iter.next().vortex_expect("values child");
185
186        Ok(())
187    }
188
189    fn reduce_parent(
190        array: &Self::Array,
191        parent: &ArrayRef,
192        child_idx: usize,
193    ) -> VortexResult<Option<ArrayRef>> {
194        RULES.evaluate(array, parent, child_idx)
195    }
196
197    fn execute_parent(
198        array: &Self::Array,
199        parent: &ArrayRef,
200        child_idx: usize,
201        ctx: &mut ExecutionCtx,
202    ) -> VortexResult<Option<ArrayRef>> {
203        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
204    }
205
206    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
207        run_end_canonicalize(array, ctx).map(ExecutionStep::Done)
208    }
209}
210
/// A run-end encoded array.
///
/// Logical element `i` is `values[j]`, where `j` is the first run whose entry in `ends` is
/// greater than `i + offset` (see [`RunEndArray::find_physical_index`]).
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Exclusive end position of each run, expected to be unsigned integers.
    ends: ArrayRef,
    /// One value per run; its dtype is the dtype of the whole array.
    values: ArrayRef,
    /// Logical start of this array within the coordinate space of `ends`.
    offset: usize,
    /// Number of logical elements exposed by this array.
    length: usize,
    /// Cached statistics for this array.
    stats_set: ArrayStats,
}
219
/// The children of a [`RunEndArray`], as returned by [`RunEndArray::into_parts`].
///
/// The array's offset and logical length are not carried here.
pub struct RunEndArrayParts {
    /// Exclusive end position of each run.
    pub ends: ArrayRef,
    /// One value per run.
    pub values: ArrayRef,
}
224
/// The [`VTable`] implementation for [`RunEndArray`].
#[derive(Debug)]
pub struct RunEndVTable;

impl RunEndVTable {
    /// Encoding identifier for run-end encoded arrays.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
231
232impl RunEndArray {
233    fn validate(
234        ends: &ArrayRef,
235        values: &ArrayRef,
236        offset: usize,
237        length: usize,
238    ) -> VortexResult<()> {
239        // DType validation
240        vortex_ensure!(
241            ends.dtype().is_unsigned_int(),
242            "run ends must be unsigned integers, was {}",
243            ends.dtype(),
244        );
245        vortex_ensure!(
246            ends.len() == values.len(),
247            "run ends len != run values len, {} != {}",
248            ends.len(),
249            values.len()
250        );
251
252        // Handle empty run-ends
253        if ends.is_empty() {
254            vortex_ensure!(
255                offset == 0,
256                "non-zero offset provided for empty RunEndArray"
257            );
258            return Ok(());
259        }
260
261        // Avoid building a non-empty array with zero logical length.
262        if length == 0 {
263            vortex_ensure!(
264                ends.is_empty(),
265                "run ends must be empty when length is zero"
266            );
267            return Ok(());
268        }
269
270        debug_assert!({
271            // Run ends must be strictly sorted for binary search to work correctly.
272            let pre_validation = ends.statistics().to_owned();
273
274            let is_sorted = ends
275                .statistics()
276                .compute_is_strict_sorted()
277                .unwrap_or(false);
278
279            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
280            // We don't want to run with different stats in debug mode and outside.
281            ends.statistics().inherit(pre_validation.iter());
282            is_sorted
283        });
284
285        // Skip host-only validation when ends are not host-resident.
286        if !ends.is_host() {
287            return Ok(());
288        }
289
290        // Validate the offset and length are valid for the given ends and values
291        if offset != 0 && length != 0 {
292            let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
293            if first_run_end <= offset {
294                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
295            }
296        }
297
298        let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
299        let min_required_end = offset + length;
300        if last_run_end < min_required_end {
301            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
302        }
303
304        Ok(())
305    }
306}
307
308impl RunEndArray {
309    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
310    ///
311    /// Panics if any of the validation conditions described in [`RunEndArray::try_new`] is
312    /// not satisfied.
313    ///
314    /// # Examples
315    ///
316    /// ```
317    /// # use vortex_array::arrays::BoolArray;
318    /// # use vortex_array::IntoArray;
319    /// # use vortex_buffer::buffer;
320    /// # use vortex_error::VortexResult;
321    /// # use vortex_runend::RunEndArray;
322    /// # fn main() -> VortexResult<()> {
323    /// let ends = buffer![2u8, 3u8].into_array();
324    /// let values = BoolArray::from_iter([false, true]).into_array();
325    /// let run_end = RunEndArray::new(ends, values);
326    ///
327    /// // Array encodes
328    /// assert_eq!(run_end.scalar_at(0)?, false.into());
329    /// assert_eq!(run_end.scalar_at(1)?, false.into());
330    /// assert_eq!(run_end.scalar_at(2)?, true.into());
331    /// # Ok(())
332    /// # }
333    /// ```
334    pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
335        Self::try_new(ends, values).vortex_expect("RunEndArray new")
336    }
337
338    /// Build a new `RunEndArray` from components.
339    ///
340    /// # Validation
341    ///
342    /// The `ends` must be non-nullable unsigned integers.
343    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
344        let length: usize = if ends.is_empty() {
345            0
346        } else {
347            usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
348        };
349
350        Self::try_new_offset_length(ends, values, 0, length)
351    }
352
353    /// Construct a new sliced `RunEndArray` with the provided offset and length.
354    ///
355    /// This performs all the same validation as [`RunEndArray::try_new`].
356    pub fn try_new_offset_length(
357        ends: ArrayRef,
358        values: ArrayRef,
359        offset: usize,
360        length: usize,
361    ) -> VortexResult<Self> {
362        Self::validate(&ends, &values, offset, length)?;
363
364        Ok(Self {
365            ends,
366            values,
367            offset,
368            length,
369            stats_set: Default::default(),
370        })
371    }
372
373    /// Build a new `RunEndArray` without validation.
374    ///
375    /// # Safety
376    ///
377    /// The caller must ensure that all the validation performed in [`RunEndArray::try_new`] is
378    /// satisfied before calling this function.
379    ///
380    /// See [`RunEndArray::try_new`] for the preconditions needed to build a new array.
381    pub unsafe fn new_unchecked(
382        ends: ArrayRef,
383        values: ArrayRef,
384        offset: usize,
385        length: usize,
386    ) -> Self {
387        Self {
388            ends,
389            values,
390            offset,
391            length,
392            stats_set: Default::default(),
393        }
394    }
395
396    /// Convert the given logical index to an index into the `values` array
397    pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
398        Ok(self
399            .ends()
400            .as_primitive_typed()
401            .search_sorted(
402                &PValue::from(index + self.offset()),
403                SearchSortedSide::Right,
404            )?
405            .to_ends_index(self.ends().len()))
406    }
407
408    /// Run the array through run-end encoding.
409    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
410        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
411            let (ends, values) = runend_encode(parray);
412            // SAFETY: runend_encode handles this
413            unsafe {
414                Ok(Self::new_unchecked(
415                    ends.into_array(),
416                    values,
417                    0,
418                    array.len(),
419                ))
420            }
421        } else {
422            vortex_bail!("REE can only encode primitive arrays")
423        }
424    }
425
426    /// The offset that the `ends` is relative to.
427    ///
428    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
429    #[inline]
430    pub fn offset(&self) -> usize {
431        self.offset
432    }
433
434    /// The encoded "ends" of value runs.
435    ///
436    /// The `i`-th element indicates that there is a run of the same value, beginning
437    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
438    #[inline]
439    pub fn ends(&self) -> &ArrayRef {
440        &self.ends
441    }
442
443    /// The scalar values.
444    ///
445    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
446    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
447    #[inline]
448    pub fn values(&self) -> &ArrayRef {
449        &self.values
450    }
451
452    /// Split an `RunEndArray` into parts.
453    #[inline]
454    pub fn into_parts(self) -> RunEndArrayParts {
455        RunEndArrayParts {
456            ends: self.ends,
457            values: self.values,
458        }
459    }
460}
461
462impl ValidityVTable<RunEndVTable> for RunEndVTable {
463    fn validity(array: &RunEndArray) -> VortexResult<Validity> {
464        Ok(match array.values().validity()? {
465            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
466            Validity::AllInvalid => Validity::AllInvalid,
467            Validity::Array(values_validity) => Validity::Array(unsafe {
468                RunEndArray::new_unchecked(
469                    array.ends().clone(),
470                    values_validity,
471                    array.offset(),
472                    array.len(),
473                )
474                .into_array()
475            }),
476        })
477    }
478}
479
480pub(super) fn run_end_canonicalize(
481    array: &RunEndArray,
482    ctx: &mut ExecutionCtx,
483) -> VortexResult<ArrayRef> {
484    let pends = array.ends().clone().execute_as("ends", ctx)?;
485
486    Ok(match array.dtype() {
487        DType::Bool(_) => {
488            let bools = array.values().clone().execute_as("values", ctx)?;
489            runend_decode_bools(pends, bools, array.offset(), array.len())?.into_array()
490        }
491        DType::Primitive(..) => {
492            let pvalues = array.values().clone().execute_as("values", ctx)?;
493            runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
494        }
495        DType::Utf8(_) | DType::Binary(_) => {
496            let values = array
497                .values()
498                .clone()
499                .execute_as::<VarBinViewArray>("values", ctx)?;
500            runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
501        }
502        _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
503    })
504}
505
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEndArray;

    /// Ends `[2, 5, 10]` expand to runs of length 2, 3 and 5 over primitive values.
    #[test]
    fn test_runend_constructor() {
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let encoded = RunEndArray::new(run_ends, run_values);

        assert_eq!(encoded.len(), 10);
        assert_eq!(
            encoded.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        // positions 0..2 => 1, 2..5 => 2, 5..10 => 3
        let decoded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// The same run layout over UTF-8 values.
    #[test]
    fn test_runend_utf8() {
        let run_values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let encoded = RunEndArray::new(buffer![2u32, 5, 10].into_array(), run_values);

        assert_eq!(encoded.len(), 10);
        assert_eq!(encoded.dtype(), &DType::Utf8(Nullability::NonNullable));

        let decoded = VarBinViewArray::from_iter_str([
            "a", "a", "b", "b", "b", "c", "c", "c", "c", "c",
        ])
        .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// Run values may themselves be dictionary encoded.
    #[test]
    fn test_runend_dict() {
        let dictionary = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let codes = buffer![0u32, 1, 2].into_array();
        let dict = DictArray::try_new(codes, dictionary).unwrap();

        let encoded =
            RunEndArray::try_new(buffer![2u32, 5, 10].into_array(), dict.into_array()).unwrap();
        assert_eq!(encoded.len(), 10);

        let decoded = VarBinViewArray::from_iter_str([
            "x", "x", "y", "y", "y", "z", "z", "z", "z", "z",
        ])
        .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }
}