1use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::LEGACY_SESSION;
22use vortex_array::Precision;
23use vortex_array::TypedArrayRef;
24use vortex_array::VortexSessionExecute;
25use vortex_array::arrays::Primitive;
26use vortex_array::arrays::VarBinViewArray;
27use vortex_array::buffer::BufferHandle;
28use vortex_array::dtype::DType;
29use vortex_array::dtype::Nullability;
30use vortex_array::dtype::PType;
31use vortex_array::scalar::PValue;
32use vortex_array::search_sorted::SearchSorted;
33use vortex_array::search_sorted::SearchSortedSide;
34use vortex_array::serde::ArrayChildren;
35use vortex_array::smallvec::smallvec;
36use vortex_array::validity::Validity;
37use vortex_array::vtable::VTable;
38use vortex_array::vtable::ValidityVTable;
39use vortex_error::VortexExpect as _;
40use vortex_error::VortexResult;
41use vortex_error::vortex_bail;
42use vortex_error::vortex_ensure;
43use vortex_error::vortex_panic;
44use vortex_session::VortexSession;
45use vortex_session::registry::CachedId;
46
47use crate::compress::runend_decode_primitive;
48use crate::compress::runend_decode_varbinview;
49use crate::compress::runend_encode;
50use crate::decompress_bool::runend_decode_bools;
51use crate::kernel::PARENT_KERNELS;
52use crate::rules::RULES;
53
/// An [`Array`] using the [`RunEnd`] encoding.
pub type RunEndArray = Array<RunEnd>;
56
/// Protobuf metadata written when a [`RunEndArray`] is serialized and read back when it is
/// deserialized.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs; used as the length of both the `ends` and `values` children.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// The array's offset, as reported by `offset()` at serialization time.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
66
67impl ArrayHash for RunEndData {
68 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
69 self.offset.hash(state);
70 }
71}
72
73impl ArrayEq for RunEndData {
74 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
75 self.offset == other.offset
76 }
77}
78
79impl VTable for RunEnd {
80 type TypedArrayData = RunEndData;
81
82 type OperationsVTable = Self;
83 type ValidityVTable = Self;
84
85 fn id(&self) -> ArrayId {
86 static ID: CachedId = CachedId::new("vortex.runend");
87 *ID
88 }
89
90 fn validate(
91 &self,
92 data: &Self::TypedArrayData,
93 dtype: &DType,
94 len: usize,
95 slots: &[Option<ArrayRef>],
96 ) -> VortexResult<()> {
97 let ends = slots[ENDS_SLOT]
98 .as_ref()
99 .vortex_expect("RunEndArray ends slot");
100 let values = slots[VALUES_SLOT]
101 .as_ref()
102 .vortex_expect("RunEndArray values slot");
103 let mut ctx = LEGACY_SESSION.create_execution_ctx();
105 RunEndData::validate_parts(ends, values, data.offset, len, &mut ctx)?;
106 vortex_ensure!(
107 values.dtype() == dtype,
108 "expected dtype {}, got {}",
109 dtype,
110 values.dtype()
111 );
112 Ok(())
113 }
114
115 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
116 0
117 }
118
119 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
120 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
121 }
122
123 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
124 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
125 }
126
127 fn serialize(
128 array: ArrayView<'_, Self>,
129 _session: &VortexSession,
130 ) -> VortexResult<Option<Vec<u8>>> {
131 Ok(Some(
132 RunEndMetadata {
133 ends_ptype: PType::try_from(array.ends().dtype())
134 .vortex_expect("Must be a valid PType") as i32,
135 num_runs: array.ends().len() as u64,
136 offset: array.offset() as u64,
137 }
138 .encode_to_vec(),
139 ))
140 }
141
142 fn deserialize(
143 &self,
144 dtype: &DType,
145 len: usize,
146 metadata: &[u8],
147 _buffers: &[BufferHandle],
148 children: &dyn ArrayChildren,
149 _session: &VortexSession,
150 ) -> VortexResult<ArrayParts<Self>> {
151 let metadata = RunEndMetadata::decode(metadata)?;
152 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
153 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
154 let ends = children.get(0, &ends_dtype, runs)?;
155
156 let values = children.get(1, dtype, runs)?;
157 let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
158 let slots = smallvec![Some(ends), Some(values)];
159 let data = RunEndData::new(offset);
160 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
161 }
162
163 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
164 SLOT_NAMES[idx].to_string()
165 }
166
167 fn reduce_parent(
168 array: ArrayView<'_, Self>,
169 parent: &ArrayRef,
170 child_idx: usize,
171 ) -> VortexResult<Option<ArrayRef>> {
172 RULES.evaluate(array, parent, child_idx)
173 }
174
175 fn execute_parent(
176 array: ArrayView<'_, Self>,
177 parent: &ArrayRef,
178 child_idx: usize,
179 ctx: &mut ExecutionCtx,
180 ) -> VortexResult<Option<ArrayRef>> {
181 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
182 }
183
184 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
185 run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
186 }
187}
188
/// Slot index of the `ends` child array.
pub(super) const ENDS_SLOT: usize = 0;
/// Slot index of the `values` child array.
pub(super) const VALUES_SLOT: usize = 1;
/// Number of child slots of a run-end array.
pub(super) const NUM_SLOTS: usize = 2;
/// Slot names, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
195
/// Per-array state of a run-end array that is not held in a child slot.
#[derive(Clone, Debug)]
pub struct RunEndData {
    /// Offset added to a logical index before it is looked up in `ends`.
    offset: usize,
}

