Skip to main content

vortex_runend/
array.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::LEGACY_SESSION;
22use vortex_array::Precision;
23use vortex_array::TypedArrayRef;
24use vortex_array::VortexSessionExecute;
25use vortex_array::arrays::Primitive;
26use vortex_array::arrays::VarBinViewArray;
27use vortex_array::buffer::BufferHandle;
28use vortex_array::dtype::DType;
29use vortex_array::dtype::Nullability;
30use vortex_array::dtype::PType;
31use vortex_array::scalar::PValue;
32use vortex_array::search_sorted::SearchSorted;
33use vortex_array::search_sorted::SearchSortedSide;
34use vortex_array::serde::ArrayChildren;
35use vortex_array::validity::Validity;
36use vortex_array::vtable::VTable;
37use vortex_array::vtable::ValidityVTable;
38use vortex_error::VortexExpect as _;
39use vortex_error::VortexResult;
40use vortex_error::vortex_bail;
41use vortex_error::vortex_ensure;
42use vortex_error::vortex_panic;
43use vortex_session::VortexSession;
44use vortex_session::registry::CachedId;
45
46use crate::compress::runend_decode_primitive;
47use crate::compress::runend_decode_varbinview;
48use crate::compress::runend_encode;
49use crate::decompress_bool::runend_decode_bools;
50use crate::kernel::PARENT_KERNELS;
51use crate::rules::RULES;
52
/// A [`RunEnd`]-encoded Vortex array.
///
/// The array stores two children: `ends` (the run-end positions) and `values` (one value
/// per run). Its dtype is the dtype of the `values` child.
pub type RunEndArray = Array<RunEnd>;
55
/// Protobuf-encoded metadata written alongside a serialized [`RunEndArray`].
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of the `ends` child (and of the `values` child).
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Logical offset of the array into its run ends.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
65
66impl ArrayHash for RunEndData {
67    fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
68        self.offset.hash(state);
69    }
70}
71
72impl ArrayEq for RunEndData {
73    fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
74        self.offset == other.offset
75    }
76}
77
78impl VTable for RunEnd {
79    type ArrayData = RunEndData;
80
81    type OperationsVTable = Self;
82    type ValidityVTable = Self;
83
84    fn id(&self) -> ArrayId {
85        static ID: CachedId = CachedId::new("vortex.runend");
86        *ID
87    }
88
89    fn validate(
90        &self,
91        data: &Self::ArrayData,
92        dtype: &DType,
93        len: usize,
94        slots: &[Option<ArrayRef>],
95    ) -> VortexResult<()> {
96        let ends = slots[ENDS_SLOT]
97            .as_ref()
98            .vortex_expect("RunEndArray ends slot");
99        let values = slots[VALUES_SLOT]
100            .as_ref()
101            .vortex_expect("RunEndArray values slot");
102        // TODO(ctx): trait fixes - VTable::validate has a fixed signature.
103        let mut ctx = LEGACY_SESSION.create_execution_ctx();
104        RunEndData::validate_parts(ends, values, data.offset, len, &mut ctx)?;
105        vortex_ensure!(
106            values.dtype() == dtype,
107            "expected dtype {}, got {}",
108            dtype,
109            values.dtype()
110        );
111        Ok(())
112    }
113
114    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
115        0
116    }
117
118    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
119        vortex_panic!("RunEndArray buffer index {idx} out of bounds")
120    }
121
122    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
123        vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
124    }
125
126    fn serialize(
127        array: ArrayView<'_, Self>,
128        _session: &VortexSession,
129    ) -> VortexResult<Option<Vec<u8>>> {
130        Ok(Some(
131            RunEndMetadata {
132                ends_ptype: PType::try_from(array.ends().dtype())
133                    .vortex_expect("Must be a valid PType") as i32,
134                num_runs: array.ends().len() as u64,
135                offset: array.offset() as u64,
136            }
137            .encode_to_vec(),
138        ))
139    }
140
141    fn deserialize(
142        &self,
143        dtype: &DType,
144        len: usize,
145        metadata: &[u8],
146        _buffers: &[BufferHandle],
147        children: &dyn ArrayChildren,
148        _session: &VortexSession,
149    ) -> VortexResult<ArrayParts<Self>> {
150        let metadata = RunEndMetadata::decode(metadata)?;
151        let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
152        let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
153        let ends = children.get(0, &ends_dtype, runs)?;
154
155        let values = children.get(1, dtype, runs)?;
156        let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
157        let slots = vec![Some(ends), Some(values)];
158        let data = RunEndData::new(offset);
159        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
160    }
161
162    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
163        SLOT_NAMES[idx].to_string()
164    }
165
166    fn reduce_parent(
167        array: ArrayView<'_, Self>,
168        parent: &ArrayRef,
169        child_idx: usize,
170    ) -> VortexResult<Option<ArrayRef>> {
171        RULES.evaluate(array, parent, child_idx)
172    }
173
174    fn execute_parent(
175        array: ArrayView<'_, Self>,
176        parent: &ArrayRef,
177        child_idx: usize,
178        ctx: &mut ExecutionCtx,
179    ) -> VortexResult<Option<ArrayRef>> {
180        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
181    }
182
183    fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
184        run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
185    }
186}
187
/// The run-end positions marking where each run terminates.
pub(super) const ENDS_SLOT: usize = 0;
/// The values for each run.
pub(super) const VALUES_SLOT: usize = 1;
/// Total number of child slots held by a run-end array.
pub(super) const NUM_SLOTS: usize = 2;
/// Display names of the slots, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
194
/// Encoding-specific state of a run-end array: the logical offset into its run ends.
#[derive(Clone, Debug)]
pub struct RunEndData {
    offset: usize,
}

