1use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::Array;
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::ExecutionCtx;
13use vortex_array::IntoArray;
14use vortex_array::Precision;
15use vortex_array::ProstMetadata;
16use vortex_array::SerializeMetadata;
17use vortex_array::arrays::PrimitiveVTable;
18use vortex_array::buffer::BufferHandle;
19use vortex_array::dtype::DType;
20use vortex_array::dtype::Nullability;
21use vortex_array::dtype::PType;
22use vortex_array::scalar::PValue;
23use vortex_array::search_sorted::SearchSorted;
24use vortex_array::search_sorted::SearchSortedSide;
25use vortex_array::serde::ArrayChildren;
26use vortex_array::stats::ArrayStats;
27use vortex_array::stats::StatsSetRef;
28use vortex_array::validity::Validity;
29use vortex_array::vtable;
30use vortex_array::vtable::ArrayId;
31use vortex_array::vtable::VTable;
32use vortex_array::vtable::ValidityVTable;
33use vortex_error::VortexExpect as _;
34use vortex_error::VortexResult;
35use vortex_error::vortex_bail;
36use vortex_error::vortex_ensure;
37use vortex_error::vortex_panic;
38use vortex_session::VortexSession;
39
40use crate::compress::runend_decode_bools;
41use crate::compress::runend_decode_primitive;
42use crate::compress::runend_encode;
43use crate::kernel::PARENT_KERNELS;
44use crate::rules::RULES;
45
// Invokes the `vtable!` macro imported from `vortex_array` for the `RunEnd` encoding.
// NOTE(review): presumably this generates the glue that ties `RunEndArray` to
// `RunEndVTable` — confirm against the macro definition.
vtable!(RunEnd);
47
/// Protobuf-encoded metadata stored alongside a serialized run-end array.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of the `ends` (and `values`) child.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Logical offset of the array into its runs.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
57
58impl VTable for RunEndVTable {
59 type Array = RunEndArray;
60
61 type Metadata = ProstMetadata<RunEndMetadata>;
62 type OperationsVTable = Self;
63 type ValidityVTable = Self;
64
65 fn id(_array: &Self::Array) -> ArrayId {
66 Self::ID
67 }
68
69 fn len(array: &RunEndArray) -> usize {
70 array.length
71 }
72
73 fn dtype(array: &RunEndArray) -> &DType {
74 array.values.dtype()
75 }
76
77 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
78 array.stats_set.to_ref(array.as_ref())
79 }
80
81 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
82 array.ends.array_hash(state, precision);
83 array.values.array_hash(state, precision);
84 array.offset.hash(state);
85 array.length.hash(state);
86 }
87
88 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
89 array.ends.array_eq(&other.ends, precision)
90 && array.values.array_eq(&other.values, precision)
91 && array.offset == other.offset
92 && array.length == other.length
93 }
94
95 fn nbuffers(_array: &RunEndArray) -> usize {
96 0
97 }
98
99 fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
100 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
101 }
102
103 fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
104 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
105 }
106
107 fn nchildren(_array: &RunEndArray) -> usize {
108 2
109 }
110
111 fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
112 match idx {
113 0 => array.ends().clone(),
114 1 => array.values().clone(),
115 _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
116 }
117 }
118
119 fn child_name(_array: &RunEndArray, idx: usize) -> String {
120 match idx {
121 0 => "ends".to_string(),
122 1 => "values".to_string(),
123 _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
124 }
125 }
126
127 fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
128 Ok(ProstMetadata(RunEndMetadata {
129 ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
130 as i32,
131 num_runs: array.ends().len() as u64,
132 offset: array.offset() as u64,
133 }))
134 }
135
136 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
137 Ok(Some(metadata.serialize()))
138 }
139
140 fn deserialize(
141 bytes: &[u8],
142 _dtype: &DType,
143 _len: usize,
144 _buffers: &[BufferHandle],
145 _session: &VortexSession,
146 ) -> VortexResult<Self::Metadata> {
147 let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
148 Ok(ProstMetadata(inner))
149 }
150
151 fn build(
152 dtype: &DType,
153 len: usize,
154 metadata: &Self::Metadata,
155 _buffers: &[BufferHandle],
156 children: &dyn ArrayChildren,
157 ) -> VortexResult<RunEndArray> {
158 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
159 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
160 let ends = children.get(0, &ends_dtype, runs)?;
161
162 let values = children.get(1, dtype, runs)?;
163
164 RunEndArray::try_new_offset_length(
165 ends,
166 values,
167 usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
168 len,
169 )
170 }
171
172 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
173 vortex_ensure!(
174 children.len() == 2,
175 "RunEndArray expects 2 children, got {}",
176 children.len()
177 );
178
179 let mut children_iter = children.into_iter();
180 array.ends = children_iter.next().vortex_expect("ends child");
181 array.values = children_iter.next().vortex_expect("values child");
182
183 Ok(())
184 }
185
186 fn reduce_parent(
187 array: &Self::Array,
188 parent: &ArrayRef,
189 child_idx: usize,
190 ) -> VortexResult<Option<ArrayRef>> {
191 RULES.evaluate(array, parent, child_idx)
192 }
193
194 fn execute_parent(
195 array: &Self::Array,
196 parent: &ArrayRef,
197 child_idx: usize,
198 ctx: &mut ExecutionCtx,
199 ) -> VortexResult<Option<ArrayRef>> {
200 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
201 }
202
203 fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ArrayRef> {
204 run_end_canonicalize(array, ctx)
205 }
206}
207
/// A run-end encoded array: each entry of `values` is repeated up to the matching
/// entry of `ends`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// End position of each run; validated to be an unsigned integer array.
    ends: ArrayRef,
    /// One value per run; validated to be a Bool or Primitive array.
    values: ArrayRef,
    /// Logical offset added to an index before it is looked up in `ends`.
    offset: usize,
    /// Logical (decoded) length of the array.
    length: usize,
    /// Statistics cached for this array.
    stats_set: ArrayStats,
}
216
217pub struct RunEndArrayParts {
218 pub ends: ArrayRef,
219 pub values: ArrayRef,
220}
221
/// Marker type that carries the vtable implementations for the run-end encoding.
#[derive(Debug)]
pub struct RunEndVTable;
224
impl RunEndVTable {
    /// Identifier of the run-end encoding.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
228
229impl RunEndArray {
230 fn validate(
231 ends: &ArrayRef,
232 values: &ArrayRef,
233 offset: usize,
234 length: usize,
235 ) -> VortexResult<()> {
236 vortex_ensure!(
238 ends.dtype().is_unsigned_int(),
239 "run ends must be unsigned integers, was {}",
240 ends.dtype(),
241 );
242 vortex_ensure!(
243 values.dtype().is_primitive() || values.dtype().is_boolean(),
244 "RunEnd array can only have Bool or Primitive values, {} given",
245 values.dtype()
246 );
247
248 vortex_ensure!(
249 ends.len() == values.len(),
250 "run ends len != run values len, {} != {}",
251 ends.len(),
252 values.len()
253 );
254
255 if ends.is_empty() {
257 vortex_ensure!(
258 offset == 0,
259 "non-zero offset provided for empty RunEndArray"
260 );
261 return Ok(());
262 }
263
264 if length == 0 {
266 vortex_ensure!(
267 ends.is_empty(),
268 "run ends must be empty when length is zero"
269 );
270 return Ok(());
271 }
272
273 debug_assert!({
274 let pre_validation = ends.statistics().to_owned();
276
277 let is_sorted = ends
278 .statistics()
279 .compute_is_strict_sorted()
280 .unwrap_or(false);
281
282 ends.statistics().inherit(pre_validation.iter());
285 is_sorted
286 });
287
288 if !ends.is_host() {
290 return Ok(());
291 }
292
293 if offset != 0 && length != 0 {
295 let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
296 if first_run_end <= offset {
297 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
298 }
299 }
300
301 let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
302 let min_required_end = offset + length;
303 if last_run_end < min_required_end {
304 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
305 }
306
307 Ok(())
308 }
309}
310
311impl RunEndArray {
312 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
338 Self::try_new(ends, values).vortex_expect("RunEndArray new")
339 }
340
341 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
372 let length: usize = if ends.is_empty() {
373 0
374 } else {
375 usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
376 };
377
378 Self::try_new_offset_length(ends, values, 0, length)
379 }
380
381 pub fn try_new_offset_length(
385 ends: ArrayRef,
386 values: ArrayRef,
387 offset: usize,
388 length: usize,
389 ) -> VortexResult<Self> {
390 Self::validate(&ends, &values, offset, length)?;
391
392 Ok(Self {
393 ends,
394 values,
395 offset,
396 length,
397 stats_set: Default::default(),
398 })
399 }
400
401 pub unsafe fn new_unchecked(
410 ends: ArrayRef,
411 values: ArrayRef,
412 offset: usize,
413 length: usize,
414 ) -> Self {
415 Self {
416 ends,
417 values,
418 offset,
419 length,
420 stats_set: Default::default(),
421 }
422 }
423
424 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
426 Ok(self
427 .ends()
428 .as_primitive_typed()
429 .search_sorted(
430 &PValue::from(index + self.offset()),
431 SearchSortedSide::Right,
432 )?
433 .to_ends_index(self.ends().len()))
434 }
435
436 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
438 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
439 let (ends, values) = runend_encode(parray);
440 unsafe {
442 Ok(Self::new_unchecked(
443 ends.into_array(),
444 values,
445 0,
446 array.len(),
447 ))
448 }
449 } else {
450 vortex_bail!("REE can only encode primitive arrays")
451 }
452 }
453
454 #[inline]
458 pub fn offset(&self) -> usize {
459 self.offset
460 }
461
462 #[inline]
467 pub fn ends(&self) -> &ArrayRef {
468 &self.ends
469 }
470
471 #[inline]
476 pub fn values(&self) -> &ArrayRef {
477 &self.values
478 }
479
480 #[inline]
482 pub fn into_parts(self) -> RunEndArrayParts {
483 RunEndArrayParts {
484 ends: self.ends,
485 values: self.values,
486 }
487 }
488}
489
490impl ValidityVTable<RunEndVTable> for RunEndVTable {
491 fn validity(array: &RunEndArray) -> VortexResult<Validity> {
492 Ok(match array.values().validity()? {
493 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
494 Validity::AllInvalid => Validity::AllInvalid,
495 Validity::Array(values_validity) => Validity::Array(unsafe {
496 RunEndArray::new_unchecked(
497 array.ends().clone(),
498 values_validity,
499 array.offset(),
500 array.len(),
501 )
502 .into_array()
503 }),
504 })
505 }
506}
507
508pub(super) fn run_end_canonicalize(
509 array: &RunEndArray,
510 ctx: &mut ExecutionCtx,
511) -> VortexResult<ArrayRef> {
512 let pends = array.ends().clone().execute_as("ends", ctx)?;
513 Ok(match array.dtype() {
514 DType::Bool(_) => {
515 let bools = array.values().clone().execute_as("values", ctx)?;
516 runend_decode_bools(pends, bools, array.offset(), array.len())?.into_array()
517 }
518 DType::Primitive(..) => {
519 let pvalues = array.values().clone().execute_as("values", ctx)?;
520 runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
521 }
522 _ => vortex_panic!("Only Primitive and Bool values are supported"),
523 })
524}
525
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEndArray;

    /// Three runs ending at 2, 5 and 10 decode to ten elements of the values' dtype.
    #[test]
    fn test_runend_constructor() {
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let ree = RunEndArray::new(run_ends, run_values);

        // The logical length is the last run end.
        assert_eq!(ree.len(), 10);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(ree.dtype(), &expected_dtype);

        // Each value is repeated up to (but excluding) its run end.
        let decoded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(ree.to_array(), decoded);
    }
}
555}