impl Display for RunEndData {
    /// Renders the data as `offset: <n>`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let Self { offset } = self;
        f.write_fmt(format_args!("offset: {offset}"))
    }
}
206
/// The components of a run-end array, as produced by `RunEndData::into_parts`.
pub struct RunEndDataParts {
    /// The `ends` child array.
    pub ends: ArrayRef,
    /// The `values` child array.
    pub values: ArrayRef,
    /// The offset taken from the originating `RunEndData`.
    pub offset: usize,
}
212
213pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
214 fn offset(&self) -> usize {
215 self.offset
216 }
217
218 fn ends(&self) -> &ArrayRef {
219 self.as_ref().slots()[ENDS_SLOT]
220 .as_ref()
221 .vortex_expect("RunEndArray ends slot")
222 }
223
224 fn values(&self) -> &ArrayRef {
225 self.as_ref().slots()[VALUES_SLOT]
226 .as_ref()
227 .vortex_expect("RunEndArray values slot")
228 }
229
230 fn dtype(&self) -> &DType {
231 self.values().dtype()
232 }
233
234 fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
235 Ok(self
236 .ends()
237 .as_primitive_typed()
238 .search_sorted(
239 &PValue::from(index + self.offset()),
240 SearchSortedSide::Right,
241 )?
242 .to_ends_index(self.ends().len()))
243 }
244}
245impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
246
/// Marker type for the run-end encoding; constructors for [`RunEndArray`] are defined on it.
#[derive(Clone, Debug)]
pub struct RunEnd;
249
250impl RunEnd {
251 pub unsafe fn new_unchecked(
256 ends: ArrayRef,
257 values: ArrayRef,
258 offset: usize,
259 length: usize,
260 ) -> RunEndArray {
261 let dtype = values.dtype().clone();
262 let slots = smallvec![Some(ends), Some(values)];
263 let data = unsafe { RunEndData::new_unchecked(offset) };
264 unsafe {
265 Array::from_parts_unchecked(
266 ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
267 )
268 }
269 }
270
271 pub fn try_new(
273 ends: ArrayRef,
274 values: ArrayRef,
275 ctx: &mut ExecutionCtx,
276 ) -> VortexResult<RunEndArray> {
277 let len = RunEndData::logical_len_from_ends(&ends, ctx)?;
278 RunEndData::validate_parts(&ends, &values, 0, len, ctx)?;
279 let dtype = values.dtype().clone();
280 let slots = smallvec![Some(ends), Some(values)];
281 let data = RunEndData::new(0);
282 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
283 }
284
285 pub fn try_new_offset_length(
287 ends: ArrayRef,
288 values: ArrayRef,
289 offset: usize,
290 length: usize,
291 ctx: &mut ExecutionCtx,
292 ) -> VortexResult<RunEndArray> {
293 RunEndData::validate_parts(&ends, &values, offset, length, ctx)?;
294 let dtype = values.dtype().clone();
295 let slots = smallvec![Some(ends), Some(values)];
296 let data = RunEndData::new(offset);
297 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
298 }
299
300 pub fn new(ends: ArrayRef, values: ArrayRef, ctx: &mut ExecutionCtx) -> RunEndArray {
302 Self::try_new(ends, values, ctx).vortex_expect("RunEndData is always valid")
303 }
304
305 pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<RunEndArray> {
307 if let Some(parray) = array.as_opt::<Primitive>() {
308 let (ends, values) = runend_encode(parray, ctx);
309 let ends = ends.into_array();
310 let len = array.len();
311 let dtype = values.dtype().clone();
312 let slots = smallvec![Some(ends), Some(values)];
313 let data = unsafe { RunEndData::new_unchecked(0) };
314 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
315 } else {
316 vortex_bail!("REE can only encode primitive arrays")
317 }
318 }
319}
320
321impl RunEndData {
322 fn logical_len_from_ends(ends: &ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<usize> {
323 if ends.is_empty() {
324 Ok(0)
325 } else {
326 usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)
327 }
328 }
329
330 pub(crate) fn validate_parts(
331 ends: &ArrayRef,
332 values: &ArrayRef,
333 offset: usize,
334 length: usize,
335 ctx: &mut ExecutionCtx,
336 ) -> VortexResult<()> {
337 vortex_ensure!(
339 ends.dtype().is_unsigned_int(),
340 "run ends must be unsigned integers, was {}",
341 ends.dtype(),
342 );
343 vortex_ensure!(
344 ends.len() == values.len(),
345 "run ends len != run values len, {} != {}",
346 ends.len(),
347 values.len()
348 );
349
350 if ends.is_empty() {
352 vortex_ensure!(
353 offset == 0,
354 "non-zero offset provided for empty RunEndArray"
355 );
356 return Ok(());
357 }
358
359 if length == 0 {
361 return Ok(());
362 }
363
364 #[cfg(debug_assertions)]
365 {
366 let pre_validation = ends.statistics().to_owned();
368
369 let is_sorted = ends
370 .statistics()
371 .compute_is_strict_sorted(ctx)
372 .unwrap_or(false);
373
374 ends.statistics().inherit(pre_validation.iter());
377 debug_assert!(is_sorted);
378 }
379
380 if !ends.is_host() {
382 return Ok(());
383 }
384
385 if offset != 0 && length != 0 {
387 let first_run_end = usize::try_from(&ends.execute_scalar(0, ctx)?)?;
388 if first_run_end < offset {
389 vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
390 }
391 }
392
393 let last_run_end = usize::try_from(&ends.execute_scalar(ends.len() - 1, ctx)?)?;
394 let min_required_end = offset + length;
395 if last_run_end < min_required_end {
396 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
397 }
398
399 Ok(())
400 }
401}
402
403impl RunEndData {
404 pub fn new(offset: usize) -> Self {
432 Self { offset }
433 }
434
435 pub unsafe fn new_unchecked(offset: usize) -> Self {
445 Self { offset }
446 }
447
448 pub fn encode(array: ArrayRef, ctx: &mut ExecutionCtx) -> VortexResult<Self> {
450 if let Some(parray) = array.as_opt::<Primitive>() {
451 let (_ends, _values) = runend_encode(parray, ctx);
452 unsafe { Ok(Self::new_unchecked(0)) }
454 } else {
455 vortex_bail!("REE can only encode primitive arrays")
456 }
457 }
458
459 pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
460 RunEndDataParts {
461 ends,
462 values,
463 offset: self.offset,
464 }
465 }
466}
467
468impl ValidityVTable<RunEnd> for RunEnd {
469 fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
470 Ok(match array.values().validity()? {
471 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
472 Validity::AllInvalid => Validity::AllInvalid,
473 Validity::Array(values_validity) => Validity::Array(unsafe {
474 RunEnd::new_unchecked(
475 array.ends().clone(),
476 values_validity,
477 array.offset(),
478 array.len(),
479 )
480 .into_array()
481 }),
482 })
483 }
484}
485
486pub(super) fn run_end_canonicalize(
487 array: &RunEndArray,
488 ctx: &mut ExecutionCtx,
489) -> VortexResult<ArrayRef> {
490 let pends = array.ends().clone().execute_as("ends", ctx)?;
491
492 Ok(match array.dtype() {
493 DType::Bool(_) => {
494 let bools = array.values().clone().execute_as("values", ctx)?;
495 runend_decode_bools(pends, bools, array.offset(), array.len(), ctx)?
496 }
497 DType::Primitive(..) => {
498 let pvalues = array.values().clone().execute_as("values", ctx)?;
499 runend_decode_primitive(pends, pvalues, array.offset(), array.len(), ctx)?.into_array()
500 }
501 DType::Utf8(_) | DType::Binary(_) => {
502 let values = array
503 .values()
504 .clone()
505 .execute_as::<VarBinViewArray>("values", ctx)?;
506 runend_decode_varbinview(pends, values, array.offset(), array.len(), ctx)?.into_array()
507 }
508 _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
509 })
510}
511
#[cfg(test)]
mod tests {
    use std::sync::LazyLock;

