1use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::arrays::PrimitiveVTable;
8use vortex_array::search_sorted::{SearchSorted, SearchSortedSide};
9use vortex_array::stats::{ArrayStats, StatsSetRef};
10use vortex_array::vtable::{ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityVTable};
11use vortex_array::{
12 Array, ArrayEq, ArrayHash, ArrayRef, Canonical, EncodingId, EncodingRef, IntoArray, Precision,
13 ToCanonical, vtable,
14};
15use vortex_dtype::DType;
16use vortex_error::{VortexExpect as _, VortexResult, vortex_bail, vortex_ensure, vortex_panic};
17use vortex_mask::Mask;
18use vortex_scalar::PValue;
19
20use crate::compress::{runend_decode_bools, runend_decode_primitive, runend_encode};
21
22vtable!(RunEnd);
23
24impl VTable for RunEndVTable {
25 type Array = RunEndArray;
26 type Encoding = RunEndEncoding;
27
28 type ArrayVTable = Self;
29 type CanonicalVTable = Self;
30 type OperationsVTable = Self;
31 type ValidityVTable = Self;
32 type VisitorVTable = Self;
33 type ComputeVTable = NotSupported;
34 type EncodeVTable = Self;
35 type SerdeVTable = Self;
36 type OperatorVTable = NotSupported;
37
38 fn id(_encoding: &Self::Encoding) -> EncodingId {
39 EncodingId::new_ref("vortex.runend")
40 }
41
42 fn encoding(_array: &Self::Array) -> EncodingRef {
43 EncodingRef::new_ref(RunEndEncoding.as_ref())
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct RunEndArray {
49 ends: ArrayRef,
50 values: ArrayRef,
51 offset: usize,
52 length: usize,
53 stats_set: ArrayStats,
54}
55
56#[derive(Clone, Debug)]
57pub struct RunEndEncoding;
58
59impl RunEndArray {
60 fn validate(
61 ends: &dyn Array,
62 values: &dyn Array,
63 offset: usize,
64 length: usize,
65 ) -> VortexResult<()> {
66 vortex_ensure!(
68 ends.dtype().is_unsigned_int(),
69 "run ends must be unsigned integers, was {}",
70 ends.dtype(),
71 );
72 vortex_ensure!(
73 values.dtype().is_primitive() || values.dtype().is_boolean(),
74 "RunEnd array can only have Bool or Primitive values, {} given",
75 values.dtype()
76 );
77
78 vortex_ensure!(
79 ends.len() == values.len(),
80 "run ends len != run values len, {} != {}",
81 ends.len(),
82 values.len()
83 );
84
85 if ends.is_empty() {
87 vortex_ensure!(
88 offset == 0,
89 "non-zero offset provided for empty RunEndArray"
90 );
91 return Ok(());
92 }
93
94 if let Some(is_sorted) = ends.statistics().compute_is_strict_sorted() {
96 vortex_ensure!(is_sorted, "run ends must be monotonic");
97 } else {
98 vortex_bail!("run ends must report IsStrictSorted");
100 }
101
102 if offset != 0 && length != 0 {
104 let first_run_end: usize = ends.scalar_at(0).as_ref().try_into()?;
105 if first_run_end <= offset {
106 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
107 }
108 }
109
110 Ok(())
111 }
112}
113
114impl RunEndArray {
115 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
137 Self::try_new(ends, values).vortex_expect("RunEndArray new")
138 }
139
140 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
181 let length: usize = if ends.is_empty() {
182 0
183 } else {
184 ends.scalar_at(ends.len() - 1).as_ref().try_into()?
185 };
186
187 Self::try_new_offset_length(ends, values, 0, length)
188 }
189
190 pub fn try_new_offset_length(
194 ends: ArrayRef,
195 values: ArrayRef,
196 offset: usize,
197 length: usize,
198 ) -> VortexResult<Self> {
199 Self::validate(&ends, &values, offset, length)?;
200
201 Ok(Self {
202 ends,
203 values,
204 offset,
205 length,
206 stats_set: Default::default(),
207 })
208 }
209
210 pub unsafe fn new_unchecked(
219 ends: ArrayRef,
220 values: ArrayRef,
221 offset: usize,
222 length: usize,
223 ) -> Self {
224 Self {
225 ends,
226 values,
227 offset,
228 length,
229 stats_set: Default::default(),
230 }
231 }
232
233 pub fn find_physical_index(&self, index: usize) -> usize {
235 self.ends()
236 .as_primitive_typed()
237 .search_sorted(
238 &PValue::from(index + self.offset()),
239 SearchSortedSide::Right,
240 )
241 .to_ends_index(self.ends().len())
242 }
243
244 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
246 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
247 let (ends, values) = runend_encode(parray);
248 unsafe {
250 Ok(Self::new_unchecked(
251 ends.into_array(),
252 values,
253 0,
254 array.len(),
255 ))
256 }
257 } else {
258 vortex_bail!("REE can only encode primitive arrays")
259 }
260 }
261
262 #[inline]
266 pub fn offset(&self) -> usize {
267 self.offset
268 }
269
270 #[inline]
275 pub fn ends(&self) -> &ArrayRef {
276 &self.ends
277 }
278
279 #[inline]
284 pub fn values(&self) -> &ArrayRef {
285 &self.values
286 }
287}
288
289impl ArrayVTable<RunEndVTable> for RunEndVTable {
290 fn len(array: &RunEndArray) -> usize {
291 array.length
292 }
293
294 fn dtype(array: &RunEndArray) -> &DType {
295 array.values.dtype()
296 }
297
298 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
299 array.stats_set.to_ref(array.as_ref())
300 }
301
302 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
303 array.ends.array_hash(state, precision);
304 array.values.array_hash(state, precision);
305 array.offset.hash(state);
306 array.length.hash(state);
307 }
308
309 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
310 array.ends.array_eq(&other.ends, precision)
311 && array.values.array_eq(&other.values, precision)
312 && array.offset == other.offset
313 && array.length == other.length
314 }
315}
316
317impl ValidityVTable<RunEndVTable> for RunEndVTable {
318 fn is_valid(array: &RunEndArray, index: usize) -> bool {
319 let physical_idx = array.find_physical_index(index);
320 array.values().is_valid(physical_idx)
321 }
322
323 fn all_valid(array: &RunEndArray) -> bool {
324 array.values().all_valid()
325 }
326
327 fn all_invalid(array: &RunEndArray) -> bool {
328 array.values().all_invalid()
329 }
330
331 fn validity_mask(array: &RunEndArray) -> Mask {
332 match array.values().validity_mask() {
333 Mask::AllTrue(_) => Mask::AllTrue(array.len()),
334 Mask::AllFalse(_) => Mask::AllFalse(array.len()),
335 Mask::Values(values) => {
336 let ree_validity = unsafe {
339 RunEndArray::new_unchecked(
340 array.ends().clone(),
341 values.into_array(),
342 array.offset(),
343 array.len(),
344 )
345 .into_array()
346 };
347 Mask::from_buffer(ree_validity.to_bool().bit_buffer().clone())
348 }
349 }
350 }
351}
352
353impl CanonicalVTable<RunEndVTable> for RunEndVTable {
354 fn canonicalize(array: &RunEndArray) -> Canonical {
355 let pends = array.ends().to_primitive();
356 match array.dtype() {
357 DType::Bool(_) => {
358 let bools = array.values().to_bool();
359 Canonical::Bool(runend_decode_bools(
360 pends,
361 bools,
362 array.offset(),
363 array.len(),
364 ))
365 }
366 DType::Primitive(..) => {
367 let pvalues = array.values().to_primitive();
368 Canonical::Primitive(runend_decode_primitive(
369 pends,
370 pvalues,
371 array.offset(),
372 array.len(),
373 ))
374 }
375 _ => vortex_panic!("Only Primitive and Bool values are supported"),
376 }
377 }
378}
379
380#[cfg(test)]
381mod tests {
382 use vortex_array::{IntoArray, assert_arrays_eq};
383 use vortex_buffer::buffer;
384 use vortex_dtype::{DType, Nullability, PType};
385
386 use crate::RunEndArray;
387
388 #[test]
389 fn test_runend_constructor() {
390 let arr = RunEndArray::new(
391 buffer![2u32, 5, 10].into_array(),
392 buffer![1i32, 2, 3].into_array(),
393 );
394 assert_eq!(arr.len(), 10);
395 assert_eq!(
396 arr.dtype(),
397 &DType::Primitive(PType::I32, Nullability::NonNullable)
398 );
399
400 let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
404 assert_arrays_eq!(arr.to_array(), expected);
405 }
406}