Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::LEGACY_SESSION;
22use vortex_array::Precision;
23use vortex_array::TypedArrayRef;
24use vortex_array::VortexSessionExecute;
25use vortex_array::arrays::Primitive;
26use vortex_array::arrays::VarBinViewArray;
27use vortex_array::buffer::BufferHandle;
28use vortex_array::dtype::DType;
29use vortex_array::dtype::Nullability;
30use vortex_array::dtype::PType;
31use vortex_array::scalar::PValue;
32use vortex_array::search_sorted::SearchSorted;
33use vortex_array::search_sorted::SearchSortedSide;
34use vortex_array::serde::ArrayChildren;
35use vortex_array::validity::Validity;
36use vortex_array::vtable::VTable;
37use vortex_array::vtable::ValidityVTable;
38use vortex_error::VortexExpect as _;
39use vortex_error::VortexResult;
40use vortex_error::vortex_bail;
41use vortex_error::vortex_ensure;
42use vortex_error::vortex_panic;
43use vortex_session::VortexSession;
44use vortex_session::registry::CachedId;
45
46use crate::compress::runend_decode_primitive;
47use crate::compress::runend_decode_varbinview;
48use crate::compress::runend_encode;
49use crate::decompress_bool::runend_decode_bools;
50use crate::kernel::PARENT_KERNELS;
51use crate::rules::RULES;
52
53/// A [`RunEnd`]-encoded Vortex array.
54pub type RunEndArray = Array<RunEnd>;
55
56#[derive(Clone, prost::Message)]
57pub struct RunEndMetadata {
58    #[prost(enumeration = "PType", tag = "1")]
59    pub ends_ptype: i32,
60    #[prost(uint64, tag = "2")]
61    pub num_runs: u64,
62    #[prost(uint64, tag = "3")]
63    pub offset: u64,
64}
65
66impl ArrayHash for RunEndData {
67    fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
68        self.offset.hash(state);
69    }
70}
71
72impl ArrayEq for RunEndData {
73    fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
74        self.offset == other.offset
75    }
76}
77
78impl VTable for RunEnd {
79    type ArrayData = RunEndData;
80
81    type OperationsVTable = Self;
82    type ValidityVTable = Self;
83
84    fn id(&self) -> ArrayId {
85        static ID: CachedId = CachedId::new("vortex.runend");
86        *ID
87    }
88
89    fn validate(
90        &self,
91        data: &Self::ArrayData,
92        dtype: &DType,
93        len: usize,
94        slots: &[Option<ArrayRef>],
95    ) -> VortexResult<()> {
96        let ends = slots[ENDS_SLOT]
97            .as_ref()
98            .vortex_expect("RunEndArray ends slot");
99        let values = slots[VALUES_SLOT]
100            .as_ref()
101            .vortex_expect("RunEndArray values slot");
102        RunEndData::validate_parts(ends, values, data.offset, len)?;
103        vortex_ensure!(
104            values.dtype() == dtype,
105            "expected dtype {}, got {}",
106            dtype,
107            values.dtype()
108        );
109        Ok(())
110    }
111
112    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
113        0
114    }
115
116    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
117        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
118    }
119
120    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
121        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
122    }
123
124    fn serialize(
125        array: ArrayView<'_, Self>,
126        _session: &VortexSession,
127    ) -> VortexResult<Option<Vec<u8>>> {
128        Ok(Some(
129            RunEndMetadata {
130                ends_ptype: PType::try_from(array.ends().dtype())
131                    .vortex_expect("Must be a valid PType") as i32,
132                num_runs: array.ends().len() as u64,
133                offset: array.offset() as u64,
134            }
135            .encode_to_vec(),
136        ))
137    }
138
139    fn deserialize(
140        &self,
141        dtype: &DType,
142        len: usize,
143        metadata: &[u8],
144        _buffers: &[BufferHandle],
145        children: &dyn ArrayChildren,
146        _session: &VortexSession,
147    ) -> VortexResult<ArrayParts<Self>> {
148        let metadata = RunEndMetadata::decode(metadata)?;
149        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
150        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
151        let ends = children.get(0, &ends_dtype, runs)?;
152
153        let values = children.get(1, dtype, runs)?;
154        let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
155        let slots = vec![Some(ends), Some(values)];
156        let data = RunEndData::new(offset);
157        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
158    }
159
160    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
161        SLOT_NAMES[idx].to_string()
162    }
163
164    fn reduce_parent(
165        array: ArrayView<'_, Self>,
166        parent: &ArrayRef,
167        child_idx: usize,
168    ) -> VortexResult<Option<ArrayRef>> {
169        RULES.evaluate(array, parent, child_idx)
170    }
171
172    fn execute_parent(
173        array: ArrayView<'_, Self>,
174        parent: &ArrayRef,
175        child_idx: usize,
176        ctx: &mut ExecutionCtx,
177    ) -> VortexResult<Option<ArrayRef>> {
178        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
179    }
180
181    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
182        run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
183    }
184}
185
186/// The run-end positions marking where each run terminates.
187pub(super) const ENDS_SLOT: usize = 0;
188/// The values for each run.
189pub(super) const VALUES_SLOT: usize = 1;
190pub(super) const NUM_SLOTS: usize = 2;
191pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
192
/// Encoding-specific state of a run-end array.
///
/// The ends and values children are not stored here; this struct only carries the offset.
#[derive(Debug, Clone)]
pub struct RunEndData {
    /// Logical offset that is added to an index before it is looked up in the run ends.
    offset: usize,
}
197
198impl Display for RunEndData {
199    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200        write!(f, "offset: {}", self.offset)
201    }
202}
203
204pub struct RunEndDataParts {
205    pub ends: ArrayRef,
206    pub values: ArrayRef,
207    pub offset: usize,
208}
209
210pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
211    fn offset(&self) -> usize {
212        self.offset
213    }
214
215    fn ends(&self) -> &ArrayRef {
216        self.as_ref().slots()[ENDS_SLOT]
217            .as_ref()
218            .vortex_expect("RunEndArray ends slot")
219    }
220
221    fn values(&self) -> &ArrayRef {
222        self.as_ref().slots()[VALUES_SLOT]
223            .as_ref()
224            .vortex_expect("RunEndArray values slot")
225    }
226
227    fn dtype(&self) -> &DType {
228        self.values().dtype()
229    }
230
231    fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
232        Ok(self
233            .ends()
234            .as_primitive_typed()
235            .search_sorted(
236                &PValue::from(index + self.offset()),
237                SearchSortedSide::Right,
238            )?
239            .to_ends_index(self.ends().len()))
240    }
241}
242impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
243
/// Marker type identifying the run-end encoding.
#[derive(Debug, Clone)]
pub struct RunEnd;
246
247impl RunEnd {
248    /// Build a new [`RunEndArray`] without validation.
249    ///
250    /// # Safety
251    /// See [`RunEndData::new_unchecked`] for preconditions.
252    pub unsafe fn new_unchecked(
253        ends: ArrayRef,
254        values: ArrayRef,
255        offset: usize,
256        length: usize,
257    ) -> RunEndArray {
258        let dtype = values.dtype().clone();
259        let slots = vec![Some(ends.clone()), Some(values.clone())];
260        RunEndData::validate_parts(&ends, &values, offset, length)
261            .vortex_expect("RunEndArray validation failed");
262        let data = unsafe { RunEndData::new_unchecked(offset) };
263        unsafe {
264            Array::from_parts_unchecked(
265                ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
266            )
267        }
268    }
269
270    /// Build a new [`RunEndArray`] from ends and values.
271    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<RunEndArray> {
272        let len = RunEndData::logical_len_from_ends(&ends)?;
273        let dtype = values.dtype().clone();
274        let slots = vec![Some(ends), Some(values)];
275        let data = RunEndData::new(0);
276        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
277    }
278
279    /// Build a new [`RunEndArray`] from ends, values, offset, and length.
280    pub fn try_new_offset_length(
281        ends: ArrayRef,
282        values: ArrayRef,
283        offset: usize,
284        length: usize,
285    ) -> VortexResult<RunEndArray> {
286        let dtype = values.dtype().clone();
287        let slots = vec![Some(ends), Some(values)];
288        let data = RunEndData::new(offset);
289        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
290    }
291
292    /// Build a new [`RunEndArray`] from ends and values (panics on invalid input).
293    pub fn new(ends: ArrayRef, values: ArrayRef) -> RunEndArray {
294        Self::try_new(ends, values).vortex_expect("RunEndData is always valid")
295    }
296
297    /// Run the array through run-end encoding.
298    pub fn encode(array: ArrayRef) -> VortexResult<RunEndArray> {
299        if let Some(parray) = array.as_opt::<Primitive>() {
300            let (ends, values) = runend_encode(parray);
301            let ends = ends.into_array();
302            let len = array.len();
303            let dtype = values.dtype().clone();
304            let slots = vec![Some(ends), Some(values)];
305            let data = unsafe { RunEndData::new_unchecked(0) };
306            Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
307        } else {
308            vortex_bail!("REE can only encode primitive arrays")
309        }
310    }
311}
312
313impl RunEndData {
314    fn logical_len_from_ends(ends: &ArrayRef) -> VortexResult<usize> {
315        if ends.is_empty() {
316            Ok(0)
317        } else {
318            usize::try_from(
319                &ends.execute_scalar(ends.len() - 1, &mut LEGACY_SESSION.create_execution_ctx())?,
320            )
321        }
322    }
323
324    pub(crate) fn validate_parts(
325        ends: &ArrayRef,
326        values: &ArrayRef,
327        offset: usize,
328        length: usize,
329    ) -> VortexResult<()> {
330        // DType validation
331        vortex_ensure!(
332            ends.dtype().is_unsigned_int(),
333            "run ends must be unsigned integers, was {}",
334            ends.dtype(),
335        );
336        vortex_ensure!(
337            ends.len() == values.len(),
338            "run ends len != run values len, {} != {}",
339            ends.len(),
340            values.len()
341        );
342
343        // Handle empty run-ends
344        if ends.is_empty() {
345            vortex_ensure!(
346                offset == 0,
347                "non-zero offset provided for empty RunEndArray"
348            );
349            return Ok(());
350        }
351
352        // Zero-length logical slices may retain run metadata from the source array.
353        if length == 0 {
354            return Ok(());
355        }
356
357        #[cfg(debug_assertions)]
358        {
359            // Run ends must be strictly sorted for binary search to work correctly.
360            let pre_validation = ends.statistics().to_owned();
361
362            let mut ctx = LEGACY_SESSION.create_execution_ctx();
363            let is_sorted = ends
364                .statistics()
365                .compute_is_strict_sorted(&mut ctx)
366                .unwrap_or(false);
367
368            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
369            // We don't want to run with different stats in debug mode and outside.
370            ends.statistics().inherit(pre_validation.iter());
371            debug_assert!(is_sorted);
372        }
373
374        // Skip host-only validation when ends are not host-resident.
375        if !ends.is_host() {
376            return Ok(());
377        }
378
379        // Validate the offset and length are valid for the given ends and values
380        if offset != 0 && length != 0 {
381            let first_run_end = usize::try_from(
382                &ends.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?,
383            )?;
384            if first_run_end < offset {
385                vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
386            }
387        }
388
389        let last_run_end = usize::try_from(
390            &ends.execute_scalar(ends.len() - 1, &mut LEGACY_SESSION.create_execution_ctx())?,
391        )?;
392        let min_required_end = offset + length;
393        if last_run_end < min_required_end {
394            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
395        }
396
397        Ok(())
398    }
399}
400
401impl RunEndData {
402    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
403    ///
404    /// Panics if any of the validation conditions described in [`RunEnd::try_new`] is
405    /// not satisfied.
406    ///
407    /// # Examples
408    ///
409    /// ```
410    /// # use vortex_array::arrays::BoolArray;
411    /// # use vortex_array::IntoArray;
412    /// # use vortex_array::{LEGACY_SESSION, VortexSessionExecute};
413    /// # use vortex_buffer::buffer;
414    /// # use vortex_error::VortexResult;
415    /// # use vortex_runend::RunEnd;
416    /// # fn main() -> VortexResult<()> {
417    /// let ends = buffer![2u8, 3u8].into_array();
418    /// let values = BoolArray::from_iter([false, true]).into_array();
419    /// let run_end = RunEnd::new(ends, values);
420    ///
421    /// // Array encodes
422    /// let mut ctx = LEGACY_SESSION.create_execution_ctx();
423    /// assert_eq!(run_end.execute_scalar(0, &mut ctx)?, false.into());
424    /// assert_eq!(run_end.execute_scalar(1, &mut ctx)?, false.into());
425    /// assert_eq!(run_end.execute_scalar(2, &mut ctx)?, true.into());
426    /// # Ok(())
427    /// # }
428    /// ```
429    pub fn new(offset: usize) -> Self {
430        Self { offset }
431    }
432
433    /// Build a new `RunEndArray` without validation.
434    ///
435    /// # Safety
436    ///
437    /// The caller must ensure that all the validation performed in
438    /// [`RunEnd::try_new_offset_length`] is
439    /// satisfied before calling this function.
440    ///
441    /// See [`RunEnd::try_new_offset_length`] for the preconditions needed to build a new array.
442    pub unsafe fn new_unchecked(offset: usize) -> Self {
443        Self { offset }
444    }
445
446    /// Run the array through run-end encoding.
447    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
448        if let Some(parray) = array.as_opt::<Primitive>() {
449            let (_ends, _values) = runend_encode(parray);
450            // SAFETY: runend_encode handles this
451            unsafe { Ok(Self::new_unchecked(0)) }
452        } else {
453            vortex_bail!("REE can only encode primitive arrays")
454        }
455    }
456
457    pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
458        RunEndDataParts {
459            ends,
460            values,
461            offset: self.offset,
462        }
463    }
464}
465
466impl ValidityVTable<RunEnd> for RunEnd {
467    fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
468        Ok(match array.values().validity()? {
469            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
470            Validity::AllInvalid => Validity::AllInvalid,
471            Validity::Array(values_validity) => Validity::Array(unsafe {
472                RunEnd::new_unchecked(
473                    array.ends().clone(),
474                    values_validity,
475                    array.offset(),
476                    array.len(),
477                )
478                .into_array()
479            }),
480        })
481    }
482}
483
484pub(super) fn run_end_canonicalize(
485    array: &RunEndArray,
486    ctx: &mut ExecutionCtx,
487) -> VortexResult<ArrayRef> {
488    let pends = array.ends().clone().execute_as("ends", ctx)?;
489
490    Ok(match array.dtype() {
491        DType::Bool(_) => {
492            let bools = array.values().clone().execute_as("values", ctx)?;
493            runend_decode_bools(pends, bools, array.offset(), array.len())?
494        }
495        DType::Primitive(..) => {
496            let pvalues = array.values().clone().execute_as("values", ctx)?;
497            runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
498        }
499        DType::Utf8(_) | DType::Binary(_) => {
500            let values = array
501                .values()
502                .clone()
503                .execute_as::<VarBinViewArray>("values", ctx)?;
504            runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
505        }
506        _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
507    })
508}
509
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEnd;

    /// Primitive values: the length comes from the last run end and the dtype from the values.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEnd::new(ends, values);

        assert_eq!(arr.len(), 10);
        assert_eq!(
            arr.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        // 0, 1 => 1
        // 2, 3, 4 => 2
        // 5, 6, 7, 8, 9 => 3
        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// UTF-8 values are expanded run by run in the same way as primitives.
    #[test]
    fn test_runend_utf8() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let arr = RunEnd::new(ends, values);

        assert_eq!(arr.len(), 10);
        assert_eq!(arr.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expected =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// Dictionary-encoded values are accepted as the values child.
    #[test]
    fn test_runend_dict() {
        let dict_values = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict_codes = buffer![0u32, 1, 2].into_array();
        let dict = DictArray::try_new(dict_codes, dict_values).unwrap();

        let ends = buffer![2u32, 5, 10].into_array();
        let arr = RunEnd::try_new(ends, dict.into_array()).unwrap();
        assert_eq!(arr.len(), 10);

        let expected =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }
}