1use std::fmt::Debug;
5
6use vortex_array::arrays::PrimitiveVTable;
7use vortex_array::search_sorted::{SearchSorted, SearchSortedSide};
8use vortex_array::stats::{ArrayStats, StatsSetRef};
9use vortex_array::vtable::{ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityVTable};
10use vortex_array::{
11 Array, ArrayRef, Canonical, EncodingId, EncodingRef, IntoArray, ToCanonical, vtable,
12};
13use vortex_dtype::DType;
14use vortex_error::{VortexExpect as _, VortexResult, vortex_bail, vortex_ensure};
15use vortex_mask::Mask;
16use vortex_scalar::PValue;
17
18use crate::compress::{runend_decode_bools, runend_decode_primitive, runend_encode};
19
20vtable!(RunEnd);
21
22impl VTable for RunEndVTable {
23 type Array = RunEndArray;
24 type Encoding = RunEndEncoding;
25
26 type ArrayVTable = Self;
27 type CanonicalVTable = Self;
28 type OperationsVTable = Self;
29 type ValidityVTable = Self;
30 type VisitorVTable = Self;
31 type ComputeVTable = NotSupported;
32 type EncodeVTable = Self;
33 type SerdeVTable = Self;
34
35 fn id(_encoding: &Self::Encoding) -> EncodingId {
36 EncodingId::new_ref("vortex.runend")
37 }
38
39 fn encoding(_array: &Self::Array) -> EncodingRef {
40 EncodingRef::new_ref(RunEndEncoding.as_ref())
41 }
42}
43
44#[derive(Clone, Debug)]
45pub struct RunEndArray {
46 ends: ArrayRef,
47 values: ArrayRef,
48 offset: usize,
49 length: usize,
50 stats_set: ArrayStats,
51}
52
/// Unit type naming the run-end encoding; it is used as `VTable::Encoding` for `RunEndVTable`.
#[derive(Debug, Clone)]
pub struct RunEndEncoding;
55
56impl RunEndArray {
57 fn validate(
58 ends: &dyn Array,
59 values: &dyn Array,
60 offset: usize,
61 length: usize,
62 ) -> VortexResult<()> {
63 vortex_ensure!(
65 ends.dtype().is_unsigned_int(),
66 "run ends must be unsigned integers, was {}",
67 ends.dtype(),
68 );
69 vortex_ensure!(
70 values.dtype().is_primitive() || values.dtype().is_boolean(),
71 "RunEnd array can only have Bool or Primitive values, {} given",
72 values.dtype()
73 );
74
75 vortex_ensure!(
76 ends.len() == values.len(),
77 "run ends len != run values len, {} != {}",
78 ends.len(),
79 values.len()
80 );
81
82 if ends.is_empty() {
84 vortex_ensure!(
85 offset == 0,
86 "non-zero offset provided for empty RunEndArray"
87 );
88 return Ok(());
89 }
90
91 if let Some(is_sorted) = ends.statistics().compute_is_strict_sorted() {
93 vortex_ensure!(is_sorted, "run ends must be monotonic");
94 } else {
95 vortex_bail!("run ends must report IsStrictSorted");
97 }
98
99 if offset != 0 && length != 0 {
101 let first_run_end: usize = ends.scalar_at(0).as_ref().try_into()?;
102 if first_run_end <= offset {
103 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
104 }
105 }
106
107 Ok(())
108 }
109}
110
111impl RunEndArray {
112 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
134 Self::try_new(ends, values).vortex_expect("RunEndArray new")
135 }
136
137 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
178 let length: usize = if ends.is_empty() {
179 0
180 } else {
181 ends.scalar_at(ends.len() - 1).as_ref().try_into()?
182 };
183
184 Self::try_new_offset_length(ends, values, 0, length)
185 }
186
187 pub fn try_new_offset_length(
191 ends: ArrayRef,
192 values: ArrayRef,
193 offset: usize,
194 length: usize,
195 ) -> VortexResult<Self> {
196 Self::validate(&ends, &values, offset, length)?;
197
198 Ok(Self {
199 ends,
200 values,
201 offset,
202 length,
203 stats_set: Default::default(),
204 })
205 }
206
207 pub unsafe fn new_unchecked(
216 ends: ArrayRef,
217 values: ArrayRef,
218 offset: usize,
219 length: usize,
220 ) -> Self {
221 Self {
222 ends,
223 values,
224 offset,
225 length,
226 stats_set: Default::default(),
227 }
228 }
229
230 pub fn find_physical_index(&self, index: usize) -> usize {
232 self.ends()
233 .as_primitive_typed()
234 .search_sorted(
235 &PValue::from(index + self.offset()),
236 SearchSortedSide::Right,
237 )
238 .to_ends_index(self.ends().len())
239 }
240
241 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
243 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
244 let (ends, values) = runend_encode(parray)?;
245 unsafe {
247 Ok(Self::new_unchecked(
248 ends.into_array(),
249 values,
250 0,
251 array.len(),
252 ))
253 }
254 } else {
255 vortex_bail!("REE can only encode primitive arrays")
256 }
257 }
258
259 #[inline]
263 pub fn offset(&self) -> usize {
264 self.offset
265 }
266
267 #[inline]
272 pub fn ends(&self) -> &ArrayRef {
273 &self.ends
274 }
275
276 #[inline]
281 pub fn values(&self) -> &ArrayRef {
282 &self.values
283 }
284}
285
286impl ArrayVTable<RunEndVTable> for RunEndVTable {
287 fn len(array: &RunEndArray) -> usize {
288 array.length
289 }
290
291 fn dtype(array: &RunEndArray) -> &DType {
292 array.values.dtype()
293 }
294
295 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
296 array.stats_set.to_ref(array.as_ref())
297 }
298}
299
300impl ValidityVTable<RunEndVTable> for RunEndVTable {
301 fn is_valid(array: &RunEndArray, index: usize) -> VortexResult<bool> {
302 let physical_idx = array.find_physical_index(index);
303 array.values().is_valid(physical_idx)
304 }
305
306 fn all_valid(array: &RunEndArray) -> VortexResult<bool> {
307 array.values().all_valid()
308 }
309
310 fn all_invalid(array: &RunEndArray) -> VortexResult<bool> {
311 array.values().all_invalid()
312 }
313
314 fn validity_mask(array: &RunEndArray) -> VortexResult<Mask> {
315 Ok(match array.values().validity_mask()? {
316 Mask::AllTrue(_) => Mask::AllTrue(array.len()),
317 Mask::AllFalse(_) => Mask::AllFalse(array.len()),
318 Mask::Values(values) => {
319 let ree_validity = unsafe {
322 RunEndArray::new_unchecked(
323 array.ends().clone(),
324 values.into_array(),
325 array.offset(),
326 array.len(),
327 )
328 .into_array()
329 };
330 Mask::from_buffer(ree_validity.to_bool()?.boolean_buffer().clone())
331 }
332 })
333 }
334}
335
336impl CanonicalVTable<RunEndVTable> for RunEndVTable {
337 fn canonicalize(array: &RunEndArray) -> VortexResult<Canonical> {
338 let pends = array.ends().to_primitive()?;
339 match array.dtype() {
340 DType::Bool(_) => {
341 let bools = array.values().to_bool()?;
342 runend_decode_bools(pends, bools, array.offset(), array.len()).map(Canonical::Bool)
343 }
344 DType::Primitive(..) => {
345 let pvalues = array.values().to_primitive()?;
346 runend_decode_primitive(pends, pvalues, array.offset(), array.len())
347 .map(Canonical::Primitive)
348 }
349 _ => vortex_bail!("Only Primitive and Bool values are supported"),
350 }
351 }
352}
353
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::RunEndArray;

    /// Constructing from ends `[2, 5, 10]` and values `[1, 2, 3]` yields a length-10 `i32`
    /// array whose positions read back the value of their run.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::new(ends, values);

        // The length comes from the last run end.
        assert_eq!(arr.len(), 10);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        // (logical position, expected value): position 2 already reads the second value and
        // position 5 the third, i.e. each run end acts as an exclusive upper bound.
        for (position, expected) in [(0usize, 1i32), (2, 2), (5, 3), (9, 3)] {
            assert_eq!(arr.scalar_at(position), expected.into());
        }
    }
}