Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::LEGACY_SESSION;
22use vortex_array::Precision;
23use vortex_array::TypedArrayRef;
24use vortex_array::VortexSessionExecute;
25use vortex_array::arrays::Primitive;
26use vortex_array::arrays::VarBinViewArray;
27use vortex_array::buffer::BufferHandle;
28use vortex_array::dtype::DType;
29use vortex_array::dtype::Nullability;
30use vortex_array::dtype::PType;
31use vortex_array::scalar::PValue;
32use vortex_array::search_sorted::SearchSorted;
33use vortex_array::search_sorted::SearchSortedSide;
34use vortex_array::serde::ArrayChildren;
35use vortex_array::smallvec::smallvec;
36use vortex_array::validity::Validity;
37use vortex_array::vtable::VTable;
38use vortex_array::vtable::ValidityVTable;
39use vortex_error::VortexExpect as _;
40use vortex_error::VortexResult;
41use vortex_error::vortex_bail;
42use vortex_error::vortex_ensure;
43use vortex_error::vortex_panic;
44use vortex_session::VortexSession;
45use vortex_session::registry::CachedId;
46
47use crate::compress::runend_decode_primitive;
48use crate::compress::runend_decode_varbinview;
49use crate::compress::runend_encode;
50use crate::decompress_bool::runend_decode_bools;
51use crate::kernel::PARENT_KERNELS;
52use crate::rules::RULES;
53
54/// A [`RunEnd`]-encoded Vortex array.
55pub type RunEndArray = Array<RunEnd>;
56
57#[derive(Clone, prost::Message)]
58pub struct RunEndMetadata {
59    #[prost(enumeration = "PType", tag = "1")]
60    pub ends_ptype: i32,
61    #[prost(uint64, tag = "2")]
62    pub num_runs: u64,
63    #[prost(uint64, tag = "3")]
64    pub offset: u64,
65}
66
67impl ArrayHash for RunEndData {
68    fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
69        self.offset.hash(state);
70    }
71}
72
73impl ArrayEq for RunEndData {
74    fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
75        self.offset == other.offset
76    }
77}
78
79impl VTable for RunEnd {
80    type TypedArrayData = RunEndData;
81
82    type OperationsVTable = Self;
83    type ValidityVTable = Self;
84
85    fn id(&self) -> ArrayId {
86        static ID: CachedId = CachedId::new("vortex.runend");
87        *ID
88    }
89
90    fn validate(
91        &self,
92        data: &Self::TypedArrayData,
93        dtype: &DType,
94        len: usize,
95        slots: &[Option<ArrayRef>],
96    ) -> VortexResult<()> {
97        let ends = slots[ENDS_SLOT]
98            .as_ref()
99            .vortex_expect("RunEndArray ends slot");
100        let values = slots[VALUES_SLOT]
101            .as_ref()
102            .vortex_expect("RunEndArray values slot");
103        // TODO(ctx): trait fixes - VTable::validate has a fixed signature.
104        let mut ctx = LEGACY_SESSION.create_execution_ctx();
105        RunEndData::validate_parts(ends, values, data.offset, len, &mut ctx)?;
106        vortex_ensure!(
107            values.dtype() == dtype,
108            "expected dtype {}, got {}",
109            dtype,
110            values.dtype()
111        );
112        Ok(())
113    }
114
115    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
116        0
117    }
118
119    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
120        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
121    }
122
123    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
124        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
125    }
126
127    fn serialize(
128        array: ArrayView<'_, Self>,
129        _session: &VortexSession,
130    ) -> VortexResult<Option<Vec<u8>>> {
131        Ok(Some(
132            RunEndMetadata {
133                ends_ptype: PType::try_from(array.ends().dtype())
134                    .vortex_expect("Must be a valid PType") as i32,
135                num_runs: array.ends().len() as u64,
136                offset: array.offset() as u64,
137            }
138            .encode_to_vec(),
139        ))
140    }
141
142    fn deserialize(
143        &self,
144        dtype: &DType,
145        len: usize,
146        metadata: &[u8],
147        _buffers: &[BufferHandle],
148        children: &dyn ArrayChildren,
149        _session: &VortexSession,
150    ) -> VortexResult<ArrayParts<Self>> {
151        let metadata = RunEndMetadata::decode(metadata)?;
152        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
153        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
154        let ends = children.get(0, &ends_dtype, runs)?;
155
156        let values = children.get(1, dtype, runs)?;
157        let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
158        let slots = smallvec![Some(ends), Some(values)];
159        let data = RunEndData::new(offset);
160        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
161    }
162
163    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
164        SLOT_NAMES[idx].to_string()
165    }
166
167    fn reduce_parent(
168        array: ArrayView<'_, Self>,
169        parent: &ArrayRef,
170        child_idx: usize,
171    ) -> VortexResult<Option<ArrayRef>> {
172        RULES.evaluate(array, parent, child_idx)
173    }
174
175    fn execute_parent(
176        array: ArrayView<'_, Self>,
177        parent: &ArrayRef,
178        child_idx: usize,
179        ctx: &mut ExecutionCtx,
180    ) -> VortexResult<Option<ArrayRef>> {
181        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
182    }
183
184    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
185        run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
186    }
187}
188
189/// The run-end positions marking where each run terminates.
190pub(super) const ENDS_SLOT: usize = 0;
191/// The values for each run.
192pub(super) const VALUES_SLOT: usize = 1;
193pub(super) const NUM_SLOTS: usize = 2;
194pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
195
/// Encoding-specific payload of a [`RunEndArray`]; the ends and values are kept in the array's
/// slots rather than in this struct.
#[derive(Debug, Clone)]
pub struct RunEndData {
    /// Logical offset that is added to an index before it is looked up in the run ends.
    offset: usize,
}
200
201impl Display for RunEndData {
202    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
203        write!(f, "offset: {}", self.offset)
204    }
205}
206
207pub struct RunEndDataParts {
208    pub ends: ArrayRef,
209    pub values: ArrayRef,
210    pub offset: usize,
211}
212
213pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
214    fn offset(&self) -> usize {
215        self.offset
216    }
217
218    fn ends(&self) -> &ArrayRef {
219        self.as_ref().slots()[ENDS_SLOT]
220            .as_ref()
221            .vortex_expect("RunEndArray ends slot")
222    }
223
224    fn values(&self) -> &ArrayRef {
225        self.as_ref().slots()[VALUES_SLOT]
226            .as_ref()
227            .vortex_expect("RunEndArray values slot")
228    }
229
230    fn dtype(&self) -> &DType {
231        self.values().dtype()
232    }
233
234    fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
235        Ok(self
236            .ends()
237            .as_primitive_typed()
238            .search_sorted(
239                &PValue::from(index + self.offset()),
240                SearchSortedSide::Right,
241            )?
242            .to_ends_index(self.ends().len()))
243    }
244}
245impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
246
/// Marker type for the run-end encoding, identified as `vortex.runend`.
#[derive(Debug, Clone)]
pub struct RunEnd;
249
250impl RunEnd {
251    /// Build a new [`RunEndArray`] without validation.
252    ///
253    /// # Safety
254    /// See [`RunEndData::new_unchecked`] for preconditions.
255    pub unsafe fn new_unchecked(
256        ends: ArrayRef,
257        values: ArrayRef,
258        offset: usize,
259        length: usize,
260    ) -> RunEndArray {
261        let dtype = values.dtype().clone();
262        let slots = smallvec![Some(ends), Some(values)];
263        let data = unsafe { RunEndData::new_unchecked(offset) };
264        unsafe {
265            Array::from_parts_unchecked(
266                ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
267            )
268        }
269    }
270
271    /// Build a new [`RunEndArray`] from ends and values.
272    pub fn try_new(
273        ends: ArrayRef,
274        values: ArrayRef,
275        ctx: &mut ExecutionCtx,
276    ) -> VortexResult<RunEndArray> {
277        let len = RunEndData::logical_len_from_ends(&ends, ctx)?;
278        RunEndData::validate_parts(&ends, &values, 0, len, ctx)?;
279        let dtype = values.dtype().clone();
280        let slots = smallvec![Some(ends), Some(values)];
281        let data = RunEndData::new(0);
282        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
283    }
284
285    /// Build a new [`RunEndArray`] from ends, values, offset, and length.
286    pub fn try_new_offset_length(
287        ends: ArrayRef,
288        values: ArrayRef,
289        offset: usize,
290        length: usize,
291        ctx: &mut ExecutionCtx,
292    ) -> VortexResult<RunEndArray> {
293        RunEndData::validate_parts(&ends, &values, offset, length, ctx)?;
294        let dtype = values.dtype().clone();
295        let slots = smallvec![Some(ends), Some(values)];
296        let data = RunEndData::new(offset);
297        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
298    }
299
300    /// Build a new [`RunEndArray`] from ends and values (panics on invalid input).
301    pub fn new(ends: ArrayRef, values: ArrayRef, ctx: &mut ExecutionCtx) -> RunEndArray {
302        Self::try_new(ends, values, ctx).vortex_expect("RunEndData is always valid")
303    }
304
305    /// Run the array through run-end encoding.
306    pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<RunEndArray> {
307        if let Some(parray) = array.as_opt::<Primitive>() {
308            let (ends, values) = runend_encode(parray, ctx);
309            let ends = ends.into_array();
310            let len = array.len();
311            let dtype = values.dtype().clone();
312            let slots = smallvec![Some(ends), Some(values)];
313            let data = unsafe { RunEndData::new_unchecked(0) };
314            Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
315        } else {
316            vortex_bail!("REE can only encode primitive arrays")
317        }
318    }
319}
320
321impl RunEndData {
322    fn logical_len_from_ends(ends: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
323        if ends.is_empty() {
324            Ok(0)
325        } else {
326            usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)
327        }
328    }
329
330    pub(crate) fn validate_parts(
331        ends: &ArrayRef,
332        values: &ArrayRef,
333        offset: usize,
334        length: usize,
335        ctx: &mut ExecutionCtx,
336    ) -> VortexResult<()> {
337        // DType validation
338        vortex_ensure!(
339            ends.dtype().is_unsigned_int(),
340            "run ends must be unsigned integers, was {}",
341            ends.dtype(),
342        );
343        vortex_ensure!(
344            ends.len() == values.len(),
345            "run ends len != run values len, {} != {}",
346            ends.len(),
347            values.len()
348        );
349
350        // Handle empty run-ends
351        if ends.is_empty() {
352            vortex_ensure!(
353                offset == 0,
354                "non-zero offset provided for empty RunEndArray"
355            );
356            return Ok(());
357        }
358
359        // Zero-length logical slices may retain run metadata from the source array.
360        if length == 0 {
361            return Ok(());
362        }
363
364        #[cfg(debug_assertions)]
365        {
366            // Run ends must be strictly sorted for binary search to work correctly.
367            let pre_validation = ends.statistics().to_owned();
368
369            let is_sorted = ends
370                .statistics()
371                .compute_is_strict_sorted(ctx)
372                .unwrap_or(false);
373
374            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
375            // We don't want to run with different stats in debug mode and outside.
376            ends.statistics().inherit(pre_validation.iter());
377            debug_assert!(is_sorted);
378        }
379
380        // Skip host-only validation when ends are not host-resident.
381        if !ends.is_host() {
382            return Ok(());
383        }
384
385        // Validate the offset and length are valid for the given ends and values
386        if offset != 0 && length != 0 {
387            let first_run_end = usize::try_from(&ends.execute_scalar(0, ctx)?)?;
388            if first_run_end < offset {
389                vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
390            }
391        }
392
393        let last_run_end = usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)?;
394        let min_required_end = offset + length;
395        if last_run_end < min_required_end {
396            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
397        }
398
399        Ok(())
400    }
401}
402
403impl RunEndData {
404    /// Build a new `RunEndArray` from an array of run `ends` and an array of `values`.
405    ///
406    /// Panics if any of the validation conditions described in [`RunEnd::try_new`] is
407    /// not satisfied.
408    ///
409    /// # Examples
410    ///
411    /// ```
412    /// # use vortex_array::arrays::BoolArray;
413    /// # use vortex_array::IntoArray;
414    /// # use vortex_array::{LEGACY_SESSION, VortexSessionExecute};
415    /// # use vortex_buffer::buffer;
416    /// # use vortex_error::VortexResult;
417    /// # use vortex_runend::RunEnd;
418    /// # fn main() -> VortexResult<()> {
419    /// let mut ctx = LEGACY_SESSION.create_execution_ctx();
420    /// let ends = buffer![2u8, 3u8].into_array();
421    /// let values = BoolArray::from_iter([false, true]).into_array();
422    /// let run_end = RunEnd::new(ends, values, &mut ctx);
423    ///
424    /// // Array encodes
425    /// assert_eq!(run_end.execute_scalar(0, &mut ctx)?, false.into());
426    /// assert_eq!(run_end.execute_scalar(1, &mut ctx)?, false.into());
427    /// assert_eq!(run_end.execute_scalar(2, &mut ctx)?, true.into());
428    /// # Ok(())
429    /// # }
430    /// ```
431    pub fn new(offset: usize) -> Self {
432        Self { offset }
433    }
434
435    /// Build a new `RunEndArray` without validation.
436    ///
437    /// # Safety
438    ///
439    /// The caller must ensure that all the validation performed in
440    /// [`RunEnd::try_new_offset_length`] is
441    /// satisfied before calling this function.
442    ///
443    /// See [`RunEnd::try_new_offset_length`] for the preconditions needed to build a new array.
444    pub unsafe fn new_unchecked(offset: usize) -> Self {
445        Self { offset }
446    }
447
448    /// Run the array through run-end encoding.
449    pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
450        if let Some(parray) = array.as_opt::<Primitive>() {
451            let (_ends, _values) = runend_encode(parray, ctx);
452            // SAFETY: runend_encode handles this
453            unsafe { Ok(Self::new_unchecked(0)) }
454        } else {
455            vortex_bail!("REE can only encode primitive arrays")
456        }
457    }
458
459    pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
460        RunEndDataParts {
461            ends,
462            values,
463            offset: self.offset,
464        }
465    }
466}
467
468impl ValidityVTable<RunEnd> for RunEnd {
469    fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
470        Ok(match array.values().validity()? {
471            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
472            Validity::AllInvalid => Validity::AllInvalid,
473            Validity::Array(values_validity) => Validity::Array(unsafe {
474                RunEnd::new_unchecked(
475                    array.ends().clone(),
476                    values_validity,
477                    array.offset(),
478                    array.len(),
479                )
480                .into_array()
481            }),
482        })
483    }
484}
485
486pub(super) fn run_end_canonicalize(
487    array: &RunEndArray,
488    ctx: &mut ExecutionCtx,
489) -> VortexResult<ArrayRef> {
490    let pends = array.ends().clone().execute_as("ends", ctx)?;
491
492    Ok(match array.dtype() {
493        DType::Bool(_) => {
494            let bools = array.values().clone().execute_as("values", ctx)?;
495            runend_decode_bools(pends, bools, array.offset(), array.len(), ctx)?
496        }
497        DType::Primitive(..) => {
498            let pvalues = array.values().clone().execute_as("values", ctx)?;
499            runend_decode_primitive(pends, pvalues, array.offset(), array.len(), ctx)?.into_array()
500        }
501        DType::Utf8(_) | DType::Binary(_) => {
502            let values = array
503                .values()
504                .clone()
505                .execute_as::<VarBinViewArray>("values", ctx)?;
506            runend_decode_varbinview(pends, values, array.offset(), array.len(), ctx)?.into_array()
507        }
508        _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
509    })
510}
511
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::session::ArraySession;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use crate::RunEnd;

    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Run ends shared by all tests: three runs of lengths 2, 3 and 5.
    fn run_ends() -> ArrayRef {
        buffer![2u32, 5, 10].into_array()
    }

    /// Primitive values are expanded run by run.
    #[test]
    fn test_runend_constructor() {
        let mut ctx = SESSION.create_execution_ctx();
        let run_values = buffer![1i32, 2, 3].into_array();
        let encoded = RunEnd::new(run_ends(), run_values, &mut ctx);

        assert_eq!(encoded.len(), 10);
        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(encoded.dtype(), &expected_dtype);

        // 0, 1 => 1
        // 2, 3, 4 => 2
        // 5, 6, 7, 8, 9 => 3
        let decoded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// UTF-8 values are expanded run by run.
    #[test]
    fn test_runend_utf8() {
        let mut ctx = SESSION.create_execution_ctx();
        let run_values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let encoded = RunEnd::new(run_ends(), run_values, &mut ctx);

        assert_eq!(encoded.len(), 10);
        assert_eq!(encoded.dtype(), &DType::Utf8(Nullability::NonNullable));

        let decoded =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// Dictionary-encoded values can be used as the values child.
    #[test]
    fn test_runend_dict() {
        let mut ctx = SESSION.create_execution_ctx();
        let dictionary = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let codes = buffer![0u32, 1, 2].into_array();
        let run_values = DictArray::try_new(codes, dictionary).unwrap().into_array();

        let encoded = RunEnd::try_new(run_ends(), run_values, &mut ctx).unwrap();
        assert_eq!(encoded.len(), 10);

        let decoded =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }
}
588}