Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::Precision;
22use vortex_array::TypedArrayRef;
23use vortex_array::arrays::Primitive;
24use vortex_array::arrays::VarBinViewArray;
25use vortex_array::buffer::BufferHandle;
26use vortex_array::dtype::DType;
27use vortex_array::dtype::Nullability;
28use vortex_array::dtype::PType;
29use vortex_array::scalar::PValue;
30use vortex_array::search_sorted::SearchSorted;
31use vortex_array::search_sorted::SearchSortedSide;
32use vortex_array::serde::ArrayChildren;
33use vortex_array::validity::Validity;
34use vortex_array::vtable::VTable;
35use vortex_array::vtable::ValidityVTable;
36use vortex_error::VortexExpect as _;
37use vortex_error::VortexResult;
38use vortex_error::vortex_bail;
39use vortex_error::vortex_ensure;
40use vortex_error::vortex_panic;
41use vortex_session::VortexSession;
42
43use crate::compress::runend_decode_primitive;
44use crate::compress::runend_decode_varbinview;
45use crate::compress::runend_encode;
46use crate::decompress_bool::runend_decode_bools;
47use crate::kernel::PARENT_KERNELS;
48use crate::rules::RULES;
49
50/// A [`RunEnd`]-encoded Vortex array.
51pub type RunEndArray = Array<RunEnd>;
52
/// Protobuf metadata written when a run-end array is serialized and read back when it is
/// deserialized.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the raw `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs; serialization stores the length of the `ends` child here, and
    /// deserialization uses it as the length of both children.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Logical offset of the array, as returned by its `offset()` accessor.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
62
63impl ArrayHash for RunEndData {
64    fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
65        self.offset.hash(state);
66    }
67}
68
69impl ArrayEq for RunEndData {
70    fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
71        self.offset == other.offset
72    }
73}
74
75impl VTable for RunEnd {
76    type ArrayData = RunEndData;
77
78    type OperationsVTable = Self;
79    type ValidityVTable = Self;
80
81    fn id(&self) -> ArrayId {
82        Self::ID
83    }
84
85    fn validate(
86        &self,
87        data: &Self::ArrayData,
88        dtype: &DType,
89        len: usize,
90        slots: &[Option<ArrayRef>],
91    ) -> VortexResult<()> {
92        let ends = slots[ENDS_SLOT]
93            .as_ref()
94            .vortex_expect("RunEndArray ends slot");
95        let values = slots[VALUES_SLOT]
96            .as_ref()
97            .vortex_expect("RunEndArray values slot");
98        RunEndData::validate_parts(ends, values, data.offset, len)?;
99        vortex_ensure!(
100            values.dtype() == dtype,
101            "expected dtype {}, got {}",
102            dtype,
103            values.dtype()
104        );
105        Ok(())
106    }
107
108    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
109        0
110    }
111
112    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
113        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
114    }
115
116    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
117        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
118    }
119
120    fn serialize(
121        array: ArrayView<'_, Self>,
122        _session: &VortexSession,
123    ) -> VortexResult<Option<Vec<u8>>> {
124        Ok(Some(
125            RunEndMetadata {
126                ends_ptype: PType::try_from(array.ends().dtype())
127                    .vortex_expect("Must be a valid PType") as i32,
128                num_runs: array.ends().len() as u64,
129                offset: array.offset() as u64,
130            }
131            .encode_to_vec(),
132        ))
133    }
134
135    fn deserialize(
136        &self,
137        dtype: &DType,
138        len: usize,
139        metadata: &[u8],
140        _buffers: &[BufferHandle],
141        children: &dyn ArrayChildren,
142        _session: &VortexSession,
143    ) -> VortexResult<ArrayParts<Self>> {
144        let metadata = RunEndMetadata::decode(metadata)?;
145        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
146        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
147        let ends = children.get(0, &ends_dtype, runs)?;
148
149        let values = children.get(1, dtype, runs)?;
150        let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
151        let slots = vec![Some(ends), Some(values)];
152        let data = RunEndData::new(offset);
153        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
154    }
155
156    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
157        SLOT_NAMES[idx].to_string()
158    }
159
160    fn reduce_parent(
161        array: ArrayView<'_, Self>,
162        parent: &ArrayRef,
163        child_idx: usize,
164    ) -> VortexResult<Option<ArrayRef>> {
165        RULES.evaluate(array, parent, child_idx)
166    }
167
168    fn execute_parent(
169        array: ArrayView<'_, Self>,
170        parent: &ArrayRef,
171        child_idx: usize,
172        ctx: &mut ExecutionCtx,
173    ) -> VortexResult<Option<ArrayRef>> {
174        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
175    }
176
177    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
178        run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
179    }
180}
181
/// The run-end positions marking where each run terminates.
pub(super) const ENDS_SLOT: usize = 0;
/// The values for each run.
pub(super) const VALUES_SLOT: usize = 1;
/// Number of slots (`ends` and `values`); also the length of [`SLOT_NAMES`].
pub(super) const NUM_SLOTS: usize = 2;
/// Names of the slots, indexed by [`ENDS_SLOT`] and [`VALUES_SLOT`].
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
188
/// Encoding-specific data of a run-end array: just the logical offset.
#[derive(Clone, Debug)]
pub struct RunEndData {
    /// Value added to a logical index before it is looked up in the run ends.
    offset: usize,
}

