1use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::Canonical;
12use vortex_array::DeserializeMetadata;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::ToCanonical;
18use vortex_array::arrays::PrimitiveVTable;
19use vortex_array::search_sorted::SearchSorted;
20use vortex_array::search_sorted::SearchSortedSide;
21use vortex_array::serde::ArrayChildren;
22use vortex_array::stats::ArrayStats;
23use vortex_array::stats::StatsSetRef;
24use vortex_array::vtable;
25use vortex_array::vtable::ArrayId;
26use vortex_array::vtable::ArrayVTable;
27use vortex_array::vtable::ArrayVTableExt;
28use vortex_array::vtable::BaseArrayVTable;
29use vortex_array::vtable::CanonicalVTable;
30use vortex_array::vtable::NotSupported;
31use vortex_array::vtable::VTable;
32use vortex_array::vtable::ValidityVTable;
33use vortex_buffer::BufferHandle;
34use vortex_dtype::DType;
35use vortex_dtype::Nullability;
36use vortex_dtype::PType;
37use vortex_error::VortexExpect as _;
38use vortex_error::VortexResult;
39use vortex_error::vortex_bail;
40use vortex_error::vortex_ensure;
41use vortex_error::vortex_panic;
42use vortex_mask::Mask;
43use vortex_scalar::PValue;
44
45use crate::compress::runend_decode_bools;
46use crate::compress::runend_decode_primitive;
47use crate::compress::runend_encode;
48
// Invokes the `vtable!` macro from `vortex_array` for the `RunEnd` encoding.
// NOTE(review): presumably this generates the glue tying `RunEndArray` and
// `RunEndVTable` into the dynamic `Array` machinery — confirm against the macro definition.
vtable!(RunEnd);
50
/// Serialized (protobuf) metadata describing a run-end encoded array.
///
/// The field tags are part of the wire format and must not be renumbered.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// `PType` of the run-ends child, stored as its protobuf enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of the run-ends child.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Logical offset of the array into the run ends.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
60
61impl VTable for RunEndVTable {
62 type Array = RunEndArray;
63
64 type Metadata = ProstMetadata<RunEndMetadata>;
65
66 type ArrayVTable = Self;
67 type CanonicalVTable = Self;
68 type OperationsVTable = Self;
69 type ValidityVTable = Self;
70 type VisitorVTable = Self;
71 type ComputeVTable = NotSupported;
72 type EncodeVTable = Self;
73
74 fn id(&self) -> ArrayId {
75 ArrayId::new_ref("vortex.runend")
76 }
77
78 fn encoding(_array: &Self::Array) -> ArrayVTable {
79 RunEndVTable.as_vtable()
80 }
81
82 fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
83 Ok(ProstMetadata(RunEndMetadata {
84 ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
85 as i32,
86 num_runs: array.ends().len() as u64,
87 offset: array.offset() as u64,
88 }))
89 }
90
91 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
92 Ok(Some(metadata.serialize()))
93 }
94
95 fn deserialize(buffer: &[u8]) -> VortexResult<Self::Metadata> {
96 let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(buffer)?;
97 Ok(ProstMetadata(inner))
98 }
99
100 fn build(
101 &self,
102 dtype: &DType,
103 len: usize,
104 metadata: &Self::Metadata,
105 _buffers: &[BufferHandle],
106 children: &dyn ArrayChildren,
107 ) -> VortexResult<RunEndArray> {
108 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
109 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
110 let ends = children.get(0, &ends_dtype, runs)?;
111
112 let values = children.get(1, dtype, runs)?;
113
114 RunEndArray::try_new_offset_length(
115 ends,
116 values,
117 usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
118 len,
119 )
120 }
121}
122
/// A run-end encoded array: consecutive repeated values are stored once in
/// `values`, and `ends` records the (exclusive) logical position at which each
/// run stops.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Run end positions; validated to be unsigned integers in strictly sorted order.
    ends: ArrayRef,
    /// One value per run; validated to be a Bool or Primitive array of the same length as `ends`.
    values: ArrayRef,
    /// Logical offset added to an index before it is looked up in `ends`.
    offset: usize,
    /// Logical (decoded) length of the array.
    length: usize,
    /// Statistics storage for this array, exposed through the vtable's `stats`.
    stats_set: ArrayStats,
}
131
/// Unit struct on which the vtable traits for [`RunEndArray`] are implemented.
#[derive(Debug)]
pub struct RunEndVTable;
134
135impl RunEndArray {
136 fn validate(
137 ends: &dyn Array,
138 values: &dyn Array,
139 offset: usize,
140 length: usize,
141 ) -> VortexResult<()> {
142 vortex_ensure!(
144 ends.dtype().is_unsigned_int(),
145 "run ends must be unsigned integers, was {}",
146 ends.dtype(),
147 );
148 vortex_ensure!(
149 values.dtype().is_primitive() || values.dtype().is_boolean(),
150 "RunEnd array can only have Bool or Primitive values, {} given",
151 values.dtype()
152 );
153
154 vortex_ensure!(
155 ends.len() == values.len(),
156 "run ends len != run values len, {} != {}",
157 ends.len(),
158 values.len()
159 );
160
161 if ends.is_empty() {
163 vortex_ensure!(
164 offset == 0,
165 "non-zero offset provided for empty RunEndArray"
166 );
167 return Ok(());
168 }
169
170 if let Some(is_sorted) = ends.statistics().compute_is_strict_sorted() {
172 vortex_ensure!(is_sorted, "run ends must be monotonic");
173 } else {
174 vortex_bail!("run ends must report IsStrictSorted");
176 }
177
178 if offset != 0 && length != 0 {
180 let first_run_end: usize = ends.scalar_at(0).as_ref().try_into()?;
181 if first_run_end <= offset {
182 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
183 }
184 }
185
186 Ok(())
187 }
188}
189
190impl RunEndArray {
191 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
213 Self::try_new(ends, values).vortex_expect("RunEndArray new")
214 }
215
216 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
257 let length: usize = if ends.is_empty() {
258 0
259 } else {
260 ends.scalar_at(ends.len() - 1).as_ref().try_into()?
261 };
262
263 Self::try_new_offset_length(ends, values, 0, length)
264 }
265
266 pub fn try_new_offset_length(
270 ends: ArrayRef,
271 values: ArrayRef,
272 offset: usize,
273 length: usize,
274 ) -> VortexResult<Self> {
275 Self::validate(&ends, &values, offset, length)?;
276
277 Ok(Self {
278 ends,
279 values,
280 offset,
281 length,
282 stats_set: Default::default(),
283 })
284 }
285
286 pub unsafe fn new_unchecked(
295 ends: ArrayRef,
296 values: ArrayRef,
297 offset: usize,
298 length: usize,
299 ) -> Self {
300 Self {
301 ends,
302 values,
303 offset,
304 length,
305 stats_set: Default::default(),
306 }
307 }
308
309 pub fn find_physical_index(&self, index: usize) -> usize {
311 self.ends()
312 .as_primitive_typed()
313 .search_sorted(
314 &PValue::from(index + self.offset()),
315 SearchSortedSide::Right,
316 )
317 .to_ends_index(self.ends().len())
318 }
319
320 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
322 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
323 let (ends, values) = runend_encode(parray);
324 unsafe {
326 Ok(Self::new_unchecked(
327 ends.into_array(),
328 values,
329 0,
330 array.len(),
331 ))
332 }
333 } else {
334 vortex_bail!("REE can only encode primitive arrays")
335 }
336 }
337
338 #[inline]
342 pub fn offset(&self) -> usize {
343 self.offset
344 }
345
346 #[inline]
351 pub fn ends(&self) -> &ArrayRef {
352 &self.ends
353 }
354
355 #[inline]
360 pub fn values(&self) -> &ArrayRef {
361 &self.values
362 }
363}
364
365impl BaseArrayVTable<RunEndVTable> for RunEndVTable {
366 fn len(array: &RunEndArray) -> usize {
367 array.length
368 }
369
370 fn dtype(array: &RunEndArray) -> &DType {
371 array.values.dtype()
372 }
373
374 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
375 array.stats_set.to_ref(array.as_ref())
376 }
377
378 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
379 array.ends.array_hash(state, precision);
380 array.values.array_hash(state, precision);
381 array.offset.hash(state);
382 array.length.hash(state);
383 }
384
385 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
386 array.ends.array_eq(&other.ends, precision)
387 && array.values.array_eq(&other.values, precision)
388 && array.offset == other.offset
389 && array.length == other.length
390 }
391}
392
393impl ValidityVTable<RunEndVTable> for RunEndVTable {
394 fn is_valid(array: &RunEndArray, index: usize) -> bool {
395 let physical_idx = array.find_physical_index(index);
396 array.values().is_valid(physical_idx)
397 }
398
399 fn all_valid(array: &RunEndArray) -> bool {
400 array.values().all_valid()
401 }
402
403 fn all_invalid(array: &RunEndArray) -> bool {
404 array.values().all_invalid()
405 }
406
407 fn validity_mask(array: &RunEndArray) -> Mask {
408 match array.values().validity_mask() {
409 Mask::AllTrue(_) => Mask::AllTrue(array.len()),
410 Mask::AllFalse(_) => Mask::AllFalse(array.len()),
411 Mask::Values(values) => {
412 let ree_validity = unsafe {
415 RunEndArray::new_unchecked(
416 array.ends().clone(),
417 values.into_array(),
418 array.offset(),
419 array.len(),
420 )
421 .into_array()
422 };
423 Mask::from_buffer(ree_validity.to_bool().bit_buffer().clone())
424 }
425 }
426 }
427}
428
429impl CanonicalVTable<RunEndVTable> for RunEndVTable {
430 fn canonicalize(array: &RunEndArray) -> Canonical {
431 let pends = array.ends().to_primitive();
432 match array.dtype() {
433 DType::Bool(_) => {
434 let bools = array.values().to_bool();
435 Canonical::Bool(runend_decode_bools(
436 pends,
437 bools,
438 array.offset(),
439 array.len(),
440 ))
441 }
442 DType::Primitive(..) => {
443 let pvalues = array.values().to_primitive();
444 Canonical::Primitive(runend_decode_primitive(
445 pends,
446 pvalues,
447 array.offset(),
448 array.len(),
449 ))
450 }
451 _ => vortex_panic!("Only Primitive and Bool values are supported"),
452 }
453 }
454}
455
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::assert_arrays_eq;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;

    use crate::RunEndArray;

    /// Builds a run-end array from three runs and checks its length, dtype and
    /// decoded contents.
    #[test]
    fn test_runend_constructor() {
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::new(run_ends, run_values);

        // The last run end is the logical length.
        assert_eq!(arr.len(), 10);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        // Runs: 1 for [0, 2), 2 for [2, 5), 3 for [5, 10).
        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.to_array(), expected);
    }
}
485}