1use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::Canonical;
12use vortex_array::DeserializeMetadata;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::ToCanonical;
18use vortex_array::arrays::PrimitiveVTable;
19use vortex_array::buffer::BufferHandle;
20use vortex_array::search_sorted::SearchSorted;
21use vortex_array::search_sorted::SearchSortedSide;
22use vortex_array::serde::ArrayChildren;
23use vortex_array::stats::ArrayStats;
24use vortex_array::stats::StatsSetRef;
25use vortex_array::validity::Validity;
26use vortex_array::vtable;
27use vortex_array::vtable::ArrayId;
28use vortex_array::vtable::ArrayVTable;
29use vortex_array::vtable::ArrayVTableExt;
30use vortex_array::vtable::BaseArrayVTable;
31use vortex_array::vtable::CanonicalVTable;
32use vortex_array::vtable::NotSupported;
33use vortex_array::vtable::VTable;
34use vortex_array::vtable::ValidityVTable;
35use vortex_dtype::DType;
36use vortex_dtype::Nullability;
37use vortex_dtype::PType;
38use vortex_error::VortexExpect as _;
39use vortex_error::VortexResult;
40use vortex_error::vortex_bail;
41use vortex_error::vortex_ensure;
42use vortex_error::vortex_panic;
43use vortex_mask::Mask;
44use vortex_scalar::PValue;
45
46use crate::compress::runend_decode_bools;
47use crate::compress::runend_decode_primitive;
48use crate::compress::runend_encode;
49use crate::rules::RULES;
50
51vtable!(RunEnd);
52
53#[derive(Clone, prost::Message)]
54pub struct RunEndMetadata {
55 #[prost(enumeration = "PType", tag = "1")]
56 pub ends_ptype: i32,
57 #[prost(uint64, tag = "2")]
58 pub num_runs: u64,
59 #[prost(uint64, tag = "3")]
60 pub offset: u64,
61}
62
63impl VTable for RunEndVTable {
64 type Array = RunEndArray;
65
66 type Metadata = ProstMetadata<RunEndMetadata>;
67
68 type ArrayVTable = Self;
69 type CanonicalVTable = Self;
70 type OperationsVTable = Self;
71 type ValidityVTable = Self;
72 type VisitorVTable = Self;
73 type ComputeVTable = NotSupported;
74 type EncodeVTable = Self;
75
76 fn id(&self) -> ArrayId {
77 ArrayId::new_ref("vortex.runend")
78 }
79
80 fn encoding(_array: &Self::Array) -> ArrayVTable {
81 RunEndVTable.as_vtable()
82 }
83
84 fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
85 Ok(ProstMetadata(RunEndMetadata {
86 ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
87 as i32,
88 num_runs: array.ends().len() as u64,
89 offset: array.offset() as u64,
90 }))
91 }
92
93 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
94 Ok(Some(metadata.serialize()))
95 }
96
97 fn deserialize(buffer: &[u8]) -> VortexResult<Self::Metadata> {
98 let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(buffer)?;
99 Ok(ProstMetadata(inner))
100 }
101
102 fn build(
103 &self,
104 dtype: &DType,
105 len: usize,
106 metadata: &Self::Metadata,
107 _buffers: &[BufferHandle],
108 children: &dyn ArrayChildren,
109 ) -> VortexResult<RunEndArray> {
110 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
111 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
112 let ends = children.get(0, &ends_dtype, runs)?;
113
114 let values = children.get(1, dtype, runs)?;
115
116 RunEndArray::try_new_offset_length(
117 ends,
118 values,
119 usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
120 len,
121 )
122 }
123
124 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
125 vortex_ensure!(
126 children.len() == 2,
127 "RunEndArray expects 2 children, got {}",
128 children.len()
129 );
130
131 let mut children_iter = children.into_iter();
132 array.ends = children_iter.next().vortex_expect("ends child");
133 array.values = children_iter.next().vortex_expect("values child");
134
135 Ok(())
136 }
137
138 fn reduce_parent(
139 array: &Self::Array,
140 parent: &ArrayRef,
141 child_idx: usize,
142 ) -> VortexResult<Option<ArrayRef>> {
143 RULES.evaluate(array, parent, child_idx)
144 }
145}
146
147#[derive(Clone, Debug)]
148pub struct RunEndArray {
149 ends: ArrayRef,
150 values: ArrayRef,
151 offset: usize,
152 length: usize,
153 stats_set: ArrayStats,
154}
155
156#[derive(Debug)]
157pub struct RunEndVTable;
158
159impl RunEndArray {
160 fn validate(
161 ends: &dyn Array,
162 values: &dyn Array,
163 offset: usize,
164 length: usize,
165 ) -> VortexResult<()> {
166 vortex_ensure!(
168 ends.dtype().is_unsigned_int(),
169 "run ends must be unsigned integers, was {}",
170 ends.dtype(),
171 );
172 vortex_ensure!(
173 values.dtype().is_primitive() || values.dtype().is_boolean(),
174 "RunEnd array can only have Bool or Primitive values, {} given",
175 values.dtype()
176 );
177
178 vortex_ensure!(
179 ends.len() == values.len(),
180 "run ends len != run values len, {} != {}",
181 ends.len(),
182 values.len()
183 );
184
185 if ends.is_empty() {
187 vortex_ensure!(
188 offset == 0,
189 "non-zero offset provided for empty RunEndArray"
190 );
191 return Ok(());
192 }
193
194 if let Some(is_sorted) = ends.statistics().compute_is_strict_sorted() {
196 vortex_ensure!(is_sorted, "run ends must be monotonic");
197 } else {
198 vortex_bail!("run ends must report IsStrictSorted");
200 }
201
202 if offset != 0 && length != 0 {
204 let first_run_end: usize = ends.scalar_at(0).as_ref().try_into()?;
205 if first_run_end <= offset {
206 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
207 }
208 }
209
210 Ok(())
211 }
212}
213
214impl RunEndArray {
215 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
237 Self::try_new(ends, values).vortex_expect("RunEndArray new")
238 }
239
240 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
281 let length: usize = if ends.is_empty() {
282 0
283 } else {
284 ends.scalar_at(ends.len() - 1).as_ref().try_into()?
285 };
286
287 Self::try_new_offset_length(ends, values, 0, length)
288 }
289
290 pub fn try_new_offset_length(
294 ends: ArrayRef,
295 values: ArrayRef,
296 offset: usize,
297 length: usize,
298 ) -> VortexResult<Self> {
299 Self::validate(&ends, &values, offset, length)?;
300
301 Ok(Self {
302 ends,
303 values,
304 offset,
305 length,
306 stats_set: Default::default(),
307 })
308 }
309
310 pub unsafe fn new_unchecked(
319 ends: ArrayRef,
320 values: ArrayRef,
321 offset: usize,
322 length: usize,
323 ) -> Self {
324 Self {
325 ends,
326 values,
327 offset,
328 length,
329 stats_set: Default::default(),
330 }
331 }
332
333 pub fn find_physical_index(&self, index: usize) -> usize {
335 self.ends()
336 .as_primitive_typed()
337 .search_sorted(
338 &PValue::from(index + self.offset()),
339 SearchSortedSide::Right,
340 )
341 .to_ends_index(self.ends().len())
342 }
343
344 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
346 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
347 let (ends, values) = runend_encode(parray);
348 unsafe {
350 Ok(Self::new_unchecked(
351 ends.into_array(),
352 values,
353 0,
354 array.len(),
355 ))
356 }
357 } else {
358 vortex_bail!("REE can only encode primitive arrays")
359 }
360 }
361
362 #[inline]
366 pub fn offset(&self) -> usize {
367 self.offset
368 }
369
370 #[inline]
375 pub fn ends(&self) -> &ArrayRef {
376 &self.ends
377 }
378
379 #[inline]
384 pub fn values(&self) -> &ArrayRef {
385 &self.values
386 }
387}
388
389impl BaseArrayVTable<RunEndVTable> for RunEndVTable {
390 fn len(array: &RunEndArray) -> usize {
391 array.length
392 }
393
394 fn dtype(array: &RunEndArray) -> &DType {
395 array.values.dtype()
396 }
397
398 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
399 array.stats_set.to_ref(array.as_ref())
400 }
401
402 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
403 array.ends.array_hash(state, precision);
404 array.values.array_hash(state, precision);
405 array.offset.hash(state);
406 array.length.hash(state);
407 }
408
409 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
410 array.ends.array_eq(&other.ends, precision)
411 && array.values.array_eq(&other.values, precision)
412 && array.offset == other.offset
413 && array.length == other.length
414 }
415}
416
417impl ValidityVTable<RunEndVTable> for RunEndVTable {
418 fn is_valid(array: &RunEndArray, index: usize) -> bool {
419 let physical_idx = array.find_physical_index(index);
420 array.values().is_valid(physical_idx)
421 }
422
423 fn all_valid(array: &RunEndArray) -> bool {
424 array.values().all_valid()
425 }
426
427 fn all_invalid(array: &RunEndArray) -> bool {
428 array.values().all_invalid()
429 }
430
431 fn validity(array: &RunEndArray) -> VortexResult<Validity> {
432 Ok(match array.values().validity()? {
433 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
434 Validity::AllInvalid => Validity::AllInvalid,
435 Validity::Array(values_validity) => Validity::Array(unsafe {
436 RunEndArray::new_unchecked(
437 array.ends().clone(),
438 values_validity,
439 array.offset(),
440 array.len(),
441 )
442 .into_array()
443 }),
444 })
445 }
446
447 fn validity_mask(array: &RunEndArray) -> Mask {
448 match array.values().validity_mask() {
449 Mask::AllTrue(_) => Mask::AllTrue(array.len()),
450 Mask::AllFalse(_) => Mask::AllFalse(array.len()),
451 Mask::Values(values) => {
452 let ree_validity = unsafe {
455 RunEndArray::new_unchecked(
456 array.ends().clone(),
457 values.into_array(),
458 array.offset(),
459 array.len(),
460 )
461 .into_array()
462 };
463 Mask::from_buffer(ree_validity.to_bool().bit_buffer().clone())
464 }
465 }
466 }
467}
468
469impl CanonicalVTable<RunEndVTable> for RunEndVTable {
470 fn canonicalize(array: &RunEndArray) -> Canonical {
471 let pends = array.ends().to_primitive();
472 match array.dtype() {
473 DType::Bool(_) => {
474 let bools = array.values().to_bool();
475 Canonical::Bool(runend_decode_bools(
476 pends,
477 bools,
478 array.offset(),
479 array.len(),
480 ))
481 }
482 DType::Primitive(..) => {
483 let pvalues = array.values().to_primitive();
484 Canonical::Primitive(runend_decode_primitive(
485 pends,
486 pvalues,
487 array.offset(),
488 array.len(),
489 ))
490 }
491 _ => vortex_panic!("Only Primitive and Bool values are supported"),
492 }
493 }
494}
495
496#[cfg(test)]
497mod tests {
498 use vortex_array::IntoArray;
499 use vortex_array::assert_arrays_eq;
500 use vortex_buffer::buffer;
501 use vortex_dtype::DType;
502 use vortex_dtype::Nullability;
503 use vortex_dtype::PType;
504
505 use crate::RunEndArray;
506
507 #[test]
508 fn test_runend_constructor() {
509 let arr = RunEndArray::new(
510 buffer![2u32, 5, 10].into_array(),
511 buffer![1i32, 2, 3].into_array(),
512 );
513 assert_eq!(arr.len(), 10);
514 assert_eq!(
515 arr.dtype(),
516 &DType::Primitive(PType::I32, Nullability::NonNullable)
517 );
518
519 let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
523 assert_arrays_eq!(arr.to_array(), expected);
524 }
525}