impl Display for RunEndData {
    /// Renders the data as `offset: <n>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { offset } = self;
        write!(f, "offset: {offset}")
    }
}
199
/// The pieces of a run-end array bundled together, as produced by `RunEndData::into_parts`.
pub struct RunEndDataParts {
    /// The run-end positions.
    pub ends: ArrayRef,
    /// The value of each run.
    pub values: ArrayRef,
    /// The logical offset carried by the originating `RunEndData`.
    pub offset: usize,
}
205
206pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
207    fn offset(&self) -> usize {
208        self.offset
209    }
210
211    fn ends(&self) -> &ArrayRef {
212        self.as_ref().slots()[ENDS_SLOT]
213            .as_ref()
214            .vortex_expect("RunEndArray ends slot")
215    }
216
217    fn values(&self) -> &ArrayRef {
218        self.as_ref().slots()[VALUES_SLOT]
219            .as_ref()
220            .vortex_expect("RunEndArray values slot")
221    }
222
223    fn dtype(&self) -> &DType {
224        self.values().dtype()
225    }
226
227    fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
228        Ok(self
229            .ends()
230            .as_primitive_typed()
231            .search_sorted(
232                &PValue::from(index + self.offset()),
233                SearchSortedSide::Right,
234            )?
235            .to_ends_index(self.ends().len()))
236    }
237}
238impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
239
/// Marker type for the run-end encoding; its vtable implementations and constructors for
/// run-end arrays live in the `impl` blocks of this type.
#[derive(Clone, Debug)]
pub struct RunEnd;
242
243impl RunEnd {
244    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
245
246    /// Build a new [`RunEndArray`] without validation.
247    ///
248    /// # Safety
249    /// See [`RunEndData::new_unchecked`] for preconditions.
250    pub unsafe fn new_unchecked(
251        ends: ArrayRef,
252        values: ArrayRef,
253        offset: usize,
254        length: usize,
255    ) -> RunEndArray {
256        let dtype = values.dtype().clone();
257        let slots = vec![Some(ends.clone()), Some(values.clone())];
258        RunEndData::validate_parts(&ends, &values, offset, length)
259            .vortex_expect("RunEndArray validation failed");
260        let data = unsafe { RunEndData::new_unchecked(offset) };
261        unsafe {
262            Array::from_parts_unchecked(
263                ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
264            )
265        }
266    }
267
268    /// Build a new [`RunEndArray`] from ends and values.
269    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<RunEndArray> {
270        let len = RunEndData::logical_len_from_ends(&ends)?;
271        let dtype = values.dtype().clone();
272        let slots = vec![Some(ends), Some(values)];
273        let data = RunEndData::new(0);
274        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
275    }
276
277    /// Build a new [`RunEndArray`] from ends, values, offset, and length.
278    pub fn try_new_offset_length(
279        ends: ArrayRef,
280        values: ArrayRef,
281        offset: usize,
282        length: usize,
283    ) -> VortexResult<RunEndArray> {
284        let dtype = values.dtype().clone();
285        let slots = vec![Some(ends), Some(values)];
286        let data = RunEndData::new(offset);
287        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
288    }
289
290    /// Build a new [`RunEndArray`] from ends and values (panics on invalid input).
291    pub fn new(ends: ArrayRef, values: ArrayRef) -> RunEndArray {
292        Self::try_new(ends, values).vortex_expect("RunEndData is always valid")
293    }
294
295    /// Run the array through run-end encoding.
296    pub fn encode(array: ArrayRef) -> VortexResult<RunEndArray> {
297        if let Some(parray) = array.as_opt::<Primitive>() {
298            let (ends, values) = runend_encode(parray);
299            let ends = ends.into_array();
300            let len = array.len();
301            let dtype = values.dtype().clone();
302            let slots = vec![Some(ends), Some(values)];
303            let data = unsafe { RunEndData::new_unchecked(0) };
304            Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
305        } else {
306            vortex_bail!("REE can only encode primitive arrays")
307        }
308    }
309}
310
311impl RunEndData {
312    fn logical_len_from_ends(ends: &ArrayRef) -> VortexResult<usize> {
313        if ends.is_empty() {
314            Ok(0)
315        } else {
316            usize::try_from(&ends.scalar_at(ends.len() - 1)?)
317        }
318    }
319
320    pub(crate) fn validate_parts(
321        ends: &ArrayRef,
322        values: &ArrayRef,
323        offset: usize,
324        length: usize,
325    ) -> VortexResult<()> {
326        // DType validation
327        vortex_ensure!(
328            ends.dtype().is_unsigned_int(),
329            "run ends must be unsigned integers, was {}",
330            ends.dtype(),
331        );
332        vortex_ensure!(
333            ends.len() == values.len(),
334            "run ends len != run values len, {} != {}",
335            ends.len(),
336            values.len()
337        );
338
339        // Handle empty run-ends
340        if ends.is_empty() {
341            vortex_ensure!(
342                offset == 0,
343                "non-zero offset provided for empty RunEndArray"
344            );
345            return Ok(());
346        }
347
348        // Zero-length logical slices may retain run metadata from the source array.
349        if length == 0 {
350            return Ok(());
351        }
352
353        debug_assert!({
354            // Run ends must be strictly sorted for binary search to work correctly.
355            let pre_validation = ends.statistics().to_owned();
356
357            let is_sorted = ends
358                .statistics()
359                .compute_is_strict_sorted()
360                .unwrap_or(false);
361
362            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
363            // We don't want to run with different stats in debug mode and outside.
364            ends.statistics().inherit(pre_validation.iter());
365            is_sorted
366        });
367
368        // Skip host-only validation when ends are not host-resident.
369        if !ends.is_host() {
370            return Ok(());
371        }
372
373        // Validate the offset and length are valid for the given ends and values
374        if offset != 0 && length != 0 {
375            let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
376            if first_run_end < offset {
377                vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
378            }
379        }
380
381        let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
382        let min_required_end = offset + length;
383        if last_run_end < min_required_end {
384            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
385        }
386
387        Ok(())
388    }
389}
390
391impl RunEndData {
392    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
393    ///
394    /// Panics if any of the validation conditions described in [`RunEnd::try_new`] is
395    /// not satisfied.
396    ///
397    /// # Examples
398    ///
399    /// ```
400    /// # use vortex_array::arrays::BoolArray;
401    /// # use vortex_array::IntoArray;
402    /// # use vortex_buffer::buffer;
403    /// # use vortex_error::VortexResult;
404    /// # use vortex_runend::RunEnd;
405    /// # fn main() -> VortexResult<()> {
406    /// let ends = buffer![2u8, 3u8].into_array();
407    /// let values = BoolArray::from_iter([false, true]).into_array();
408    /// let run_end = RunEnd::new(ends, values);
409    ///
410    /// // Array encodes
411    /// assert_eq!(run_end.scalar_at(0)?, false.into());
412    /// assert_eq!(run_end.scalar_at(1)?, false.into());
413    /// assert_eq!(run_end.scalar_at(2)?, true.into());
414    /// # Ok(())
415    /// # }
416    /// ```
417    pub fn new(offset: usize) -> Self {
418        Self { offset }
419    }
420
421    /// Build a new `RunEndArray` without validation.
422    ///
423    /// # Safety
424    ///
425    /// The caller must ensure that all the validation performed in
426    /// [`RunEnd::try_new_offset_length`] is
427    /// satisfied before calling this function.
428    ///
429    /// See [`RunEnd::try_new_offset_length`] for the preconditions needed to build a new array.
430    pub unsafe fn new_unchecked(offset: usize) -> Self {
431        Self { offset }
432    }
433
434    /// Run the array through run-end encoding.
435    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
436        if let Some(parray) = array.as_opt::<Primitive>() {
437            let (_ends, _values) = runend_encode(parray);
438            // SAFETY: runend_encode handles this
439            unsafe { Ok(Self::new_unchecked(0)) }
440        } else {
441            vortex_bail!("REE can only encode primitive arrays")
442        }
443    }
444
445    pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
446        RunEndDataParts {
447            ends,
448            values,
449            offset: self.offset,
450        }
451    }
452}
453
454impl ValidityVTable<RunEnd> for RunEnd {
455    fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
456        Ok(match array.values().validity()? {
457            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
458            Validity::AllInvalid => Validity::AllInvalid,
459            Validity::Array(values_validity) => Validity::Array(unsafe {
460                RunEnd::new_unchecked(
461                    array.ends().clone(),
462                    values_validity,
463                    array.offset(),
464                    array.len(),
465                )
466                .into_array()
467            }),
468        })
469    }
470}
471
472pub(super) fn run_end_canonicalize(
473    array: &RunEndArray,
474    ctx: &mut ExecutionCtx,
475) -> VortexResult<ArrayRef> {
476    let pends = array.ends().clone().execute_as("ends", ctx)?;
477
478    Ok(match array.dtype() {
479        DType::Bool(_) => {
480            let bools = array.values().clone().execute_as("values", ctx)?;
481            runend_decode_bools(pends, bools, array.offset(), array.len())?
482        }
483        DType::Primitive(..) => {
484            let pvalues = array.values().clone().execute_as("values", ctx)?;
485            runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
486        }
487        DType::Utf8(_) | DType::Binary(_) => {
488            let values = array
489                .values()
490                .clone()
491                .execute_as::<VarBinViewArray>("values", ctx)?;
492            runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
493        }
494        _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
495    })
496}
497
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEnd;

    /// Primitive run values are expanded run by run.
    #[test]
    fn test_runend_constructor() {
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let encoded = RunEnd::new(run_ends, run_values);

        assert_eq!(encoded.len(), 10);
        let want_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(encoded.dtype(), &want_dtype);

        // Logical positions 0..2 hold 1, 2..5 hold 2 and 5..10 hold 3.
        let decoded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// UTF-8 run values are expanded run by run.
    #[test]
    fn test_runend_utf8() {
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let encoded = RunEnd::new(run_ends, run_values);

        assert_eq!(encoded.len(), 10);
        assert_eq!(encoded.dtype(), &DType::Utf8(Nullability::NonNullable));

        let decoded =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// Dictionary-encoded run values are accepted and expanded run by run.
    #[test]
    fn test_runend_dict() {
        let words = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let codes = buffer![0u32, 1, 2].into_array();
        let run_values = DictArray::try_new(codes, words).unwrap().into_array();

        let run_ends = buffer![2u32, 5, 10].into_array();
        let encoded = RunEnd::try_new(run_ends, run_values).unwrap();
        assert_eq!(encoded.len(), 10);

        let decoded =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }
}
557}