1use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::Arc;
7
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::DynArray;
13use vortex_array::ExecutionCtx;
14use vortex_array::ExecutionResult;
15use vortex_array::IntoArray;
16use vortex_array::Precision;
17use vortex_array::ProstMetadata;
18use vortex_array::SerializeMetadata;
19use vortex_array::arrays::Primitive;
20use vortex_array::arrays::VarBinViewArray;
21use vortex_array::buffer::BufferHandle;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::Nullability;
24use vortex_array::dtype::PType;
25use vortex_array::scalar::PValue;
26use vortex_array::search_sorted::SearchSorted;
27use vortex_array::search_sorted::SearchSortedSide;
28use vortex_array::serde::ArrayChildren;
29use vortex_array::stats::ArrayStats;
30use vortex_array::stats::StatsSetRef;
31use vortex_array::validity::Validity;
32use vortex_array::vtable;
33use vortex_array::vtable::Array;
34use vortex_array::vtable::ArrayId;
35use vortex_array::vtable::VTable;
36use vortex_array::vtable::ValidityVTable;
37use vortex_error::VortexExpect as _;
38use vortex_error::VortexResult;
39use vortex_error::vortex_bail;
40use vortex_error::vortex_ensure;
41use vortex_error::vortex_panic;
42use vortex_session::VortexSession;
43
44use crate::compress::runend_decode_primitive;
45use crate::compress::runend_decode_varbinview;
46use crate::compress::runend_encode;
47use crate::decompress_bool::runend_decode_bools;
48use crate::kernel::PARENT_KERNELS;
49use crate::rules::RULES;
50
// Invokes the `vortex_array::vtable!` macro for the `RunEnd` encoding.
// NOTE(review): the macro expansion is not visible in this file — presumably it generates the
// glue items tying `RunEnd` to `RunEndArray`; confirm against the macro definition.
vtable!(RunEnd);
52
/// Protobuf-encoded metadata stored with a serialized run-end array.
///
/// Produced by `VTable::metadata` and consumed by `VTable::build` in this file.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs; written as the length of the `ends` child and used on load as the
    /// length of both the `ends` and `values` children.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Value of `RunEndArray::offset()` at serialization time.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
