1use std::fmt::Debug;
5use std::hash::Hash;
6
7use vortex_array::ArrayEq;
8use vortex_array::ArrayHash;
9use vortex_array::ArrayRef;
10use vortex_array::DeserializeMetadata;
11use vortex_array::DynArray;
12use vortex_array::ExecutionCtx;
13use vortex_array::ExecutionStep;
14use vortex_array::IntoArray;
15use vortex_array::Precision;
16use vortex_array::ProstMetadata;
17use vortex_array::SerializeMetadata;
18use vortex_array::arrays::PrimitiveVTable;
19use vortex_array::arrays::VarBinViewArray;
20use vortex_array::buffer::BufferHandle;
21use vortex_array::dtype::DType;
22use vortex_array::dtype::Nullability;
23use vortex_array::dtype::PType;
24use vortex_array::scalar::PValue;
25use vortex_array::search_sorted::SearchSorted;
26use vortex_array::search_sorted::SearchSortedSide;
27use vortex_array::serde::ArrayChildren;
28use vortex_array::stats::ArrayStats;
29use vortex_array::stats::StatsSetRef;
30use vortex_array::validity::Validity;
31use vortex_array::vtable;
32use vortex_array::vtable::ArrayId;
33use vortex_array::vtable::VTable;
34use vortex_array::vtable::ValidityVTable;
35use vortex_error::VortexExpect as _;
36use vortex_error::VortexResult;
37use vortex_error::vortex_bail;
38use vortex_error::vortex_ensure;
39use vortex_error::vortex_panic;
40use vortex_session::VortexSession;
41
42use crate::compress::runend_decode_bools;
43use crate::compress::runend_decode_primitive;
44use crate::compress::runend_decode_varbinview;
45use crate::compress::runend_encode;
46use crate::kernel::PARENT_KERNELS;
47use crate::rules::RULES;
48
// NOTE(review): this macro presumably generates the glue that connects `RunEndArray` and
// `RunEndVTable` to the generic array machinery (e.g. the `as_ref` / `into_array` methods used
// below). It does not define `RunEndVTable` itself, which is declared later in this file.
// Confirm against the `vortex_array::vtable!` macro definition.
vtable!(RunEnd);
50
/// Serialized (prost) metadata for a [`RunEndArray`].
///
/// The logical length is not stored here; `build` receives it separately.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enum's `i32` value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs. This is the length of both the `ends` and the `values` children.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Number of decoded positions skipped before the first logical element.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
60
61impl VTable for RunEndVTable {
62 type Array = RunEndArray;
63
64 type Metadata = ProstMetadata<RunEndMetadata>;
65 type OperationsVTable = Self;
66 type ValidityVTable = Self;
67
68 fn id(_array: &Self::Array) -> ArrayId {
69 Self::ID
70 }
71
72 fn len(array: &RunEndArray) -> usize {
73 array.length
74 }
75
76 fn dtype(array: &RunEndArray) -> &DType {
77 array.values.dtype()
78 }
79
80 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
81 array.stats_set.to_ref(array.as_ref())
82 }
83
84 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
85 array.ends.array_hash(state, precision);
86 array.values.array_hash(state, precision);
87 array.offset.hash(state);
88 array.length.hash(state);
89 }
90
91 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
92 array.ends.array_eq(&other.ends, precision)
93 && array.values.array_eq(&other.values, precision)
94 && array.offset == other.offset
95 && array.length == other.length
96 }
97
98 fn nbuffers(_array: &RunEndArray) -> usize {
99 0
100 }
101
102 fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
103 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
104 }
105
106 fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
107 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
108 }
109
110 fn nchildren(_array: &RunEndArray) -> usize {
111 2
112 }
113
114 fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
115 match idx {
116 0 => array.ends().clone(),
117 1 => array.values().clone(),
118 _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
119 }
120 }
121
122 fn child_name(_array: &RunEndArray, idx: usize) -> String {
123 match idx {
124 0 => "ends".to_string(),
125 1 => "values".to_string(),
126 _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
127 }
128 }
129
130 fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
131 Ok(ProstMetadata(RunEndMetadata {
132 ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
133 as i32,
134 num_runs: array.ends().len() as u64,
135 offset: array.offset() as u64,
136 }))
137 }
138
139 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
140 Ok(Some(metadata.serialize()))
141 }
142
143 fn deserialize(
144 bytes: &[u8],
145 _dtype: &DType,
146 _len: usize,
147 _buffers: &[BufferHandle],
148 _session: &VortexSession,
149 ) -> VortexResult<Self::Metadata> {
150 let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
151 Ok(ProstMetadata(inner))
152 }
153
154 fn build(
155 dtype: &DType,
156 len: usize,
157 metadata: &Self::Metadata,
158 _buffers: &[BufferHandle],
159 children: &dyn ArrayChildren,
160 ) -> VortexResult<RunEndArray> {
161 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
162 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
163 let ends = children.get(0, &ends_dtype, runs)?;
164
165 let values = children.get(1, dtype, runs)?;
166
167 RunEndArray::try_new_offset_length(
168 ends,
169 values,
170 usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
171 len,
172 )
173 }
174
175 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
176 vortex_ensure!(
177 children.len() == 2,
178 "RunEndArray expects 2 children, got {}",
179 children.len()
180 );
181
182 let mut children_iter = children.into_iter();
183 array.ends = children_iter.next().vortex_expect("ends child");
184 array.values = children_iter.next().vortex_expect("values child");
185
186 Ok(())
187 }
188
189 fn reduce_parent(
190 array: &Self::Array,
191 parent: &ArrayRef,
192 child_idx: usize,
193 ) -> VortexResult<Option<ArrayRef>> {
194 RULES.evaluate(array, parent, child_idx)
195 }
196
197 fn execute_parent(
198 array: &Self::Array,
199 parent: &ArrayRef,
200 child_idx: usize,
201 ctx: &mut ExecutionCtx,
202 ) -> VortexResult<Option<ArrayRef>> {
203 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
204 }
205
206 fn execute(array: &Self::Array, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionStep> {
207 run_end_canonicalize(array, ctx).map(ExecutionStep::Done)
208 }
209}
210
/// A run-end encoded array.
///
/// `values[i]` is repeated from the previous run end (or `0`) up to, but not including,
/// `ends[i]`. For example, `ends = [2, 5, 10]` and `values = [1, 2, 3]` decode to
/// `[1, 1, 2, 2, 2, 3, 3, 3, 3, 3]`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    // Exclusive end position of each run, as an unsigned integer array.
    ends: ArrayRef,
    // One value per run; `values.len() == ends.len()`.
    values: ArrayRef,
    // Number of decoded positions skipped: logical index `i` maps to decoded position
    // `i + offset`.
    offset: usize,
    // Logical number of elements exposed by this array.
    length: usize,
    // Lazily populated statistics for this array.
    stats_set: ArrayStats,
}
219
/// The owned children of a [`RunEndArray`], as returned by `RunEndArray::into_parts`.
///
/// The array's `offset` and logical length are not included.
pub struct RunEndArrayParts {
    /// Exclusive end position of each run.
    pub ends: ArrayRef,
    /// One value per run.
    pub values: ArrayRef,
}
224
/// Vtable marker type for run-end encoded arrays ([`RunEndArray`]).
#[derive(Debug)]
pub struct RunEndVTable;
227
impl RunEndVTable {
    /// Encoding identifier reported by `VTable::id` for every run-end array.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
231
232impl RunEndArray {
233 fn validate(
234 ends: &ArrayRef,
235 values: &ArrayRef,
236 offset: usize,
237 length: usize,
238 ) -> VortexResult<()> {
239 vortex_ensure!(
241 ends.dtype().is_unsigned_int(),
242 "run ends must be unsigned integers, was {}",
243 ends.dtype(),
244 );
245 vortex_ensure!(
246 ends.len() == values.len(),
247 "run ends len != run values len, {} != {}",
248 ends.len(),
249 values.len()
250 );
251
252 if ends.is_empty() {
254 vortex_ensure!(
255 offset == 0,
256 "non-zero offset provided for empty RunEndArray"
257 );
258 return Ok(());
259 }
260
261 if length == 0 {
263 vortex_ensure!(
264 ends.is_empty(),
265 "run ends must be empty when length is zero"
266 );
267 return Ok(());
268 }
269
270 debug_assert!({
271 let pre_validation = ends.statistics().to_owned();
273
274 let is_sorted = ends
275 .statistics()
276 .compute_is_strict_sorted()
277 .unwrap_or(false);
278
279 ends.statistics().inherit(pre_validation.iter());
282 is_sorted
283 });
284
285 if !ends.is_host() {
287 return Ok(());
288 }
289
290 if offset != 0 && length != 0 {
292 let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
293 if first_run_end <= offset {
294 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
295 }
296 }
297
298 let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
299 let min_required_end = offset + length;
300 if last_run_end < min_required_end {
301 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
302 }
303
304 Ok(())
305 }
306}
307
308impl RunEndArray {
309 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
335 Self::try_new(ends, values).vortex_expect("RunEndArray new")
336 }
337
338 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
344 let length: usize = if ends.is_empty() {
345 0
346 } else {
347 usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
348 };
349
350 Self::try_new_offset_length(ends, values, 0, length)
351 }
352
353 pub fn try_new_offset_length(
357 ends: ArrayRef,
358 values: ArrayRef,
359 offset: usize,
360 length: usize,
361 ) -> VortexResult<Self> {
362 Self::validate(&ends, &values, offset, length)?;
363
364 Ok(Self {
365 ends,
366 values,
367 offset,
368 length,
369 stats_set: Default::default(),
370 })
371 }
372
373 pub unsafe fn new_unchecked(
382 ends: ArrayRef,
383 values: ArrayRef,
384 offset: usize,
385 length: usize,
386 ) -> Self {
387 Self {
388 ends,
389 values,
390 offset,
391 length,
392 stats_set: Default::default(),
393 }
394 }
395
396 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
398 Ok(self
399 .ends()
400 .as_primitive_typed()
401 .search_sorted(
402 &PValue::from(index + self.offset()),
403 SearchSortedSide::Right,
404 )?
405 .to_ends_index(self.ends().len()))
406 }
407
408 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
410 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
411 let (ends, values) = runend_encode(parray);
412 unsafe {
414 Ok(Self::new_unchecked(
415 ends.into_array(),
416 values,
417 0,
418 array.len(),
419 ))
420 }
421 } else {
422 vortex_bail!("REE can only encode primitive arrays")
423 }
424 }
425
426 #[inline]
430 pub fn offset(&self) -> usize {
431 self.offset
432 }
433
434 #[inline]
439 pub fn ends(&self) -> &ArrayRef {
440 &self.ends
441 }
442
443 #[inline]
448 pub fn values(&self) -> &ArrayRef {
449 &self.values
450 }
451
452 #[inline]
454 pub fn into_parts(self) -> RunEndArrayParts {
455 RunEndArrayParts {
456 ends: self.ends,
457 values: self.values,
458 }
459 }
460}
461
462impl ValidityVTable<RunEndVTable> for RunEndVTable {
463 fn validity(array: &RunEndArray) -> VortexResult<Validity> {
464 Ok(match array.values().validity()? {
465 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
466 Validity::AllInvalid => Validity::AllInvalid,
467 Validity::Array(values_validity) => Validity::Array(unsafe {
468 RunEndArray::new_unchecked(
469 array.ends().clone(),
470 values_validity,
471 array.offset(),
472 array.len(),
473 )
474 .into_array()
475 }),
476 })
477 }
478}
479
480pub(super) fn run_end_canonicalize(
481 array: &RunEndArray,
482 ctx: &mut ExecutionCtx,
483) -> VortexResult<ArrayRef> {
484 let pends = array.ends().clone().execute_as("ends", ctx)?;
485
486 Ok(match array.dtype() {
487 DType::Bool(_) => {
488 let bools = array.values().clone().execute_as("values", ctx)?;
489 runend_decode_bools(pends, bools, array.offset(), array.len())?.into_array()
490 }
491 DType::Primitive(..) => {
492 let pvalues = array.values().clone().execute_as("values", ctx)?;
493 runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
494 }
495 DType::Utf8(_) | DType::Binary(_) => {
496 let values = array
497 .values()
498 .clone()
499 .execute_as::<VarBinViewArray>("values", ctx)?;
500 runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
501 }
502 _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
503 })
504}
505
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEndArray;

    /// Runs of lengths 2, 3 and 5 over primitive values decode to ten elements.
    #[test]
    fn test_runend_constructor() {
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let encoded = RunEndArray::new(run_ends, run_values);

        assert_eq!(encoded.len(), 10);
        assert_eq!(
            encoded.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(encoded.into_array(), expected);
    }

    /// String values are repeated per run and keep the UTF-8 dtype.
    #[test]
    fn test_runend_utf8() {
        let run_values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let run_ends = buffer![2u32, 5, 10].into_array();
        let encoded = RunEndArray::new(run_ends, run_values);

        assert_eq!(encoded.len(), 10);
        assert_eq!(encoded.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expected_strs = ["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"];
        let expected = VarBinViewArray::from_iter_str(expected_strs).into_array();
        assert_arrays_eq!(encoded.into_array(), expected);
    }

    /// A dictionary-encoded values child decodes through to plain strings.
    #[test]
    fn test_runend_dict() {
        let dict_values = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict_codes = buffer![0u32, 1, 2].into_array();
        let dict = DictArray::try_new(dict_codes, dict_values).unwrap();

        let run_ends = buffer![2u32, 5, 10].into_array();
        let encoded = RunEndArray::try_new(run_ends, dict.into_array()).unwrap();
        assert_eq!(encoded.len(), 10);

        let expected_strs = ["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"];
        let expected = VarBinViewArray::from_iter_str(expected_strs).into_array();
        assert_arrays_eq!(encoded.into_array(), expected);
    }
}