impl Display for RunEndData {
    /// Renders the data as `offset: <n>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { offset } = self;
        write!(f, "offset: {offset}")
    }
}
205
/// The components of a run-end array, as returned by [`RunEndData::into_parts`].
pub struct RunEndDataParts {
    /// The run-end positions.
    pub ends: ArrayRef,
    /// The value of each run.
    pub values: ArrayRef,
    /// The logical offset into the run ends.
    pub offset: usize,
}
211
212pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
213    fn offset(&self) -> usize {
214        self.offset
215    }
216
217    fn ends(&self) -> &ArrayRef {
218        self.as_ref().slots()[ENDS_SLOT]
219            .as_ref()
220            .vortex_expect("RunEndArray ends slot")
221    }
222
223    fn values(&self) -> &ArrayRef {
224        self.as_ref().slots()[VALUES_SLOT]
225            .as_ref()
226            .vortex_expect("RunEndArray values slot")
227    }
228
229    fn dtype(&self) -> &DType {
230        self.values().dtype()
231    }
232
233    fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
234        Ok(self
235            .ends()
236            .as_primitive_typed()
237            .search_sorted(
238                &PValue::from(index + self.offset()),
239                SearchSortedSide::Right,
240            )?
241            .to_ends_index(self.ends().len()))
242    }
243}
244impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
245
246#[derive(Clone, Debug)]
247pub struct RunEnd;
248
249impl RunEnd {
250    /// Build a new [`RunEndArray`] without validation.
251    ///
252    /// # Safety
253    /// See [`RunEndData::new_unchecked`] for preconditions.
254    pub unsafe fn new_unchecked(
255        ends: ArrayRef,
256        values: ArrayRef,
257        offset: usize,
258        length: usize,
259    ) -> RunEndArray {
260        let dtype = values.dtype().clone();
261        let slots = vec![Some(ends), Some(values)];
262        let data = unsafe { RunEndData::new_unchecked(offset) };
263        unsafe {
264            Array::from_parts_unchecked(
265                ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
266            )
267        }
268    }
269
270    /// Build a new [`RunEndArray`] from ends and values.
271    pub fn try_new(
272        ends: ArrayRef,
273        values: ArrayRef,
274        ctx: &mut ExecutionCtx,
275    ) -> VortexResult<RunEndArray> {
276        let len = RunEndData::logical_len_from_ends(&ends, ctx)?;
277        RunEndData::validate_parts(&ends, &values, 0, len, ctx)?;
278        let dtype = values.dtype().clone();
279        let slots = vec![Some(ends), Some(values)];
280        let data = RunEndData::new(0);
281        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
282    }
283
284    /// Build a new [`RunEndArray`] from ends, values, offset, and length.
285    pub fn try_new_offset_length(
286        ends: ArrayRef,
287        values: ArrayRef,
288        offset: usize,
289        length: usize,
290        ctx: &mut ExecutionCtx,
291    ) -> VortexResult<RunEndArray> {
292        RunEndData::validate_parts(&ends, &values, offset, length, ctx)?;
293        let dtype = values.dtype().clone();
294        let slots = vec![Some(ends), Some(values)];
295        let data = RunEndData::new(offset);
296        Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
297    }
298
299    /// Build a new [`RunEndArray`] from ends and values (panics on invalid input).
300    pub fn new(ends: ArrayRef, values: ArrayRef, ctx: &mut ExecutionCtx) -> RunEndArray {
301        Self::try_new(ends, values, ctx).vortex_expect("RunEndData is always valid")
302    }
303
304    /// Run the array through run-end encoding.
305    pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<RunEndArray> {
306        if let Some(parray) = array.as_opt::<Primitive>() {
307            let (ends, values) = runend_encode(parray, ctx);
308            let ends = ends.into_array();
309            let len = array.len();
310            let dtype = values.dtype().clone();
311            let slots = vec![Some(ends), Some(values)];
312            let data = unsafe { RunEndData::new_unchecked(0) };
313            Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
314        } else {
315            vortex_bail!("REE can only encode primitive arrays")
316        }
317    }
318}
319
320impl RunEndData {
321    fn logical_len_from_ends(ends: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
322        if ends.is_empty() {
323            Ok(0)
324        } else {
325            usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)
326        }
327    }
328
329    pub(crate) fn validate_parts(
330        ends: &ArrayRef,
331        values: &ArrayRef,
332        offset: usize,
333        length: usize,
334        ctx: &mut ExecutionCtx,
335    ) -> VortexResult<()> {
336        // DType validation
337        vortex_ensure!(
338            ends.dtype().is_unsigned_int(),
339            "run ends must be unsigned integers, was {}",
340            ends.dtype(),
341        );
342        vortex_ensure!(
343            ends.len() == values.len(),
344            "run ends len != run values len, {} != {}",
345            ends.len(),
346            values.len()
347        );
348
349        // Handle empty run-ends
350        if ends.is_empty() {
351            vortex_ensure!(
352                offset == 0,
353                "non-zero offset provided for empty RunEndArray"
354            );
355            return Ok(());
356        }
357
358        // Zero-length logical slices may retain run metadata from the source array.
359        if length == 0 {
360            return Ok(());
361        }
362
363        #[cfg(debug_assertions)]
364        {
365            // Run ends must be strictly sorted for binary search to work correctly.
366            let pre_validation = ends.statistics().to_owned();
367
368            let is_sorted = ends
369                .statistics()
370                .compute_is_strict_sorted(ctx)
371                .unwrap_or(false);
372
373            // Preserve the original statistics since compute_is_strict_sorted may have mutated them.
374            // We don't want to run with different stats in debug mode and outside.
375            ends.statistics().inherit(pre_validation.iter());
376            debug_assert!(is_sorted);
377        }
378
379        // Skip host-only validation when ends are not host-resident.
380        if !ends.is_host() {
381            return Ok(());
382        }
383
384        // Validate the offset and length are valid for the given ends and values
385        if offset != 0 && length != 0 {
386            let first_run_end = usize::try_from(&ends.execute_scalar(0, ctx)?)?;
387            if first_run_end < offset {
388                vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
389            }
390        }
391
392        let last_run_end = usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)?;
393        let min_required_end = offset + length;
394        if last_run_end < min_required_end {
395            vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
396        }
397
398        Ok(())
399    }
400}
401
impl RunEndData {
    /// Build the encoding-specific data of a run-end array from its logical `offset`.
    ///
    /// This constructor only stores `offset`; it performs no validation and does not panic.
    /// A complete array is assembled from `ends` and `values` by [`RunEnd::new`] or
    /// [`RunEnd::try_new`], as shown in the example below.
    ///
    /// # Examples
    ///
    /// ```
    /// # use vortex_array::arrays::BoolArray;
    /// # use vortex_array::IntoArray;
    /// # use vortex_array::{LEGACY_SESSION, VortexSessionExecute};
    /// # use vortex_buffer::buffer;
    /// # use vortex_error::VortexResult;
    /// # use vortex_runend::RunEnd;
    /// # fn main() -> VortexResult<()> {
    /// let mut ctx = LEGACY_SESSION.create_execution_ctx();
    /// let ends = buffer![2u8, 3u8].into_array();
    /// let values = BoolArray::from_iter([false, true]).into_array();
    /// let run_end = RunEnd::new(ends, values, &mut ctx);
    ///
    /// // Array encodes
    /// assert_eq!(run_end.execute_scalar(0, &mut ctx)?, false.into());
    /// assert_eq!(run_end.execute_scalar(1, &mut ctx)?, false.into());
    /// assert_eq!(run_end.execute_scalar(2, &mut ctx)?, true.into());
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(offset: usize) -> Self {
        Self { offset }
    }

