1use std::fmt::Debug;
5use std::hash::Hash;
6use std::sync::Arc;
7
8use vortex_array::ArrayEq;
9use vortex_array::ArrayHash;
10use vortex_array::ArrayRef;
11use vortex_array::DeserializeMetadata;
12use vortex_array::DynArray;
13use vortex_array::ExecutionCtx;
14use vortex_array::ExecutionResult;
15use vortex_array::IntoArray;
16use vortex_array::Precision;
17use vortex_array::ProstMetadata;
18use vortex_array::SerializeMetadata;
19use vortex_array::arrays::Primitive;
20use vortex_array::arrays::VarBinViewArray;
21use vortex_array::buffer::BufferHandle;
22use vortex_array::dtype::DType;
23use vortex_array::dtype::Nullability;
24use vortex_array::dtype::PType;
25use vortex_array::scalar::PValue;
26use vortex_array::search_sorted::SearchSorted;
27use vortex_array::search_sorted::SearchSortedSide;
28use vortex_array::serde::ArrayChildren;
29use vortex_array::stats::ArrayStats;
30use vortex_array::stats::StatsSetRef;
31use vortex_array::validity::Validity;
32use vortex_array::vtable;
33use vortex_array::vtable::ArrayId;
34use vortex_array::vtable::VTable;
35use vortex_array::vtable::ValidityVTable;
36use vortex_error::VortexExpect as _;
37use vortex_error::VortexResult;
38use vortex_error::vortex_bail;
39use vortex_error::vortex_ensure;
40use vortex_error::vortex_panic;
41use vortex_session::VortexSession;
42
43use crate::compress::runend_decode_primitive;
44use crate::compress::runend_decode_varbinview;
45use crate::compress::runend_encode;
46use crate::decompress_bool::runend_decode_bools;
47use crate::kernel::PARENT_KERNELS;
48use crate::rules::RULES;
49
// Invokes the `vortex_array::vtable!` macro for the `RunEnd` encoding.
// NOTE(review): the macro expansion is not visible in this file; presumably it generates the
// glue between `RunEnd`, `RunEndArray` and the dynamic array machinery — confirm.
vtable!(RunEnd);
51
/// Protobuf metadata serialized alongside a run-end array.
///
/// It carries what is needed to re-fetch the two children (`ends` and `values`) and to restore
/// the logical offset when the array is rebuilt.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length requested for both the `ends` and `values` children.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Logical offset of the array, as returned by `RunEndArray::offset`.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
61
62impl VTable for RunEnd {
63 type Array = RunEndArray;
64
65 type Metadata = ProstMetadata<RunEndMetadata>;
66 type OperationsVTable = Self;
67 type ValidityVTable = Self;
68
69 fn vtable(_array: &Self::Array) -> &Self {
70 &RunEnd
71 }
72
73 fn id(&self) -> ArrayId {
74 Self::ID
75 }
76
77 fn len(array: &RunEndArray) -> usize {
78 array.length
79 }
80
81 fn dtype(array: &RunEndArray) -> &DType {
82 array.values.dtype()
83 }
84
85 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
86 array.stats_set.to_ref(array.as_ref())
87 }
88
89 fn array_hash<H: std::hash::Hasher>(array: &RunEndArray, state: &mut H, precision: Precision) {
90 array.ends.array_hash(state, precision);
91 array.values.array_hash(state, precision);
92 array.offset.hash(state);
93 array.length.hash(state);
94 }
95
96 fn array_eq(array: &RunEndArray, other: &RunEndArray, precision: Precision) -> bool {
97 array.ends.array_eq(&other.ends, precision)
98 && array.values.array_eq(&other.values, precision)
99 && array.offset == other.offset
100 && array.length == other.length
101 }
102
103 fn nbuffers(_array: &RunEndArray) -> usize {
104 0
105 }
106
107 fn buffer(_array: &RunEndArray, idx: usize) -> BufferHandle {
108 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
109 }
110
111 fn buffer_name(_array: &RunEndArray, idx: usize) -> Option<String> {
112 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
113 }
114
115 fn nchildren(_array: &RunEndArray) -> usize {
116 2
117 }
118
119 fn child(array: &RunEndArray, idx: usize) -> ArrayRef {
120 match idx {
121 0 => array.ends().clone(),
122 1 => array.values().clone(),
123 _ => vortex_panic!("RunEndArray child index {idx} out of bounds"),
124 }
125 }
126
127 fn child_name(_array: &RunEndArray, idx: usize) -> String {
128 match idx {
129 0 => "ends".to_string(),
130 1 => "values".to_string(),
131 _ => vortex_panic!("RunEndArray child_name index {idx} out of bounds"),
132 }
133 }
134
135 fn metadata(array: &RunEndArray) -> VortexResult<Self::Metadata> {
136 Ok(ProstMetadata(RunEndMetadata {
137 ends_ptype: PType::try_from(array.ends().dtype()).vortex_expect("Must be a valid PType")
138 as i32,
139 num_runs: array.ends().len() as u64,
140 offset: array.offset() as u64,
141 }))
142 }
143
144 fn serialize(metadata: Self::Metadata) -> VortexResult<Option<Vec<u8>>> {
145 Ok(Some(metadata.serialize()))
146 }
147
148 fn deserialize(
149 bytes: &[u8],
150 _dtype: &DType,
151 _len: usize,
152 _buffers: &[BufferHandle],
153 _session: &VortexSession,
154 ) -> VortexResult<Self::Metadata> {
155 let inner = <ProstMetadata<RunEndMetadata> as DeserializeMetadata>::deserialize(bytes)?;
156 Ok(ProstMetadata(inner))
157 }
158
159 fn build(
160 dtype: &DType,
161 len: usize,
162 metadata: &Self::Metadata,
163 _buffers: &[BufferHandle],
164 children: &dyn ArrayChildren,
165 ) -> VortexResult<RunEndArray> {
166 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
167 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
168 let ends = children.get(0, &ends_dtype, runs)?;
169
170 let values = children.get(1, dtype, runs)?;
171
172 RunEndArray::try_new_offset_length(
173 ends,
174 values,
175 usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize"),
176 len,
177 )
178 }
179
180 fn with_children(array: &mut Self::Array, children: Vec<ArrayRef>) -> VortexResult<()> {
181 vortex_ensure!(
182 children.len() == 2,
183 "RunEndArray expects 2 children, got {}",
184 children.len()
185 );
186
187 let mut children_iter = children.into_iter();
188 array.ends = children_iter.next().vortex_expect("ends child");
189 array.values = children_iter.next().vortex_expect("values child");
190
191 Ok(())
192 }
193
194 fn reduce_parent(
195 array: &Self::Array,
196 parent: &ArrayRef,
197 child_idx: usize,
198 ) -> VortexResult<Option<ArrayRef>> {
199 RULES.evaluate(array, parent, child_idx)
200 }
201
202 fn execute_parent(
203 array: &Self::Array,
204 parent: &ArrayRef,
205 child_idx: usize,
206 ctx: &mut ExecutionCtx,
207 ) -> VortexResult<Option<ArrayRef>> {
208 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
209 }
210
211 fn execute(array: Arc<Self::Array>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
212 run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
213 }
214}
215
/// A run-end encoded array: `values[i]` is repeated up to (exclusive) position `ends[i]`,
/// viewed through a logical window described by `offset` and `length`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// End position of each run; validated to have an unsigned integer dtype.
    ends: ArrayRef,
    /// One value per run; validated to have the same length as `ends`.
    values: ArrayRef,
    /// Logical offset added to an index before it is looked up in `ends`.
    offset: usize,
    /// Logical length of the array (what `len` reports), not the number of runs.
    length: usize,
    /// Statistics for this array; starts out as the default (empty) value.
    stats_set: ArrayStats,
}
224
225pub struct RunEndArrayParts {
226 pub ends: ArrayRef,
227 pub values: ArrayRef,
228}
229
/// Zero-sized marker type for the run-end encoding; the vtable traits are implemented on it.
#[derive(Clone, Debug)]
pub struct RunEnd;
232
impl RunEnd {
    /// Identifier of the run-end encoding, returned by `VTable::id`.
    pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
}
236
237impl RunEndArray {
238 fn validate(
239 ends: &ArrayRef,
240 values: &ArrayRef,
241 offset: usize,
242 length: usize,
243 ) -> VortexResult<()> {
244 vortex_ensure!(
246 ends.dtype().is_unsigned_int(),
247 "run ends must be unsigned integers, was {}",
248 ends.dtype(),
249 );
250 vortex_ensure!(
251 ends.len() == values.len(),
252 "run ends len != run values len, {} != {}",
253 ends.len(),
254 values.len()
255 );
256
257 if ends.is_empty() {
259 vortex_ensure!(
260 offset == 0,
261 "non-zero offset provided for empty RunEndArray"
262 );
263 return Ok(());
264 }
265
266 if length == 0 {
268 vortex_ensure!(
269 ends.is_empty(),
270 "run ends must be empty when length is zero"
271 );
272 return Ok(());
273 }
274
275 debug_assert!({
276 let pre_validation = ends.statistics().to_owned();
278
279 let is_sorted = ends
280 .statistics()
281 .compute_is_strict_sorted()
282 .unwrap_or(false);
283
284 ends.statistics().inherit(pre_validation.iter());
287 is_sorted
288 });
289
290 if !ends.is_host() {
292 return Ok(());
293 }
294
295 if offset != 0 && length != 0 {
297 let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
298 if first_run_end <= offset {
299 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
300 }
301 }
302
303 let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
304 let min_required_end = offset + length;
305 if last_run_end < min_required_end {
306 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
307 }
308
309 Ok(())
310 }
311}
312
313impl RunEndArray {
314 pub fn new(ends: ArrayRef, values: ArrayRef) -> Self {
340 Self::try_new(ends, values).vortex_expect("RunEndArray new")
341 }
342
343 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
349 let length: usize = if ends.is_empty() {
350 0
351 } else {
352 usize::try_from(&ends.scalar_at(ends.len() - 1)?)?
353 };
354
355 Self::try_new_offset_length(ends, values, 0, length)
356 }
357
358 pub fn try_new_offset_length(
362 ends: ArrayRef,
363 values: ArrayRef,
364 offset: usize,
365 length: usize,
366 ) -> VortexResult<Self> {
367 Self::validate(&ends, &values, offset, length)?;
368
369 Ok(Self {
370 ends,
371 values,
372 offset,
373 length,
374 stats_set: Default::default(),
375 })
376 }
377
378 pub unsafe fn new_unchecked(
387 ends: ArrayRef,
388 values: ArrayRef,
389 offset: usize,
390 length: usize,
391 ) -> Self {
392 Self {
393 ends,
394 values,
395 offset,
396 length,
397 stats_set: Default::default(),
398 }
399 }
400
401 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
403 Ok(self
404 .ends()
405 .as_primitive_typed()
406 .search_sorted(
407 &PValue::from(index + self.offset()),
408 SearchSortedSide::Right,
409 )?
410 .to_ends_index(self.ends().len()))
411 }
412
413 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
415 if let Some(parray) = array.as_opt::<Primitive>() {
416 let (ends, values) = runend_encode(parray);
417 unsafe {
419 Ok(Self::new_unchecked(
420 ends.into_array(),
421 values,
422 0,
423 array.len(),
424 ))
425 }
426 } else {
427 vortex_bail!("REE can only encode primitive arrays")
428 }
429 }
430
431 #[inline]
435 pub fn offset(&self) -> usize {
436 self.offset
437 }
438
439 #[inline]
444 pub fn ends(&self) -> &ArrayRef {
445 &self.ends
446 }
447
448 #[inline]
453 pub fn values(&self) -> &ArrayRef {
454 &self.values
455 }
456
457 #[inline]
459 pub fn into_parts(self) -> RunEndArrayParts {
460 RunEndArrayParts {
461 ends: self.ends,
462 values: self.values,
463 }
464 }
465}
466
467impl ValidityVTable<RunEnd> for RunEnd {
468 fn validity(array: &RunEndArray) -> VortexResult<Validity> {
469 Ok(match array.values().validity()? {
470 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
471 Validity::AllInvalid => Validity::AllInvalid,
472 Validity::Array(values_validity) => Validity::Array(unsafe {
473 RunEndArray::new_unchecked(
474 array.ends().clone(),
475 values_validity,
476 array.offset(),
477 array.len(),
478 )
479 .into_array()
480 }),
481 })
482 }
483}
484
485pub(super) fn run_end_canonicalize(
486 array: &RunEndArray,
487 ctx: &mut ExecutionCtx,
488) -> VortexResult<ArrayRef> {
489 let pends = array.ends().clone().execute_as("ends", ctx)?;
490
491 Ok(match array.dtype() {
492 DType::Bool(_) => {
493 let bools = array.values().clone().execute_as("values", ctx)?;
494 runend_decode_bools(pends, bools, array.offset(), array.len())?
495 }
496 DType::Primitive(..) => {
497 let pvalues = array.values().clone().execute_as("values", ctx)?;
498 runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
499 }
500 DType::Utf8(_) | DType::Binary(_) => {
501 let values = array
502 .values()
503 .clone()
504 .execute_as::<VarBinViewArray>("values", ctx)?;
505 runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
506 }
507 _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
508 })
509}
510
#[cfg(test)]
mod tests {
    use vortex_array::ArrayRef;
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEndArray;

