1use std::fmt::Debug;
5
6use vortex_array::arrays::PrimitiveVTable;
7use vortex_array::search_sorted::{SearchSorted, SearchSortedSide};
8use vortex_array::stats::{ArrayStats, StatsSetRef};
9use vortex_array::vtable::{ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityVTable};
10use vortex_array::{
11 Array, ArrayRef, Canonical, EncodingId, EncodingRef, IntoArray, ToCanonical, vtable,
12};
13use vortex_dtype::DType;
14use vortex_error::{VortexExpect as _, VortexResult, vortex_bail, vortex_ensure, vortex_panic};
15use vortex_mask::Mask;
16use vortex_scalar::PValue;
17
18use crate::compress::{runend_decode_bools, runend_decode_primitive, runend_encode};
19
20vtable!(RunEnd);
21
22impl VTable for RunEndVTable {
23 type Array = RunEndArray;
24 type Encoding = RunEndEncoding;
25
26 type ArrayVTable = Self;
27 type CanonicalVTable = Self;
28 type OperationsVTable = Self;
29 type ValidityVTable = Self;
30 type VisitorVTable = Self;
31 type ComputeVTable = NotSupported;
32 type EncodeVTable = Self;
33 type SerdeVTable = Self;
34 type PipelineVTable = NotSupported;
35
36 fn id(_encoding: &Self::Encoding) -> EncodingId {
37 EncodingId::new_ref("vortex.runend")
38 }
39
40 fn encoding(_array: &Self::Array) -> EncodingRef {
41 EncodingRef::new_ref(RunEndEncoding.as_ref())
42 }
43}
44
45#[derive(Clone, Debug)]
46pub struct RunEndArray {
47 ends: ArrayRef,
48 values: ArrayRef,
49 offset: usize,
50 length: usize,
51 stats_set: ArrayStats,
52}
53
/// Marker type identifying the run-end encoding (`vortex.runend`); it carries no state.
#[derive(Debug, Clone)]
pub struct RunEndEncoding;
56
57impl RunEndArray {
58 fn validate(
59 ends: &dyn Array,
60 values: &dyn Array,
61 offset: usize,
62 length: usize,
63 ) -> VortexResult<()> {
64 vortex_ensure!(
66 ends.dtype().is_unsigned_int(),
67 "run ends must be unsigned integers, was {}",
68 ends.dtype(),
69 );
70 vortex_ensure!(
71 values.dtype().is_primitive() || values.dtype().is_boolean(),
72 "RunEnd array can only have Bool or Primitive values, {} given",
73 values.dtype()
74 );
75
76 vortex_ensure!(
77 ends.len() == values.len(),
78 "run ends len != run values len, {} != {}",
79 ends.len(),
80 values.len()
81 );
82
83 if ends.is_empty() {
85 vortex_ensure!(
86 offset == 0,
87 "non-zero offset provided for empty RunEndArray"
88 );
89 return Ok(());
90 }
91
92 if let Some(is_sorted) = ends.statistics().compute_is_strict_sorted() {
94 vortex_ensure!(is_sorted, "run ends must be monotonic");
95 } else {
96 vortex_bail!("run ends must report IsStrictSorted");
98 }
99
100 if offset != 0 && length != 0 {
102 let first_run_end: usize = ends.scalar_at(0).as_ref().try_into()?;
103 if first_run_end <= offset {
104 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
105 }
106 }
107
108 Ok(())
109 }
110}
111
112impl RunEndArray {
113 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
135 Self::try_new(ends, values).vortex_expect("RunEndArray new")
136 }
137
138 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
179 let length: usize = if ends.is_empty() {
180 0
181 } else {
182 ends.scalar_at(ends.len() - 1).as_ref().try_into()?
183 };
184
185 Self::try_new_offset_length(ends, values, 0, length)
186 }
187
188 pub fn try_new_offset_length(
192 ends: ArrayRef,
193 values: ArrayRef,
194 offset: usize,
195 length: usize,
196 ) -> VortexResult<Self> {
197 Self::validate(&ends, &values, offset, length)?;
198
199 Ok(Self {
200 ends,
201 values,
202 offset,
203 length,
204 stats_set: Default::default(),
205 })
206 }
207
208 pub unsafe fn new_unchecked(
217 ends: ArrayRef,
218 values: ArrayRef,
219 offset: usize,
220 length: usize,
221 ) -> Self {
222 Self {
223 ends,
224 values,
225 offset,
226 length,
227 stats_set: Default::default(),
228 }
229 }
230
231 pub fn find_physical_index(&self, index: usize) -> usize {
233 self.ends()
234 .as_primitive_typed()
235 .search_sorted(
236 &PValue::from(index + self.offset()),
237 SearchSortedSide::Right,
238 )
239 .to_ends_index(self.ends().len())
240 }
241
242 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
244 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
245 let (ends, values) = runend_encode(parray);
246 unsafe {
248 Ok(Self::new_unchecked(
249 ends.into_array(),
250 values,
251 0,
252 array.len(),
253 ))
254 }
255 } else {
256 vortex_bail!("REE can only encode primitive arrays")
257 }
258 }
259
260 #[inline]
264 pub fn offset(&self) -> usize {
265 self.offset
266 }
267
268 #[inline]
273 pub fn ends(&self) -> &ArrayRef {
274 &self.ends
275 }
276
277 #[inline]
282 pub fn values(&self) -> &ArrayRef {
283 &self.values
284 }
285}
286
287impl ArrayVTable<RunEndVTable> for RunEndVTable {
288 fn len(array: &RunEndArray) -> usize {
289 array.length
290 }
291
292 fn dtype(array: &RunEndArray) -> &DType {
293 array.values.dtype()
294 }
295
296 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
297 array.stats_set.to_ref(array.as_ref())
298 }
299}
300
301impl ValidityVTable<RunEndVTable> for RunEndVTable {
302 fn is_valid(array: &RunEndArray, index: usize) -> bool {
303 let physical_idx = array.find_physical_index(index);
304 array.values().is_valid(physical_idx)
305 }
306
307 fn all_valid(array: &RunEndArray) -> bool {
308 array.values().all_valid()
309 }
310
311 fn all_invalid(array: &RunEndArray) -> bool {
312 array.values().all_invalid()
313 }
314
315 fn validity_mask(array: &RunEndArray) -> Mask {
316 match array.values().validity_mask() {
317 Mask::AllTrue(_) => Mask::AllTrue(array.len()),
318 Mask::AllFalse(_) => Mask::AllFalse(array.len()),
319 Mask::Values(values) => {
320 let ree_validity = unsafe {
323 RunEndArray::new_unchecked(
324 array.ends().clone(),
325 values.into_array(),
326 array.offset(),
327 array.len(),
328 )
329 .into_array()
330 };
331 Mask::from_buffer(ree_validity.to_bool().boolean_buffer().clone())
332 }
333 }
334 }
335}
336
337impl CanonicalVTable<RunEndVTable> for RunEndVTable {
338 fn canonicalize(array: &RunEndArray) -> Canonical {
339 let pends = array.ends().to_primitive();
340 match array.dtype() {
341 DType::Bool(_) => {
342 let bools = array.values().to_bool();
343 Canonical::Bool(runend_decode_bools(
344 pends,
345 bools,
346 array.offset(),
347 array.len(),
348 ))
349 }
350 DType::Primitive(..) => {
351 let pvalues = array.values().to_primitive();
352 Canonical::Primitive(runend_decode_primitive(
353 pends,
354 pvalues,
355 array.offset(),
356 array.len(),
357 ))
358 }
359 _ => vortex_panic!("Only Primitive and Bool values are supported"),
360 }
361 }
362}
363
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::RunEndArray;

    /// Builds a three-run array and checks its length, dtype and a sample of logical positions.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::new(ends, values);

        assert_eq!(arr.len(), 10);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        // Runs: indices 0..2 => 1, 2..5 => 2, 5..10 => 3.
        for (index, expected) in [(0usize, 1i32), (2, 2), (5, 3), (9, 3)] {
            assert_eq!(arr.scalar_at(index), expected.into());
        }
    }
}