1use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::LEGACY_SESSION;
22use vortex_array::Precision;
23use vortex_array::TypedArrayRef;
24use vortex_array::VortexSessionExecute;
25use vortex_array::arrays::Primitive;
26use vortex_array::arrays::VarBinViewArray;
27use vortex_array::buffer::BufferHandle;
28use vortex_array::dtype::DType;
29use vortex_array::dtype::Nullability;
30use vortex_array::dtype::PType;
31use vortex_array::scalar::PValue;
32use vortex_array::search_sorted::SearchSorted;
33use vortex_array::search_sorted::SearchSortedSide;
34use vortex_array::serde::ArrayChildren;
35use vortex_array::validity::Validity;
36use vortex_array::vtable::VTable;
37use vortex_array::vtable::ValidityVTable;
38use vortex_error::VortexExpect as _;
39use vortex_error::VortexResult;
40use vortex_error::vortex_bail;
41use vortex_error::vortex_ensure;
42use vortex_error::vortex_panic;
43use vortex_session::VortexSession;
44use vortex_session::registry::CachedId;
45
46use crate::compress::runend_decode_primitive;
47use crate::compress::runend_decode_varbinview;
48use crate::compress::runend_encode;
49use crate::decompress_bool::runend_decode_bools;
50use crate::kernel::PARENT_KERNELS;
51use crate::rules::RULES;
52
53pub type RunEndArray = Array<RunEnd>;
55
56#[derive(Clone, prost::Message)]
57pub struct RunEndMetadata {
58 #[prost(enumeration = "PType", tag = "1")]
59 pub ends_ptype: i32,
60 #[prost(uint64, tag = "2")]
61 pub num_runs: u64,
62 #[prost(uint64, tag = "3")]
63 pub offset: u64,
64}
65
66impl ArrayHash for RunEndData {
67 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
68 self.offset.hash(state);
69 }
70}
71
72impl ArrayEq for RunEndData {
73 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
74 self.offset == other.offset
75 }
76}
77
78impl VTable for RunEnd {
79 type ArrayData = RunEndData;
80
81 type OperationsVTable = Self;
82 type ValidityVTable = Self;
83
84 fn id(&self) -> ArrayId {
85 static ID: CachedId = CachedId::new("vortex.runend");
86 *ID
87 }
88
89 fn validate(
90 &self,
91 data: &Self::ArrayData,
92 dtype: &DType,
93 len: usize,
94 slots: &[Option<ArrayRef>],
95 ) -> VortexResult<()> {
96 let ends = slots[ENDS_SLOT]
97 .as_ref()
98 .vortex_expect("RunEndArray ends slot");
99 let values = slots[VALUES_SLOT]
100 .as_ref()
101 .vortex_expect("RunEndArray values slot");
102 let mut ctx = LEGACY_SESSION.create_execution_ctx();
104 RunEndData::validate_parts(ends, values, data.offset, len, &mut ctx)?;
105 vortex_ensure!(
106 values.dtype() == dtype,
107 "expected dtype {}, got {}",
108 dtype,
109 values.dtype()
110 );
111 Ok(())
112 }
113
114 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
115 0
116 }
117
118 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
119 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
120 }
121
122 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
123 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
124 }
125
126 fn serialize(
127 array: ArrayView<'_, Self>,
128 _session: &VortexSession,
129 ) -> VortexResult<Option<Vec<u8>>> {
130 Ok(Some(
131 RunEndMetadata {
132 ends_ptype: PType::try_from(array.ends().dtype())
133 .vortex_expect("Must be a valid PType") as i32,
134 num_runs: array.ends().len() as u64,
135 offset: array.offset() as u64,
136 }
137 .encode_to_vec(),
138 ))
139 }
140
141 fn deserialize(
142 &self,
143 dtype: &DType,
144 len: usize,
145 metadata: &[u8],
146 _buffers: &[BufferHandle],
147 children: &dyn ArrayChildren,
148 _session: &VortexSession,
149 ) -> VortexResult<ArrayParts<Self>> {
150 let metadata = RunEndMetadata::decode(metadata)?;
151 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
152 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
153 let ends = children.get(0, &ends_dtype, runs)?;
154
155 let values = children.get(1, dtype, runs)?;
156 let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
157 let slots = vec![Some(ends), Some(values)];
158 let data = RunEndData::new(offset);
159 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
160 }
161
162 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
163 SLOT_NAMES[idx].to_string()
164 }
165
166 fn reduce_parent(
167 array: ArrayView<'_, Self>,
168 parent: &ArrayRef,
169 child_idx: usize,
170 ) -> VortexResult<Option<ArrayRef>> {
171 RULES.evaluate(array, parent, child_idx)
172 }
173
174 fn execute_parent(
175 array: ArrayView<'_, Self>,
176 parent: &ArrayRef,
177 child_idx: usize,
178 ctx: &mut ExecutionCtx,
179 ) -> VortexResult<Option<ArrayRef>> {
180 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
181 }
182
183 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
184 run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
185 }
186}
187
188pub(super) const ENDS_SLOT: usize = 0;
190pub(super) const VALUES_SLOT: usize = 1;
192pub(super) const NUM_SLOTS: usize = 2;
193pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
194
/// Per-array data of the run-end encoding, stored outside the child slots.
#[derive(Clone, Debug)]
pub struct RunEndData {
    /// Added to a logical index before it is looked up in the run ends.
    offset: usize,
}
199
200impl Display for RunEndData {
201 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
202 write!(f, "offset: {}", self.offset)
203 }
204}
205
206pub struct RunEndDataParts {
207 pub ends: ArrayRef,
208 pub values: ArrayRef,
209 pub offset: usize,
210}
211
212pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
213 fn offset(&self) -> usize {
214 self.offset
215 }
216
217 fn ends(&self) -> &ArrayRef {
218 self.as_ref().slots()[ENDS_SLOT]
219 .as_ref()
220 .vortex_expect("RunEndArray ends slot")
221 }
222
223 fn values(&self) -> &ArrayRef {
224 self.as_ref().slots()[VALUES_SLOT]
225 .as_ref()
226 .vortex_expect("RunEndArray values slot")
227 }
228
229 fn dtype(&self) -> &DType {
230 self.values().dtype()
231 }
232
233 fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
234 Ok(self
235 .ends()
236 .as_primitive_typed()
237 .search_sorted(
238 &PValue::from(index + self.offset()),
239 SearchSortedSide::Right,
240 )?
241 .to_ends_index(self.ends().len()))
242 }
243}
244impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
245
/// Marker type for the run-end encoding; it implements the encoding's vtables.
#[derive(Clone, Debug)]
pub struct RunEnd;
248
249impl RunEnd {
250 pub unsafe fn new_unchecked(
255 ends: ArrayRef,
256 values: ArrayRef,
257 offset: usize,
258 length: usize,
259 ) -> RunEndArray {
260 let dtype = values.dtype().clone();
261 let slots = vec![Some(ends), Some(values)];
262 let data = unsafe { RunEndData::new_unchecked(offset) };
263 unsafe {
264 Array::from_parts_unchecked(
265 ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
266 )
267 }
268 }
269
270 pub fn try_new(
272 ends: ArrayRef,
273 values: ArrayRef,
274 ctx: &mut ExecutionCtx,
275 ) -> VortexResult<RunEndArray> {
276 let len = RunEndData::logical_len_from_ends(&ends, ctx)?;
277 RunEndData::validate_parts(&ends, &values, 0, len, ctx)?;
278 let dtype = values.dtype().clone();
279 let slots = vec![Some(ends), Some(values)];
280 let data = RunEndData::new(0);
281 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
282 }
283
284 pub fn try_new_offset_length(
286 ends: ArrayRef,
287 values: ArrayRef,
288 offset: usize,
289 length: usize,
290 ctx: &mut ExecutionCtx,
291 ) -> VortexResult<RunEndArray> {
292 RunEndData::validate_parts(&ends, &values, offset, length, ctx)?;
293 let dtype = values.dtype().clone();
294 let slots = vec![Some(ends), Some(values)];
295 let data = RunEndData::new(offset);
296 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
297 }
298
299 pub fn new(ends: ArrayRef, values: ArrayRef, ctx: &mut ExecutionCtx) -> RunEndArray {
301 Self::try_new(ends, values, ctx).vortex_expect("RunEndData is always valid")
302 }
303
304 pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<RunEndArray> {
306 if let Some(parray) = array.as_opt::<Primitive>() {
307 let (ends, values) = runend_encode(parray, ctx);
308 let ends = ends.into_array();
309 let len = array.len();
310 let dtype = values.dtype().clone();
311 let slots = vec![Some(ends), Some(values)];
312 let data = unsafe { RunEndData::new_unchecked(0) };
313 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
314 } else {
315 vortex_bail!("REE can only encode primitive arrays")
316 }
317 }
318}
319
320impl RunEndData {
321 fn logical_len_from_ends(ends: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
322 if ends.is_empty() {
323 Ok(0)
324 } else {
325 usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)
326 }
327 }
328
329 pub(crate) fn validate_parts(
330 ends: &ArrayRef,
331 values: &ArrayRef,
332 offset: usize,
333 length: usize,
334 ctx: &mut ExecutionCtx,
335 ) -> VortexResult<()> {
336 vortex_ensure!(
338 ends.dtype().is_unsigned_int(),
339 "run ends must be unsigned integers, was {}",
340 ends.dtype(),
341 );
342 vortex_ensure!(
343 ends.len() == values.len(),
344 "run ends len != run values len, {} != {}",
345 ends.len(),
346 values.len()
347 );
348
349 if ends.is_empty() {
351 vortex_ensure!(
352 offset == 0,
353 "non-zero offset provided for empty RunEndArray"
354 );
355 return Ok(());
356 }
357
358 if length == 0 {
360 return Ok(());
361 }
362
363 #[cfg(debug_assertions)]
364 {
365 let pre_validation = ends.statistics().to_owned();
367
368 let is_sorted = ends
369 .statistics()
370 .compute_is_strict_sorted(ctx)
371 .unwrap_or(false);
372
373 ends.statistics().inherit(pre_validation.iter());
376 debug_assert!(is_sorted);
377 }
378
379 if !ends.is_host() {
381 return Ok(());
382 }
383
384 if offset != 0 && length != 0 {
386 let first_run_end = usize::try_from(&ends.execute_scalar(0, ctx)?)?;
387 if first_run_end < offset {
388 vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
389 }
390 }
391
392 let last_run_end = usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)?;
393 let min_required_end = offset + length;
394 if last_run_end < min_required_end {
395 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
396 }
397
398 Ok(())
399 }
400}
401
402impl RunEndData {
403 pub fn new(offset: usize) -> Self {
431 Self { offset }
432 }
433
434 pub unsafe fn new_unchecked(offset: usize) -> Self {
444 Self { offset }
445 }
446
447 pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
449 if let Some(parray) = array.as_opt::<Primitive>() {
450 let (_ends, _values) = runend_encode(parray, ctx);
451 unsafe { Ok(Self::new_unchecked(0)) }
453 } else {
454 vortex_bail!("REE can only encode primitive arrays")
455 }
456 }
457
458 pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
459 RunEndDataParts {
460 ends,
461 values,
462 offset: self.offset,
463 }
464 }
465}
466
467impl ValidityVTable<RunEnd> for RunEnd {
468 fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
469 Ok(match array.values().validity()? {
470 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
471 Validity::AllInvalid => Validity::AllInvalid,
472 Validity::Array(values_validity) => Validity::Array(unsafe {
473 RunEnd::new_unchecked(
474 array.ends().clone(),
475 values_validity,
476 array.offset(),
477 array.len(),
478 )
479 .into_array()
480 }),
481 })
482 }
483}
484
485pub(super) fn run_end_canonicalize(
486 array: &RunEndArray,
487 ctx: &mut ExecutionCtx,
488) -> VortexResult<ArrayRef> {
489 let pends = array.ends().clone().execute_as("ends", ctx)?;
490
491 Ok(match array.dtype() {
492 DType::Bool(_) => {
493 let bools = array.values().clone().execute_as("values", ctx)?;
494 runend_decode_bools(pends, bools, array.offset(), array.len(), ctx)?
495 }
496 DType::Primitive(..) => {
497 let pvalues = array.values().clone().execute_as("values", ctx)?;
498 runend_decode_primitive(pends, pvalues, array.offset(), array.len(), ctx)?.into_array()
499 }
500 DType::Utf8(_) | DType::Binary(_) => {
501 let values = array
502 .values()
503 .clone()
504 .execute_as::<VarBinViewArray>("values", ctx)?;
505 runend_decode_varbinview(pends, values, array.offset(), array.len(), ctx)?.into_array()
506 }
507 _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
508 })
509}
510
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::session::ArraySession;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use crate::RunEnd;

    /// Session shared by all tests in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Primitive values: runs ending at 2, 5 and 10 expand to ten elements.
    #[test]
    fn test_runend_constructor() {
        let mut ctx = SESSION.create_execution_ctx();
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();

        let arr = RunEnd::new(ends, values, &mut ctx);

        assert_eq!(arr.len(), 10);
        assert_eq!(
            arr.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// String values are expanded the same way as primitives.
    #[test]
    fn test_runend_utf8() {
        let mut ctx = SESSION.create_execution_ctx();
        let ends = buffer![2u32, 5, 10].into_array();
        let values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();

        let arr = RunEnd::new(ends, values, &mut ctx);

        assert_eq!(arr.len(), 10);
        assert_eq!(arr.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expected =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// The values child may itself be encoded, here as a dictionary of strings.
    #[test]
    fn test_runend_dict() {
        let mut ctx = SESSION.create_execution_ctx();
        let codes = buffer![0u32, 1, 2].into_array();
        let dictionary = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict = DictArray::try_new(codes, dictionary).unwrap();
        let ends = buffer![2u32, 5, 10].into_array();

        let arr = RunEnd::try_new(ends, dict.into_array(), &mut ctx).unwrap();

        assert_eq!(arr.len(), 10);

        let expected =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }
}