    /// Run ends shared by every test: three runs of lengths 2, 3 and 5 (10 elements in total).
    fn run_ends() -> ArrayRef {
        buffer![2u32, 5, 10].into_array()
    }

    /// Primitive values: length and dtype come from the children, and the array decodes to
    /// the expanded runs.
    #[test]
    fn test_runend_constructor() {
        let run_values = buffer![1i32, 2, 3].into_array();
        let encoded = RunEndArray::new(run_ends(), run_values);

        assert_eq!(encoded.len(), 10);
        assert_eq!(
            encoded.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        let decoded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// UTF-8 values decode to the expanded strings.
    #[test]
    fn test_runend_utf8() {
        let run_values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let encoded = RunEndArray::new(run_ends(), run_values);

        assert_eq!(encoded.len(), 10);
        assert_eq!(encoded.dtype(), &DType::Utf8(Nullability::NonNullable));

        let decoded =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }

    /// Dictionary-encoded values are accepted as the `values` child and decode correctly.
    #[test]
    fn test_runend_dict() {
        let dictionary = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let codes = buffer![0u32, 1, 2].into_array();
        let run_values = DictArray::try_new(codes, dictionary).unwrap();

        let encoded = RunEndArray::try_new(run_ends(), run_values.into_array()).unwrap();
        assert_eq!(encoded.len(), 10);

        let decoded =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(encoded.into_array(), decoded);
    }
}