1use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::ExecutionCtx;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::arrays::PrimitiveVTable;
18use vortex_array::buffer::BufferHandle;
19use vortex_array::scalar::PValue;
20use vortex_array::search_sorted::SearchSorted;
21use vortex_array::search_sorted::SearchSortedSide;
22use vortex_array::serde::ArrayChildren;
23use vortex_array::stats::ArrayStats;
24use vortex_array::stats::StatsSetRef;
25use vortex_array::validity::Validity;
26use vortex_array::vtable;
27use vortex_array::vtable::ArrayId;
28use vortex_array::vtable::BaseArrayVTable;
29use vortex_array::vtable::VTable;
30use vortex_array::vtable::ValidityVTable;
31use vortex_dtype::DType;
32use vortex_dtype::Nullability;
33use vortex_dtype::PType;
34use vortex_error::VortexExpect as _;
35use vortex_error::VortexResult;
36use vortex_error::vortex_bail;
37use vortex_error::vortex_ensure;
38use vortex_error::vortex_panic;
39use vortex_session::VortexSession;
40
41use crate::compress::runend_decode_bools;
42use crate::compress::runend_decode_primitive;
43use crate::compress::runend_encode;
44use crate::kernel::PARENT_KERNELS;
45use crate::rules::RULES;
46
// Invokes the `vtable!` macro imported from `vortex_array` for the `RunEnd` encoding.
// NOTE(review): presumably this generates the glue tying `RunEndArray` to `RunEndVTable`
// (e.g. the `Array` plumbing used below such as `len()` / `dtype()`) — confirm against the
// macro definition.
vtable!(RunEnd);
48
/// Protobuf-encoded metadata stored alongside a serialized run-end array.
///
/// Populated by `VTable::metadata` from a live [`RunEndArray`] and consumed by `VTable::build`
/// to reconstruct the array from its children.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of the `ends` child (and of the `values` child).
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Offset of the array, as reported by `RunEndArray::offset`.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
58
59impl VTable for RunEndVTable {
60 type Array = RunEndArray;
61
62 type Metadata = ProstMetadata<RunEndMetadata>;
63
64 type ArrayVTable = Self;
65 type OperationsVTable = Self;
66 type ValidityVTable = Self;
67 type VisitorVTable = Self;
68
69 fn id(_array: &Self::Array) -> ArrayId {
70 Self::ID
71 }
72
73 fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
74 Ok(ProstMetadata(RunEndMetadata {
75 ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
76 as i32,
77 num_runs: array.ends().len() as u64,
78 offset: array.offset() as u64,
79 }))
80 }
81
82 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
83 Ok(Some(metadata.serialize()))
84 }
85
86 fn deserialize(
87 bytes: &[u8],
88 _dtype: &DType,
89 _len: usize,
90 _buffers: &[BufferHandle],
91 _session: &VortexSession,
92 ) -> VortexResult<Self::Metadata> {
93 let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
94 Ok(ProstMetadata(inner))
95 }
96
97 fn build(
98 dtype: &DType,
99 len: usize,
100 metadata: &Self::Metadata,
101 _buffers: &[BufferHandle],
102 children: &dyn ArrayChildren,
103 ) -> VortexResult<RunEndArray> {
104 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
105 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
106 let ends = children.get(0, &ends_dtype, runs)?;
107
108 let values = children.get(1, dtype, runs)?;
109
110 RunEndArray::try_new_offset_length(
111 ends,
112 values,
113 usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
114 len,
115 )
116 }
117
118 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
119 vortex_ensure!(
120 children.len() == 2,
121 "RunEndArray expects 2 children, got {}",
122 children.len()
123 );
124
125 let mut children_iter = children.into_iter();
126 array.ends = children_iter.next().vortex_expect("ends child");
127 array.values = children_iter.next().vortex_expect("values child");
128
129 Ok(())
130 }
131
132 fn reduce_parent(
133 array: &Self::Array,
134 parent: &ArrayRef,
135 child_idx: usize,
136 ) -> VortexResult<Option<ArrayRef>> {
137 RULES.evaluate(array, parent, child_idx)
138 }
139
140 fn execute_parent(
141 array: &Self::Array,
142 parent: &ArrayRef,
143 child_idx: usize,
144 ctx: &mut ExecutionCtx,
145 ) -> VortexResult<Option<ArrayRef>> {
146 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
147 }
148
149 fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
150 run_end_canonicalize(array, ctx)
151 }
152}
153
/// A run-end encoded array.
///
/// Each run is described by one entry in `values` and the matching entry in `ends`. The array
/// exposes `length` logical elements, and `offset` is added to a logical index before it is
/// looked up in `ends`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Run ends: an unsigned-integer array with one entry per run (enforced by `validate`).
    ends: ArrayRef,
    /// Run values: a Bool or Primitive array with the same length as `ends`.
    values: ArrayRef,
    /// Offset added to a logical index before it is searched for in `ends`.
    offset: usize,
    /// Logical length of the array, as reported by `len()`.
    length: usize,
    /// Statistics storage exposed through the vtable's `stats` accessor.
    stats_set: ArrayStats,
}
162
163pub struct RunEndArrayParts {
164 pub ends: ArrayRef,
165 pub values: ArrayRef,
166}
167
/// Marker type that carries the vtable trait implementations for run-end arrays
/// (`VTable`, `BaseArrayVTable`, `ValidityVTable`, ...).
#[derive(Debug)]
pub struct RunEndVTable;
170
impl RunEndVTable {
    /// Encoding identifier of run-end arrays; this is the value returned by `VTable::id`.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
174
175impl RunEndArray {
176 fn validate(
177 ends: &dyn Array,
178 values: &dyn Array,
179 offset: usize,
180 length: usize,
181 ) -> VortexResult<()> {
182 vortex_ensure!(
184 ends.dtype().is_unsigned_int(),
185 "run ends must be unsigned integers, was {}",
186 ends.dtype(),
187 );
188 vortex_ensure!(
189 values.dtype().is_primitive() || values.dtype().is_boolean(),
190 "RunEnd array can only have Bool or Primitive values, {} given",
191 values.dtype()
192 );
193
194 vortex_ensure!(
195 ends.len() == values.len(),
196 "run ends len != run values len, {} != {}",
197 ends.len(),
198 values.len()
199 );
200
201 if ends.is_empty() {
203 vortex_ensure!(
204 offset == 0,
205 "non-zero offset provided for empty RunEndArray"
206 );
207 return Ok(());
208 }
209
210 if length == 0 {
212 vortex_ensure!(
213 ends.is_empty(),
214 "run ends must be empty when length is zero"
215 );
216 return Ok(());
217 }
218
219 debug_assert!({
220 let pre_validation = ends.statistics().to_owned();
222
223 let is_sorted = ends
224 .statistics()
225 .compute_is_strict_sorted()
226 .unwrap_or(false);
227
228 ends.statistics().inherit(pre_validation.iter());
231 is_sorted
232 });
233
234 if !ends.is_host() {
236 return Ok(());
237 }
238
239 if offset != 0 && length != 0 {
241 let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
242 if first_run_end <= offset {
243 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
244 }
245 }
246
247 let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
248 let min_required_end = offset + length;
249 if last_run_end < min_required_end {
250 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
251 }
252
253 Ok(())
254 }
255}
256
257impl RunEndArray {
258 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
284 Self::try_new(ends, values).vortex_expect("RunEndArray new")
285 }
286
287 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
318 let length: usize = if ends.is_empty() {
319 0
320 } else {
321 usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
322 };
323
324 Self::try_new_offset_length(ends, values, 0, length)
325 }
326
327 pub fn try_new_offset_length(
331 ends: ArrayRef,
332 values: ArrayRef,
333 offset: usize,
334 length: usize,
335 ) -> VortexResult<Self> {
336 Self::validate(&ends, &values, offset, length)?;
337
338 Ok(Self {
339 ends,
340 values,
341 offset,
342 length,
343 stats_set: Default::default(),
344 })
345 }
346
347 pub unsafe fn new_unchecked(
356 ends: ArrayRef,
357 values: ArrayRef,
358 offset: usize,
359 length: usize,
360 ) -> Self {
361 Self {
362 ends,
363 values,
364 offset,
365 length,
366 stats_set: Default::default(),
367 }
368 }
369
370 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
372 Ok(self
373 .ends()
374 .as_primitive_typed()
375 .search_sorted(
376 &PValue::from(index + self.offset()),
377 SearchSortedSide::Right,
378 )?
379 .to_ends_index(self.ends().len()))
380 }
381
382 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
384 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
385 let (ends, values) = runend_encode(parray);
386 unsafe {
388 Ok(Self::new_unchecked(
389 ends.into_array(),
390 values,
391 0,
392 array.len(),
393 ))
394 }
395 } else {
396 vortex_bail!("REE can only encode primitive arrays")
397 }
398 }
399
400 #[inline]
404 pub fn offset(&self) -> usize {
405 self.offset
406 }
407
408 #[inline]
413 pub fn ends(&self) -> &ArrayRef {
414 &self.ends
415 }
416
417 #[inline]
422 pub fn values(&self) -> &ArrayRef {
423 &self.values
424 }
425
426 #[inline]
428 pub fn into_parts(self) -> RunEndArrayParts {
429 RunEndArrayParts {
430 ends: self.ends,
431 values: self.values,
432 }
433 }
434}
435
436impl BaseArrayVTable<RunEndVTable> for RunEndVTable {
437 fn len(array: &RunEndArray) -> usize {
438 array.length
439 }
440
441 fn dtype(array: &RunEndArray) -> &DType {
442 array.values.dtype()
443 }
444
445 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
446 array.stats_set.to_ref(array.as_ref())
447 }
448
449 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
450 array.ends.array_hash(state, precision);
451 array.values.array_hash(state, precision);
452 array.offset.hash(state);
453 array.length.hash(state);
454 }
455
456 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
457 array.ends.array_eq(&other.ends, precision)
458 && array.values.array_eq(&other.values, precision)
459 && array.offset == other.offset
460 && array.length == other.length
461 }
462}
463
464impl ValidityVTable<RunEndVTable> for RunEndVTable {
465 fn validity(array: &RunEndArray) -> VortexResult<Validity> {
466 Ok(match array.values().validity()? {
467 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
468 Validity::AllInvalid => Validity::AllInvalid,
469 Validity::Array(values_validity) => Validity::Array(unsafe {
470 RunEndArray::new_unchecked(
471 array.ends().clone(),
472 values_validity,
473 array.offset(),
474 array.len(),
475 )
476 .into_array()
477 }),
478 })
479 }
480}
481
482pub(super) fn run_end_canonicalize(
483 array: &RunEndArray,
484 ctx: &mut ExecutionCtx,
485) -> VortexResult<ArrayRef> {
486 let pends = array.ends().clone().execute_as("ends", ctx)?;
487 Ok(match array.dtype() {
488 DType::Bool(_) => {
489 let bools = array.values().clone().execute_as("values", ctx)?;
490 runend_decode_bools(pends, bools, array.offset(), array.len())?.into_array()
491 }
492 DType::Primitive(..) => {
493 let pvalues = array.values().clone().execute_as("values", ctx)?;
494 runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
495 }
496 _ => vortex_panic!("Only Primitive and Bool values are supported"),
497 })
498}
499
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::assert_arrays_eq;
    use vortex_buffer::buffer;
    use vortex_dtype::DType;
    use vortex_dtype::Nullability;
    use vortex_dtype::PType;

    use crate::RunEndArray;

    /// Three runs ending at 2, 5 and 10 yield a 10-element i32 array that decodes to
    /// two 1s, three 2s and five 3s.
    #[test]
    fn test_runend_constructor() {
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::new(run_ends, run_values);

        assert_eq!(arr.len(), 10);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.to_array(), expected);
    }
}