Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::ExecutionCtx;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::arrays::PrimitiveVTable;
18use vortex_array::buffer::BufferHandle;
19use vortex_array::scalar::PValue;
20use vortex_array::search_sorted::SearchSorted;
21use vortex_array::search_sorted::SearchSortedSide;
22use vortex_array::serde::ArrayChildren;
23use vortex_array::stats::ArrayStats;
24use vortex_array::stats::StatsSetRef;
25use vortex_array::validity::Validity;
26use vortex_array::vtable;
27use vortex_array::vtable::ArrayId;
28use vortex_array::vtable::BaseArrayVTable;
29use vortex_array::vtable::VTable;
30use vortex_array::vtable::ValidityVTable;
31use vortex_dtype::DType;
32use vortex_dtype::Nullability;
33use vortex_dtype::PType;
34use vortex_error::VortexExpect as _;
35use vortex_error::VortexResult;
36use vortex_error::vortex_bail;
37use vortex_error::vortex_ensure;
38use vortex_error::vortex_panic;
39use vortex_session::VortexSession;
40
41use crate::compress::runend_decode_bools;
42use crate::compress::runend_decode_primitive;
43use crate::compress::runend_encode;
44use crate::kernel::PARENT_KERNELS;
45use crate::rules::RULES;
46
// NOTE(review): the `vtable!` macro comes from `vortex_array`; it presumably generates the glue
// between `RunEndVTable` and `RunEndArray` (e.g. the `as_ref()` / `into_array()` conversions
// used further down, which are not defined in this file) — confirm against the macro definition.
vtable!(RunEnd);
48
/// Serialized metadata describing a [`RunEndArray`].
///
/// Produced from an array by `RunEndVTable::metadata` and read back by `RunEndVTable::build`.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// The [`PType`] of the `ends` child, stored as its `i32` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// The number of runs, i.e. the length of the `ends` child.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// The array's offset (see [`RunEndArray::offset`]).
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
58
59impl VTable for RunEndVTable {
60    type Array = RunEndArray;
61
62    type Metadata = ProstMetadata<RunEndMetadata>;
63
64    type ArrayVTable = Self;
65    type OperationsVTable = Self;
66    type ValidityVTable = Self;
67    type VisitorVTable = Self;
68
69    fn id(_array: &Self::Array) -> ArrayId {
70        Self::ID
71    }
72
73    fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
74        Ok(ProstMetadata(RunEndMetadata {
75            ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
76                as i32,
77            num_runs: array.ends().len() as u64,
78            offset: array.offset() as u64,
79        }))
80    }
81
82    fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
83        Ok(Some(metadata.serialize()))
84    }
85
86    fn deserialize(
87        bytes: &[u8],
88        _dtype: &DType,
89        _len: usize,
90        _buffers: &[BufferHandle],
91        _session: &VortexSession,
92    ) -> VortexResult<Self::Metadata> {
93        let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
94        Ok(ProstMetadata(inner))
95    }
96
97    fn build(
98        dtype: &DType,
99        len: usize,
100        metadata: &Self::Metadata,
101        _buffers: &[BufferHandle],
102        children: &dyn ArrayChildren,
103    ) -> VortexResult<RunEndArray> {
104        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
105        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
106        let ends = children.get(0, &ends_dtype, runs)?;
107
108        let values = children.get(1, dtype, runs)?;
109
110        RunEndArray::try_new_offset_length(
111            ends,
112            values,
113            usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
114            len,
115        )
116    }
117
118    fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
119        vortex_ensure!(
120            children.len() == 2,
121            "RunEndArray expects 2 children, got {}",
122            children.len()
123        );
124
125        let mut children_iter = children.into_iter();
126        array.ends = children_iter.next().vortex_expect("ends child");
127        array.values = children_iter.next().vortex_expect("values child");
128
129        Ok(())
130    }
131
132    fn reduce_parent(
133        array: &Self::Array,
134        parent: &ArrayRef,
135        child_idx: usize,
136    ) -> VortexResult<Option<ArrayRef>> {
137        RULES.evaluate(array, parent, child_idx)
138    }
139
140    fn execute_parent(
141        array: &Self::Array,
142        parent: &ArrayRef,
143        child_idx: usize,
144        ctx: &mut ExecutionCtx,
145    ) -> VortexResult<Option<ArrayRef>> {
146        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
147    }
148
149    fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
150        run_end_canonicalize(array, ctx)
151    }
152}
153
/// A run-end encoded array.
///
/// Each entry of `values` is repeated for one run; `ends` records where each run stops.
/// The array exposes `length` logical elements and reports the dtype of `values`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Run ends: unsigned integers, one per entry in `values` (enforced by `validate`).
    ends: ArrayRef,
    /// Run values: `Bool` or `Primitive` typed (enforced by `validate`).
    values: ArrayRef,
    /// Added to a logical index before it is looked up in `ends`.
    offset: usize,
    /// The logical length reported by `len()`.
    length: usize,
    /// Statistics storage backing `stats()`.
    stats_set: ArrayStats,
}
162
163pub struct RunEndArrayParts {
164    pub ends: ArrayRef,
165    pub values: ArrayRef,
166}
167
/// Unit type on which the vtable traits for the run-end encoding are implemented.
#[derive(Debug)]
pub struct RunEndVTable;

