vortex_runend/
array.rs

1use std::fmt::Debug;
2
3use vortex_array::arrays::PrimitiveArray;
4use vortex_array::compute::{
5    SearchSortedSide, scalar_at, search_sorted_usize, search_sorted_usize_many,
6};
7use vortex_array::stats::{ArrayStats, StatsSetRef};
8use vortex_array::variants::{BoolArrayTrait, PrimitiveArrayTrait};
9use vortex_array::vtable::{EncodingVTable, VTableRef};
10use vortex_array::{
11    Array, ArrayCanonicalImpl, ArrayImpl, ArrayRef, ArrayStatisticsImpl, ArrayValidityImpl,
12    ArrayVariantsImpl, Canonical, Encoding, EncodingId, IntoArray, SerdeMetadata, ToCanonical,
13    try_from_array_ref,
14};
15use vortex_buffer::Buffer;
16use vortex_dtype::DType;
17use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
18use vortex_mask::Mask;
19
20use crate::compress::{runend_decode_bools, runend_decode_primitive, runend_encode};
21use crate::serde::RunEndMetadata;
22
23#[derive(Clone, Debug)]
24pub struct RunEndArray {
25    ends: ArrayRef,
26    values: ArrayRef,
27    offset: usize,
28    length: usize,
29    stats_set: ArrayStats,
30}
31
32try_from_array_ref!(RunEndArray);
33
34pub struct RunEndEncoding;
35impl Encoding for RunEndEncoding {
36    type Array = RunEndArray;
37    type Metadata = SerdeMetadata<RunEndMetadata>;
38}
39
40impl EncodingVTable for RunEndEncoding {
41    fn id(&self) -> EncodingId {
42        EncodingId::new_ref("vortex.runend")
43    }
44}
45
46impl RunEndArray {
47    pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
48        let length = if ends.is_empty() {
49            0
50        } else {
51            scalar_at(&ends, ends.len() - 1)?.as_ref().try_into()?
52        };
53        Self::with_offset_and_length(ends, values, 0, length)
54    }
55
56    pub(crate) fn with_offset_and_length(
57        ends: ArrayRef,
58        values: ArrayRef,
59        offset: usize,
60        length: usize,
61    ) -> VortexResult<Self> {
62        if !matches!(values.dtype(), &DType::Bool(_) | &DType::Primitive(_, _)) {
63            vortex_bail!(
64                "RunEnd array can only have Bool or Primitive values, {} given",
65                values.dtype()
66            );
67        }
68
69        if offset != 0 {
70            let first_run_end: usize = scalar_at(&ends, 0)?.as_ref().try_into()?;
71            if first_run_end <= offset {
72                vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
73            }
74        }
75
76        if !ends.dtype().is_unsigned_int() || ends.dtype().is_nullable() {
77            vortex_bail!(MismatchedTypes: "non-nullable unsigned int", ends.dtype());
78        }
79        if !ends.statistics().compute_is_strict_sorted().unwrap_or(true) {
80            vortex_bail!("Ends array must be strictly sorted");
81        }
82
83        Ok(Self {
84            ends,
85            values,
86            offset,
87            length,
88            stats_set: Default::default(),
89        })
90    }
91
92    /// Convert the given logical index to an index into the `values` array
93    pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
94        search_sorted_usize(self.ends(), index + self.offset(), SearchSortedSide::Right)
95            .map(|s| s.to_ends_index(self.ends().len()))
96    }
97
98    /// Convert a batch of logical indices into an index for the values. Expects indices to be adjusted by offset unlike
99    /// [Self::find_physical_index]
100    ///
101    /// See: [find_physical_index][Self::find_physical_index].
102    pub fn find_physical_indices(&self, indices: &[usize]) -> VortexResult<Buffer<u64>> {
103        search_sorted_usize_many(self.ends(), indices, SearchSortedSide::Right).map(|results| {
104            results
105                .into_iter()
106                .map(|result| result.to_ends_index(self.ends().len()) as u64)
107                .collect()
108        })
109    }
110
111    /// Run the array through run-end encoding.
112    pub fn encode(array: ArrayRef) -> VortexResult<Self> {
113        if let Ok(parray) = PrimitiveArray::try_from(array) {
114            let (ends, values) = runend_encode(&parray)?;
115            Self::try_new(ends.into_array(), values)
116        } else {
117            vortex_bail!("REE can only encode primitive arrays")
118        }
119    }
120
121    /// The offset that the `ends` is relative to.
122    ///
123    /// This is generally zero for a "new" array, and non-zero after a slicing operation.
124    #[inline]
125    pub fn offset(&self) -> usize {
126        self.offset
127    }
128
129    /// The encoded "ends" of value runs.
130    ///
131    /// The `i`-th element indicates that there is a run of the same value, beginning
132    /// at `ends[i]` (inclusive) and terminating at `ends[i+1]` (exclusive).
133    #[inline]
134    pub fn ends(&self) -> &ArrayRef {
135        &self.ends
136    }
137
138    /// The scalar values.
139    ///
140    /// The `i`-th element is the scalar value for the `i`-th repeated run. The run begins
141    /// at `ends[i]` (inclusive) and terminates at `ends[i+1]` (exclusive).
142    #[inline]
143    pub fn values(&self) -> &ArrayRef {
144        &self.values
145    }
146}
147
148impl ArrayImpl for RunEndArray {
149    type Encoding = RunEndEncoding;
150
151    fn _len(&self) -> usize {
152        self.length
153    }
154
155    fn _dtype(&self) -> &DType {
156        self.values.dtype()
157    }
158
159    fn _vtable(&self) -> VTableRef {
160        VTableRef::new_ref(&RunEndEncoding)
161    }
162}
163
164impl ArrayVariantsImpl for RunEndArray {
165    fn _as_bool_typed(&self) -> Option<&dyn BoolArrayTrait> {
166        Some(self)
167    }
168
169    fn _as_primitive_typed(&self) -> Option<&dyn PrimitiveArrayTrait> {
170        Some(self)
171    }
172}
173
174impl PrimitiveArrayTrait for RunEndArray {}
175
176impl BoolArrayTrait for RunEndArray {}
177
178impl ArrayValidityImpl for RunEndArray {
179    fn _is_valid(&self, index: usize) -> VortexResult<bool> {
180        let physical_idx = self
181            .find_physical_index(index)
182            .vortex_expect("Invalid index");
183        self.values().is_valid(physical_idx)
184    }
185
186    fn _all_valid(&self) -> VortexResult<bool> {
187        self.values().all_valid()
188    }
189
190    fn _all_invalid(&self) -> VortexResult<bool> {
191        self.values().all_invalid()
192    }
193
194    fn _validity_mask(&self) -> VortexResult<Mask> {
195        Ok(match self.values().validity_mask()? {
196            Mask::AllTrue(_) => Mask::AllTrue(self.len()),
197            Mask::AllFalse(_) => Mask::AllFalse(self.len()),
198            Mask::Values(values) => {
199                let ree_validity = RunEndArray::with_offset_and_length(
200                    self.ends().clone(),
201                    values.into_array(),
202                    self.offset(),
203                    self.len(),
204                )
205                .vortex_expect("invalid array")
206                .into_array();
207                Mask::from_buffer(ree_validity.to_bool()?.boolean_buffer().clone())
208            }
209        })
210    }
211}
212
213impl ArrayCanonicalImpl for RunEndArray {
214    fn _to_canonical(&self) -> VortexResult<Canonical> {
215        let pends = self.ends().to_primitive()?;
216        match self.dtype() {
217            DType::Bool(_) => {
218                let bools = self.values().to_bool()?;
219                runend_decode_bools(pends, bools, self.offset(), self.len()).map(Canonical::Bool)
220            }
221            DType::Primitive(..) => {
222                let pvalues = self.values().to_primitive()?;
223                runend_decode_primitive(pends, pvalues, self.offset(), self.len())
224                    .map(Canonical::Primitive)
225            }
226            _ => vortex_bail!("Only Primitive and Bool values are supported"),
227        }
228    }
229}
230
231impl ArrayStatisticsImpl for RunEndArray {
232    fn _stats_ref(&self) -> StatsSetRef<'_> {
233        self.stats_set.to_ref(self)
234    }
235}
236
237#[cfg(test)]
238mod tests {
239    use vortex_array::compute::scalar_at;
240    use vortex_array::{Array, IntoArray};
241    use vortex_buffer::buffer;
242    use vortex_dtype::{DType, Nullability, PType};
243
244    use crate::RunEndArray;
245
246    #[test]
247    fn test_runend_constructor() {
248        let arr = RunEndArray::try_new(
249            buffer![2u32, 5, 10].into_array(),
250            buffer![1i32, 2, 3].into_array(),
251        )
252        .unwrap();
253        assert_eq!(arr.len(), 10);
254        assert_eq!(
255            arr.dtype(),
256            &DType::Primitive(PType::I32, Nullability::NonNullable)
257        );
258
259        // 0, 1 => 1
260        // 2, 3, 4 => 2
261        // 5, 6, 7, 8, 9 => 3
262        assert_eq!(scalar_at(&arr, 0).unwrap(), 1.into());
263        assert_eq!(scalar_at(&arr, 2).unwrap(), 2.into());
264        assert_eq!(scalar_at(&arr, 5).unwrap(), 3.into());
265        assert_eq!(scalar_at(&arr, 9).unwrap(), 3.into());
266    }
267}