62
63impl VTable for RunEnd {
64 type Array = RunEndArray;
65
66 type Metadata = ProstMetadata<RunEndMetadata>;
67 type OperationsVTable = Self;
68 type ValidityVTable = Self;
69
70 fn vtable(_array: &Self::Array) -> &Self {
71 &RunEnd
72 }
73
74 fn id(&self) -> ArrayId {
75 Self::ID
76 }
77
78 fn len(array: &RunEndArray) -> usize {
79 array.length
80 }
81
82 fn dtype(array: &RunEndArray) -> &DType {
83 array.values.dtype()
84 }
85
86 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
87 array.stats_set.to_ref(array.as_ref())
88 }
89
90 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
91 array.ends.array_hash(state, precision);
92 array.values.array_hash(state, precision);
93 array.offset.hash(state);
94 array.length.hash(state);
95 }
96
97 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
98 array.ends.array_eq(&other.ends, precision)
99 && array.values.array_eq(&other.values, precision)
100 && array.offset == other.offset
101 && array.length == other.length
102 }
103
104 fn nbuffers(_array: &RunEndArray) -> usize {
105 0
106 }
107
108 fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
109 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
110 }
111
112 fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
113 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
114 }
115
116 fn nchildren(_array: &RunEndArray) -> usize {
117 2
118 }
119
120 fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
121 match idx {
122 0 => array.ends().clone(),
123 1 => array.values().clone(),
124 _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
125 }
126 }
127
128 fn child_name(_array: &RunEndArray, idx: usize) -> String {
129 match idx {
130 0 => "ends".to_string(),
131 1 => "values".to_string(),
132 _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
133 }
134 }
135
136 fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
137 Ok(ProstMetadata(RunEndMetadata {
138 ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
139 as i32,
140 num_runs: array.ends().len() as u64,
141 offset: array.offset() as u64,
142 }))
143 }
144
145 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
146 Ok(Some(metadata.serialize()))
147 }
148
149 fn deserialize(
150 bytes: &[u8],
151 _dtype: &DType,
152 _len: usize,
153 _buffers: &[BufferHandle],
154 _session: &VortexSession,
155 ) -> VortexResult<Self::Metadata> {
156 let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
157 Ok(ProstMetadata(inner))
158 }
159
160 fn build(
161 dtype: &DType,
162 len: usize,
163 metadata: &Self::Metadata,
164 _buffers: &[BufferHandle],
165 children: &dyn ArrayChildren,
166 ) -> VortexResult<RunEndArray> {
167 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
168 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
169 let ends = children.get(0, &ends_dtype, runs)?;
170
171 let values = children.get(1, dtype, runs)?;
172
173 RunEndArray::try_new_offset_length(
174 ends,
175 values,
176 usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
177 len,
178 )
179 }
180
181 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
182 vortex_ensure!(
183 children.len() == 2,
184 "RunEndArray expects 2 children, got {}",
185 children.len()
186 );
187
188 let mut children_iter = children.into_iter();
189 array.ends = children_iter.next().vortex_expect("ends child");
190 array.values = children_iter.next().vortex_expect("values child");
191
192 Ok(())
193 }
194
195 fn reduce_parent(
196 array: &Array<Self>,
197 parent: &ArrayRef,
198 child_idx: usize,
199 ) -> VortexResult<Option<ArrayRef>> {
200 RULES.evaluate(array, parent, child_idx)
201 }
202
203 fn execute_parent(
204 array: &Array<Self>,
205 parent: &ArrayRef,
206 child_idx: usize,
207 ctx: &mut ExecutionCtx,
208 ) -> VortexResult<Option<ArrayRef>> {
209 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
210 }
211
212 fn execute(array: Arc<Array<Self>>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
213 run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
214 }
215}
216
/// A run-end encoded array: `ends` and `values` children plus a logical `offset`/`length` window.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Run ends; validated on construction to be unsigned integers with the same length as
    /// `values`.
    ends: ArrayRef,
    /// One value per run; its dtype is the dtype of the whole array.
    values: ArrayRef,
    /// Added to a logical index before it is searched for in `ends`
    /// (see `find_physical_index`).
    offset: usize,
    /// Logical length reported by the array.
    length: usize,
    /// Statistics container for this array.
    stats_set: ArrayStats,
}
225
226pub struct RunEndArrayParts {
227 pub ends: ArrayRef,
228 pub values: ArrayRef,
229}
230
/// Marker type for the run-end encoding; it carries no state and implements the encoding's
/// `VTable` and `ValidityVTable` in this file.
#[derive(Clone, Debug)]
pub struct RunEnd;
233
impl RunEnd {
    /// Identifier of the run-end encoding (`"vortex.runend"`), returned by `VTable::id`.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
237
238impl RunEndArray {
239 fn validate(
240 ends: &ArrayRef,
241 values: &ArrayRef,
242 offset: usize,
243 length: usize,
244 ) -> VortexResult<()> {
245 vortex_ensure!(
247 ends.dtype().is_unsigned_int(),
248 "run ends must be unsigned integers, was {}",
249 ends.dtype(),
250 );
251 vortex_ensure!(
252 ends.len() == values.len(),
253 "run ends len != run values len, {} != {}",
254 ends.len(),
255 values.len()
256 );
257
258 if ends.is_empty() {
260 vortex_ensure!(
261 offset == 0,
262 "non-zero offset provided for empty RunEndArray"
263 );
264 return Ok(());
265 }
266
267 if length == 0 {
269 vortex_ensure!(
270 ends.is_empty(),
271 "run ends must be empty when length is zero"
272 );
273 return Ok(());
274 }
275
276 debug_assert!({
277 let pre_validation = ends.statistics().to_owned();
279
280 let is_sorted = ends
281 .statistics()
282 .compute_is_strict_sorted()
283 .unwrap_or(false);
284
285 ends.statistics().inherit(pre_validation.iter());
288 is_sorted
289 });
290
291 if !ends.is_host() {
293 return Ok(());
294 }
295
296 if offset != 0 && length != 0 {
298 let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
299 if first_run_end <= offset {
300 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
301 }
302 }
303
304 let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
305 let min_required_end = offset + length;
306 if last_run_end < min_required_end {
307 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
308 }
309
310 Ok(())
311 }
312}
313
314impl RunEndArray {
315 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
341 Self::try_new(ends, values).vortex_expect("RunEndArray new")
342 }
343
344 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
350 let length: usize = if ends.is_empty() {
351 0
352 } else {
353 usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
354 };
355
356 Self::try_new_offset_length(ends, values, 0, length)
357 }
358
359 pub fn try_new_offset_length(
363 ends: ArrayRef,
364 values: ArrayRef,
365 offset: usize,
366 length: usize,
367 ) -> VortexResult<Self> {
368 Self::validate(&ends, &values, offset, length)?;
369
370 Ok(Self {
371 ends,
372 values,
373 offset,
374 length,
375 stats_set: Default::default(),
376 })
377 }
378
379 pub unsafe fn new_unchecked(
388 ends: ArrayRef,
389 values: ArrayRef,
390 offset: usize,
391 length: usize,
392 ) -> Self {
393 Self {
394 ends,
395 values,
396 offset,
397 length,
398 stats_set: Default::default(),
399 }
400 }
401
402 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
404 Ok(self
405 .ends()
406 .as_primitive_typed()
407 .search_sorted(
408 &PValue::from(index + self.offset()),
409 SearchSortedSide::Right,
410 )?
411 .to_ends_index(self.ends().len()))
412 }
413
414 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
416 if let Some(parray) = array.as_opt::<Primitive>() {
417 let (ends, values) = runend_encode(parray);
418 unsafe {
420 Ok(Self::new_unchecked(
421 ends.into_array(),
422 values,
423 0,
424 array.len(),
425 ))
426 }
427 } else {
428 vortex_bail!("REE can only encode primitive arrays")
429 }
430 }
431
432 #[inline]
436 pub fn offset(&self) -> usize {
437 self.offset
438 }
439
440 #[inline]
445 pub fn ends(&self) -> &ArrayRef {
446 &self.ends
447 }
448
449 #[inline]
454 pub fn values(&self) -> &ArrayRef {
455 &self.values
456 }
457
458 #[inline]
460 pub fn into_parts(self) -> RunEndArrayParts {
461 RunEndArrayParts {
462 ends: self.ends,
463 values: self.values,
464 }
465 }
466}
467
468impl ValidityVTable<RunEnd> for RunEnd {
469 fn validity(array: &RunEndArray) -> VortexResult<Validity> {
470 Ok(match array.values().validity()? {
471 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
472 Validity::AllInvalid => Validity::AllInvalid,
473 Validity::Array(values_validity) => Validity::Array(unsafe {
474 RunEndArray::new_unchecked(
475 array.ends().clone(),
476 values_validity,
477 array.offset(),
478 array.len(),
479 )
480 .into_array()
481 }),
482 })
483 }
484}
485
486pub(super) fn run_end_canonicalize(
487 array: &RunEndArray,
488 ctx: &mut ExecutionCtx,
489) -> VortexResult<ArrayRef> {
490 let pends = array.ends().clone().execute_as("ends", ctx)?;
491
492 Ok(match array.dtype() {
493 DType::Bool(_) => {
494 let bools = array.values().clone().execute_as("values", ctx)?;
495 runend_decode_bools(pends, bools, array.offset(), array.len())?
496 }
497 DType::Primitive(..) => {
498 let pvalues = array.values().clone().execute_as("values", ctx)?;
499 runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
500 }
501 DType::Utf8(_) | DType::Binary(_) => {
502 let values = array
503 .values()
504 .clone()
505 .execute_as::<VarBinViewArray>("values", ctx)?;
506 runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
507 }
508 _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
509 })
510}
511
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEndArray;

    /// Primitive values: runs ending at 2, 5 and 10 expand to ten `i32` elements.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::new(ends, values);

        assert_eq!(arr.len(), 10);
        assert_eq!(
            arr.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// UTF-8 values: the array takes the dtype of its values and decodes run by run.
    #[test]
    fn test_runend_utf8() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let arr = RunEndArray::new(ends, values);

        assert_eq!(arr.len(), 10);
        assert_eq!(arr.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expected =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// Dictionary-encoded values are accepted as the `values` child.
    #[test]
    fn test_runend_dict() {
        let dict_values = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict_codes = buffer![0u32, 1, 2].into_array();
        let dict = DictArray::try_new(dict_codes, dict_values).unwrap();

        let ends = buffer![2u32, 5, 10].into_array();
        let arr = RunEndArray::try_new(ends, dict.into_array()).unwrap();
        assert_eq!(arr.len(), 10);

        let expected =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }
}