impl RunEndVTable {
    /// The identifier of the run-end encoding (`"vortex.runend"`), returned by `VTable::id`.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
174
175impl RunEndArray {
176    fn validate(
177        ends: &dyn Array,
178        values: &dyn Array,
179        offset: usize,
180        length: usize,
181    ) -> VortexResult<()> {
182        // DType validation
183        vortex_ensure!(
184            ends.dtype().is_unsigned_int(),
185            "run ends must be unsigned integers, was {}",
186            ends.dtype(),
187        );
188        vortex_ensure!(
189            values.dtype().is_primitive() || values.dtype().is_boolean(),
190            "RunEnd array can only have Bool or Primitive values, {} given",
191            values.dtype()
192        );
193
194        vortex_ensure!(
195            ends.len() == values.len(),
196            "run ends len != run values len, {} != {}",
197            ends.len(),
198            values.len()
199        );
200
201        // Handle empty run-ends
202        if ends.is_empty() {
203            vortex_ensure!(
204                offset == 0,
205                "non-zero offset provided for empty RunEndArray"
206            );
207            return Ok(());
208        }
209
210        // Avoid building a non-empty array with zero logical length.
211        if length == 0 {
212            vortex_ensure!(
213                ends.is_empty(),
214                "run ends must be empty when length is zero"
215            );
216            return Ok(());
217        }
218
219        debug_assert!({
220            // Run ends must be strictly sorted for binary search to work correctly.
221            let pre_validation = ends.statistics().to_owned();
222
223            let is_sorted = ends
224                .statistics()
225                .compute_is_strict_sorted()
226                .unwrap_or(false);
227
228            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
229            // We don't want to run with different stats in debug mode and outside.
230            ends.statistics().inherit(pre_validation.iter());
231            is_sorted
232        });
233
234        // Skip host-only validation when ends are not host-resident.
235        if !ends.is_host() {
236            return Ok(());
237        }
238
239        // Validate the offset and length are valid for the given ends and values
240        if offset != 0 && length != 0 {
241            let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
242            if first_run_end <= offset {
243                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
244            }
245        }
246
247        let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
248        let min_required_end = offset + length;
249        if last_run_end < min_required_end {
250            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
251        }
252
253        Ok(())
254    }
255}
256
257impl RunEndArray {
258    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
259    ///
260    /// Panics if any of the validation conditions described in [`RunEndArray::try_new`] is
261    /// not satisfied.
262    ///
263    /// # Examples
264    ///
265    /// ```
266    /// # use vortex_array::arrays::BoolArray;
267    /// # use vortex_array::IntoArray;
268    /// # use vortex_buffer::buffer;
269    /// # use vortex_error::VortexResult;
270    /// # use vortex_runend::RunEndArray;
271    /// # fn main() -> VortexResult<()> {
272    /// let ends = buffer![2u8, 3u8].into_array();
273    /// let values = BoolArray::from_iter([false, true]).into_array();
274    /// let run_end = RunEndArray::new(ends, values);
275    ///
276    /// // Array encodes
277    /// assert_eq!(run_end.scalar_at(0)?, false.into());
278    /// assert_eq!(run_end.scalar_at(1)?, false.into());
279    /// assert_eq!(run_end.scalar_at(2)?, true.into());
280    /// # Ok(())
281    /// # }
282    /// ```
283    pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
284        Self::try_new(ends, values).vortex_expect("RunEndArray new")
285    }
286
287    /// Build a new `RunEndArray` from components.
288    ///
289    /// # Validation
290    ///
291    /// The `ends` must be non-nullable unsigned integers. The values may be `Bool` or `Primitive`
292    /// types.
293    ///
294    /// # Examples
295    ///
296    /// ```
297    /// # use vortex_array::arrays::{BoolArray, VarBinViewArray};
298    /// # use vortex_array::IntoArray;
299    /// # use vortex_buffer::buffer;
300    /// # use vortex_runend::RunEndArray;
301    ///
302    /// // Error to provide incorrectly-typed values!
303    /// let result = RunEndArray::try_new(
304    ///     buffer![1u8, 2u8].into_array(),
305    ///     VarBinViewArray::from_iter_str(["bad", "dtype"]).into_array(),
306    /// );
307    /// assert!(result.is_err());
308    ///
309    /// // This array is happy
310    /// let result = RunEndArray::try_new(
311    ///     buffer![1u8, 2u8].into_array(),
312    ///     BoolArray::from_iter([false, true]).into_array(),
313    /// );
314    ///
315    /// assert!(result.is_ok());
316    /// ```
317    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
318        let length: usize = if ends.is_empty() {
319            0
320        } else {
321            usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
322        };
323
324        Self::try_new_offset_length(ends, values, 0, length)
325    }
326
327    /// Construct a new sliced `RunEndArray` with the provided offset and length.
328    ///
329    /// This performs all the same validation as [`RunEndArray::try_new`].
330    pub fn try_new_offset_length(
331        ends: ArrayRef,
332        values: ArrayRef,
333        offset: usize,
334        length: usize,
335    ) -> VortexResult<Self> {
336        Self::validate(&ends, &values, offset, length)?;
337
338        Ok(Self {
339            ends,
340            values,
341            offset,
342            length,
343            stats_set: Default::default(),
344        })
345    }
346
347    /// Build a new `RunEndArray` without validation.
348    ///
349    /// # Safety
350    ///
351    /// The caller must ensure that all the validation performed in [`RunEndArray::try_new`] is
352    /// satisfied before calling this function.
353    ///
354    /// See [`RunEndArray::try_new`] for the preconditions needed to build a new array.
355    pub unsafe fn new_unchecked(
356        ends: ArrayRef,
357        values: ArrayRef,
358        offset: usize,
359        length: usize,
360    ) -> Self {
361        Self {
362            ends,
363            values,
364            offset,
365            length,
366            stats_set: Default::default(),
367        }
368    }
369
370    /// Convert the given logical index to an index into the `values` array
371    pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
372        Ok(self
373            .ends()
374            .as_primitive_typed()
375            .search_sorted(
376                &PValue::from(index + self.offset()),
377                SearchSortedSide::Right,
378            )?
379            .to_ends_index(self.ends().len()))
380    }
381
382    /// Run the array through run-end encoding.
383    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
384        if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
385            let (ends, values) = runend_encode(parray);
386            // SAFETY: runend_encode handles this
387            unsafe {
388                Ok(Self::new_unchecked(
389                    ends.into_array(),
390                    values,
391                    0,
392                    array.len(),
393                ))
394            }
395        } else {
396            vortex_bail!("REE can only encode primitive arrays")
397        }
398    }
399
400    /// The offset that the `ends` is relative to.
401    ///
402    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
403    #[inline]
404    pub fn offset(&self) -> usize {
405        self.offset
406    }
407
408    /// The encoded "ends" of value runs.
409    ///
410    /// The `i`-th element indicates that there is a run of the same value, beginning
411    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
412    #[inline]
413    pub fn ends(&self) -> &ArrayRef {
414        &self.ends
415    }
416
417    /// The scalar values.
418    ///
419    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
420    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
421    #[inline]
422    pub fn values(&self) -> &ArrayRef {
423        &self.values
424    }
425
426    /// Split an `RunEndArray` into parts.
427    #[inline]
428    pub fn into_parts(self) -> RunEndArrayParts {
429        RunEndArrayParts {
430            ends: self.ends,
431            values: self.values,
432        }
433    }
434}
435
436impl BaseArrayVTable<RunEndVTable> for RunEndVTable {
437    fn len(array: &RunEndArray) -> usize {
438        array.length
439    }
440
441    fn dtype(array: &RunEndArray) -> &DType {
442        array.values.dtype()
443    }
444
445    fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
446        array.stats_set.to_ref(array.as_ref())
447    }
448
449    fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
450        array.ends.array_hash(state, precision);
451        array.values.array_hash(state, precision);
452        array.offset.hash(state);
453        array.length.hash(state);
454    }
455
456    fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
457        array.ends.array_eq(&other.ends, precision)
458            && array.values.array_eq(&other.values, precision)
459            && array.offset == other.offset
460            && array.length == other.length
461    }
462}
463
464impl ValidityVTable<RunEndVTable> for RunEndVTable {
465    fn validity(array: &RunEndArray) -> VortexResult<Validity> {
466        Ok(match array.values().validity()? {
467            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
468            Validity::AllInvalid => Validity::AllInvalid,
469            Validity::Array(values_validity) => Validity::Array(unsafe {
470                RunEndArray::new_unchecked(
471                    array.ends().clone(),
472                    values_validity,
473                    array.offset(),
474                    array.len(),
475                )
476                .into_array()
477            }),
478        })
479    }
480}
481
482pub(super) fn run_end_canonicalize(
483    array: &RunEndArray,
484    ctx: &mut ExecutionCtx,
485) -> VortexResult<ArrayRef> {
486    let pends = array.ends().clone().execute_as("ends", ctx)?;
487    Ok(match array.dtype() {
488        DType::Bool(_) => {
489            let bools = array.values().clone().execute_as("values", ctx)?;
490            runend_decode_bools(pends, bools, array.offset(), array.len())?.into_array()
491        }
492        DType::Primitive(..) => {
493            let pvalues = array.values().clone().execute_as("values", ctx)?;
494            runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
495        }
496        _ => vortex_panic!("Only Primitive and Bool values are supported"),
497    })
498}
499
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::assert_arrays_eq;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;

    use crate::RunEndArray;

    /// Building from ends and values yields the expected length, dtype and decoded contents.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::new(ends, values);

        // The logical length is the last run end.
        assert_eq!(arr.len(), 10);

        // The dtype is inherited from the values.
        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        // Run layout:
        //   indices 0..2  => 1
        //   indices 2..5  => 2
        //   indices 5..10 => 3
        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.to_array(), expected);
    }
}