    /// Build the data of a run-end array without validation.
    ///
    /// Like [`RunEndData::new`], this only stores `offset`.
    ///
    /// # Safety
    ///
    /// The caller must ensure that all the validation performed in
    /// [`RunEnd::try_new_offset_length`] is
    /// satisfied before calling this function.
    ///
    /// See [`RunEnd::try_new_offset_length`] for the preconditions needed to build a new array.
    pub unsafe fn new_unchecked(offset: usize) -> Self {
        Self { offset }
    }

    /// Run the array through run-end encoding and return the data for the result.
    ///
    /// Returns an error if `array` is not a primitive array.
    ///
    /// NOTE(review): the encoded ends and values are computed and then dropped; only the
    /// zero offset is returned. Callers that need the encoded array should presumably use
    /// [`RunEnd::encode`] instead — confirm whether this method is still needed.
    pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
        if let Some(parray) = array.as_opt::<Primitive>() {
            let (_ends, _values) = runend_encode(parray, ctx);
            // SAFETY: `new_unchecked` only stores the offset, and the encoding produced by
            // `runend_encode` starts at offset zero.
            unsafe { Ok(Self::new_unchecked(0)) }
        } else {
            vortex_bail!("REE can only encode primitive arrays")
        }
    }

    /// Combine this data with the given `ends` and `values` into a [`RunEndDataParts`].
    pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
        RunEndDataParts {
            ends,
            values,
            offset: self.offset,
        }
    }
}
466
467impl ValidityVTable<RunEnd> for RunEnd {
468    fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
469        Ok(match array.values().validity()? {
470            Validity::NonNullable | Validity::AllValid => Validity::AllValid,
471            Validity::AllInvalid => Validity::AllInvalid,
472            Validity::Array(values_validity) => Validity::Array(unsafe {
473                RunEnd::new_unchecked(
474                    array.ends().clone(),
475                    values_validity,
476                    array.offset(),
477                    array.len(),
478                )
479                .into_array()
480            }),
481        })
482    }
483}
484
485pub(super) fn run_end_canonicalize(
486    array: &RunEndArray,
487    ctx: &mut ExecutionCtx,
488) -> VortexResult<ArrayRef> {
489    let pends = array.ends().clone().execute_as("ends", ctx)?;
490
491    Ok(match array.dtype() {
492        DType::Bool(_) => {
493            let bools = array.values().clone().execute_as("values", ctx)?;
494            runend_decode_bools(pends, bools, array.offset(), array.len(), ctx)?
495        }
496        DType::Primitive(..) => {
497            let pvalues = array.values().clone().execute_as("values", ctx)?;
498            runend_decode_primitive(pends, pvalues, array.offset(), array.len(), ctx)?.into_array()
499        }
500        DType::Utf8(_) | DType::Binary(_) => {
501            let values = array
502                .values()
503                .clone()
504                .execute_as::<VarBinViewArray>("values", ctx)?;
505            runend_decode_varbinview(pends, values, array.offset(), array.len(), ctx)?.into_array()
506        }
507        _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
508    })
509}
510
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::session::ArraySession;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use crate::RunEnd;

    /// Session shared by all tests in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Primitive values: three runs ending at 2, 5 and 10 expand to ten elements.
    #[test]
    fn test_runend_constructor() {
        let mut ctx = SESSION.create_execution_ctx();
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let encoded = RunEnd::new(run_ends, run_values, &mut ctx);

        assert_eq!(encoded.len(), 10);
        let want_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(encoded.dtype(), &want_dtype);

        // positions 0..=1 hold 1, positions 2..=4 hold 2, positions 5..=9 hold 3
        let decoded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// UTF-8 values are expanded the same way as primitives.
    #[test]
    fn test_runend_utf8() {
        let mut ctx = SESSION.create_execution_ctx();
        let run_values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let run_ends = buffer![2u32, 5, 10].into_array();
        let encoded = RunEnd::new(run_ends, run_values, &mut ctx);

        assert_eq!(encoded.len(), 10);
        assert_eq!(encoded.dtype(), &DType::Utf8(Nullability::NonNullable));

        let decoded =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// The values child may itself be encoded, here as a dictionary array.
    #[test]
    fn test_runend_dict() {
        let mut ctx = SESSION.create_execution_ctx();
        let dictionary = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let codes = buffer![0u32, 1, 2].into_array();
        let run_values = DictArray::try_new(codes, dictionary).unwrap();

        let run_ends = buffer![2u32, 5, 10].into_array();
        let encoded = RunEnd::try_new(run_ends, run_values.into_array(), &mut ctx).unwrap();
        assert_eq!(encoded.len(), 10);

        let decoded =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }
}