    use vortex_array::IntoArray;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::session::ArraySession;
    use vortex_buffer::buffer;
    use vortex_session::VortexSession;

    use crate::RunEnd;

    /// Session shared by every test in this module.
    static SESSION: LazyLock<VortexSession> =
        LazyLock::new(|| VortexSession::empty().with::<ArraySession>());

    /// Primitive values: runs ending at 2, 5 and 10 expand to ten `i32` elements.
    #[test]
    fn test_runend_constructor() {
        let mut ctx = SESSION.create_execution_ctx();
        let run_ends = buffer![2u32, 5, 10].into_array();
        let run_values = buffer![1i32, 2, 3].into_array();
        let arr = RunEnd::new(run_ends, run_values, &mut ctx);

        assert_eq!(arr.len(), 10);
        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// UTF-8 values are expanded the same way as primitives.
    #[test]
    fn test_runend_utf8() {
        let mut ctx = SESSION.create_execution_ctx();
        let run_values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let run_ends = buffer![2u32, 5, 10].into_array();
        let arr = RunEnd::new(run_ends, run_values, &mut ctx);

        assert_eq!(arr.len(), 10);
        assert_eq!(arr.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expected =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// The values child may itself be an encoded (dictionary) array.
    #[test]
    fn test_runend_dict() {
        let mut ctx = SESSION.create_execution_ctx();
        let dict_values = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict_codes = buffer![0u32, 1, 2].into_array();
        let dict = DictArray::try_new(dict_codes, dict_values).unwrap();

        let run_ends = buffer![2u32, 5, 10].into_array();
        let arr = RunEnd::try_new(run_ends, dict.into_array(), &mut ctx).unwrap();
        assert_eq!(arr.len(), 10);

        let expected =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }
}