1use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::LEGACY_SESSION;
22use vortex_array::Precision;
23use vortex_array::TypedArrayRef;
24use vortex_array::VortexSessionExecute;
25use vortex_array::arrays::Primitive;
26use vortex_array::arrays::VarBinViewArray;
27use vortex_array::buffer::BufferHandle;
28use vortex_array::dtype::DType;
29use vortex_array::dtype::Nullability;
30use vortex_array::dtype::PType;
31use vortex_array::scalar::PValue;
32use vortex_array::search_sorted::SearchSorted;
33use vortex_array::search_sorted::SearchSortedSide;
34use vortex_array::serde::ArrayChildren;
35use vortex_array::validity::Validity;
36use vortex_array::vtable::VTable;
37use vortex_array::vtable::ValidityVTable;
38use vortex_error::VortexExpect as _;
39use vortex_error::VortexResult;
40use vortex_error::vortex_bail;
41use vortex_error::vortex_ensure;
42use vortex_error::vortex_panic;
43use vortex_session::VortexSession;
44use vortex_session::registry::CachedId;
45
46use crate::compress::runend_decode_primitive;
47use crate::compress::runend_decode_varbinview;
48use crate::compress::runend_encode;
49use crate::decompress_bool::runend_decode_bools;
50use crate::kernel::PARENT_KERNELS;
51use crate::rules::RULES;
52
/// An [`Array`] that uses the [`RunEnd`] encoding.
pub type RunEndArray = Array<RunEnd>;
55
/// Protobuf metadata written by `serialize` and read back by `deserialize` for a run-end array.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the run-ends child, stored as the `PType` enumeration value.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs, i.e. the length of the ends child (the values child is read with the
    /// same length).
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Offset of the array, as reported by `offset()` at serialization time.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
65
66impl ArrayHash for RunEndData {
67 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
68 self.offset.hash(state);
69 }
70}
71
72impl ArrayEq for RunEndData {
73 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
74 self.offset == other.offset
75 }
76}
77
78impl VTable for RunEnd {
79 type ArrayData = RunEndData;
80
81 type OperationsVTable = Self;
82 type ValidityVTable = Self;
83
84 fn id(&self) -> ArrayId {
85 static ID: CachedId = CachedId::new("vortex.runend");
86 *ID
87 }
88
89 fn validate(
90 &self,
91 data: &Self::ArrayData,
92 dtype: &DType,
93 len: usize,
94 slots: &[Option<ArrayRef>],
95 ) -> VortexResult<()> {
96 let ends = slots[ENDS_SLOT]
97 .as_ref()
98 .vortex_expect("RunEndArray ends slot");
99 let values = slots[VALUES_SLOT]
100 .as_ref()
101 .vortex_expect("RunEndArray values slot");
102 RunEndData::validate_parts(ends, values, data.offset, len)?;
103 vortex_ensure!(
104 values.dtype() == dtype,
105 "expected dtype {}, got {}",
106 dtype,
107 values.dtype()
108 );
109 Ok(())
110 }
111
112 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
113 0
114 }
115
116 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
117 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
118 }
119
120 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
121 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
122 }
123
124 fn serialize(
125 array: ArrayView<'_, Self>,
126 _session: &VortexSession,
127 ) -> VortexResult<Option<Vec<u8>>> {
128 Ok(Some(
129 RunEndMetadata {
130 ends_ptype: PType::try_from(array.ends().dtype())
131 .vortex_expect("Must be a valid PType") as i32,
132 num_runs: array.ends().len() as u64,
133 offset: array.offset() as u64,
134 }
135 .encode_to_vec(),
136 ))
137 }
138
139 fn deserialize(
140 &self,
141 dtype: &DType,
142 len: usize,
143 metadata: &[u8],
144 _buffers: &[BufferHandle],
145 children: &dyn ArrayChildren,
146 _session: &VortexSession,
147 ) -> VortexResult<ArrayParts<Self>> {
148 let metadata = RunEndMetadata::decode(metadata)?;
149 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
150 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
151 let ends = children.get(0, &ends_dtype, runs)?;
152
153 let values = children.get(1, dtype, runs)?;
154 let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
155 let slots = vec![Some(ends), Some(values)];
156 let data = RunEndData::new(offset);
157 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
158 }
159
160 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
161 SLOT_NAMES[idx].to_string()
162 }
163
164 fn reduce_parent(
165 array: ArrayView<'_, Self>,
166 parent: &ArrayRef,
167 child_idx: usize,
168 ) -> VortexResult<Option<ArrayRef>> {
169 RULES.evaluate(array, parent, child_idx)
170 }
171
172 fn execute_parent(
173 array: ArrayView<'_, Self>,
174 parent: &ArrayRef,
175 child_idx: usize,
176 ctx: &mut ExecutionCtx,
177 ) -> VortexResult<Option<ArrayRef>> {
178 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
179 }
180
181 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
182 run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
183 }
184}
185
/// Slot index of the run-ends child array.
pub(super) const ENDS_SLOT: usize = 0;
/// Slot index of the run-values child array.
pub(super) const VALUES_SLOT: usize = 1;
/// Number of slots named in [`SLOT_NAMES`].
pub(super) const NUM_SLOTS: usize = 2;
/// Slot names, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
192
/// Encoding-specific data of a run-end array; the ends and values children live in slots.
#[derive(Clone, Debug)]
pub struct RunEndData {
    /// Offset added to a logical index before it is looked up in the run ends.
    offset: usize,
}
197
198impl Display for RunEndData {
199 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
200 write!(f, "offset: {}", self.offset)
201 }
202}
203
/// Owned components of a run-end array, as produced by [`RunEndData::into_parts`].
pub struct RunEndDataParts {
    /// The run-ends array.
    pub ends: ArrayRef,
    /// The run-values array.
    pub values: ArrayRef,
    /// The offset carried by the originating [`RunEndData`].
    pub offset: usize,
}
209
210pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
211 fn offset(&self) -> usize {
212 self.offset
213 }
214
215 fn ends(&self) -> &ArrayRef {
216 self.as_ref().slots()[ENDS_SLOT]
217 .as_ref()
218 .vortex_expect("RunEndArray ends slot")
219 }
220
221 fn values(&self) -> &ArrayRef {
222 self.as_ref().slots()[VALUES_SLOT]
223 .as_ref()
224 .vortex_expect("RunEndArray values slot")
225 }
226
227 fn dtype(&self) -> &DType {
228 self.values().dtype()
229 }
230
231 fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
232 Ok(self
233 .ends()
234 .as_primitive_typed()
235 .search_sorted(
236 &PValue::from(index + self.offset()),
237 SearchSortedSide::Right,
238 )?
239 .to_ends_index(self.ends().len()))
240 }
241}
242impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
243
/// Marker type for the run-end encoding (`vortex.runend`); also hosts the array constructors.
#[derive(Clone, Debug)]
pub struct RunEnd;
246
247impl RunEnd {
248 pub unsafe fn new_unchecked(
253 ends: ArrayRef,
254 values: ArrayRef,
255 offset: usize,
256 length: usize,
257 ) -> RunEndArray {
258 let dtype = values.dtype().clone();
259 let slots = vec![Some(ends.clone()), Some(values.clone())];
260 RunEndData::validate_parts(&ends, &values, offset, length)
261 .vortex_expect("RunEndArray validation failed");
262 let data = unsafe { RunEndData::new_unchecked(offset) };
263 unsafe {
264 Array::from_parts_unchecked(
265 ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
266 )
267 }
268 }
269
270 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<RunEndArray> {
272 let len = RunEndData::logical_len_from_ends(&ends)?;
273 let dtype = values.dtype().clone();
274 let slots = vec![Some(ends), Some(values)];
275 let data = RunEndData::new(0);
276 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
277 }
278
279 pub fn try_new_offset_length(
281 ends: ArrayRef,
282 values: ArrayRef,
283 offset: usize,
284 length: usize,
285 ) -> VortexResult<RunEndArray> {
286 let dtype = values.dtype().clone();
287 let slots = vec![Some(ends), Some(values)];
288 let data = RunEndData::new(offset);
289 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
290 }
291
292 pub fn new(ends: ArrayRef, values: ArrayRef) -> RunEndArray {
294 Self::try_new(ends, values).vortex_expect("RunEndData is always valid")
295 }
296
297 pub fn encode(array: ArrayRef) -> VortexResult<RunEndArray> {
299 if let Some(parray) = array.as_opt::<Primitive>() {
300 let (ends, values) = runend_encode(parray);
301 let ends = ends.into_array();
302 let len = array.len();
303 let dtype = values.dtype().clone();
304 let slots = vec![Some(ends), Some(values)];
305 let data = unsafe { RunEndData::new_unchecked(0) };
306 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
307 } else {
308 vortex_bail!("REE can only encode primitive arrays")
309 }
310 }
311}
312
313impl RunEndData {
314 fn logical_len_from_ends(ends: &ArrayRef) -> VortexResult<usize> {
315 if ends.is_empty() {
316 Ok(0)
317 } else {
318 usize::try_from(
319 &ends.execute_scalar(ends.len() - 1, &mut LEGACY_SESSION.create_execution_ctx())?,
320 )
321 }
322 }
323
324 pub(crate) fn validate_parts(
325 ends: &ArrayRef,
326 values: &ArrayRef,
327 offset: usize,
328 length: usize,
329 ) -> VortexResult<()> {
330 vortex_ensure!(
332 ends.dtype().is_unsigned_int(),
333 "run ends must be unsigned integers, was {}",
334 ends.dtype(),
335 );
336 vortex_ensure!(
337 ends.len() == values.len(),
338 "run ends len != run values len, {} != {}",
339 ends.len(),
340 values.len()
341 );
342
343 if ends.is_empty() {
345 vortex_ensure!(
346 offset == 0,
347 "non-zero offset provided for empty RunEndArray"
348 );
349 return Ok(());
350 }
351
352 if length == 0 {
354 return Ok(());
355 }
356
357 #[cfg(debug_assertions)]
358 {
359 let pre_validation = ends.statistics().to_owned();
361
362 let mut ctx = LEGACY_SESSION.create_execution_ctx();
363 let is_sorted = ends
364 .statistics()
365 .compute_is_strict_sorted(&mut ctx)
366 .unwrap_or(false);
367
368 ends.statistics().inherit(pre_validation.iter());
371 debug_assert!(is_sorted);
372 }
373
374 if !ends.is_host() {
376 return Ok(());
377 }
378
379 if offset != 0 && length != 0 {
381 let first_run_end = usize::try_from(
382 &ends.execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?,
383 )?;
384 if first_run_end < offset {
385 vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
386 }
387 }
388
389 let last_run_end = usize::try_from(
390 &ends.execute_scalar(ends.len() - 1, &mut LEGACY_SESSION.create_execution_ctx())?,
391 )?;
392 let min_required_end = offset + length;
393 if last_run_end < min_required_end {
394 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
395 }
396
397 Ok(())
398 }
399}
400
401impl RunEndData {
402 pub fn new(offset: usize) -> Self {
430 Self { offset }
431 }
432
433 pub unsafe fn new_unchecked(offset: usize) -> Self {
443 Self { offset }
444 }
445
446 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
448 if let Some(parray) = array.as_opt::<Primitive>() {
449 let (_ends, _values) = runend_encode(parray);
450 unsafe { Ok(Self::new_unchecked(0)) }
452 } else {
453 vortex_bail!("REE can only encode primitive arrays")
454 }
455 }
456
457 pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
458 RunEndDataParts {
459 ends,
460 values,
461 offset: self.offset,
462 }
463 }
464}
465
466impl ValidityVTable<RunEnd> for RunEnd {
467 fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
468 Ok(match array.values().validity()? {
469 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
470 Validity::AllInvalid => Validity::AllInvalid,
471 Validity::Array(values_validity) => Validity::Array(unsafe {
472 RunEnd::new_unchecked(
473 array.ends().clone(),
474 values_validity,
475 array.offset(),
476 array.len(),
477 )
478 .into_array()
479 }),
480 })
481 }
482}
483
484pub(super) fn run_end_canonicalize(
485 array: &RunEndArray,
486 ctx: &mut ExecutionCtx,
487) -> VortexResult<ArrayRef> {
488 let pends = array.ends().clone().execute_as("ends", ctx)?;
489
490 Ok(match array.dtype() {
491 DType::Bool(_) => {
492 let bools = array.values().clone().execute_as("values", ctx)?;
493 runend_decode_bools(pends, bools, array.offset(), array.len())?
494 }
495 DType::Primitive(..) => {
496 let pvalues = array.values().clone().execute_as("values", ctx)?;
497 runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
498 }
499 DType::Utf8(_) | DType::Binary(_) => {
500 let values = array
501 .values()
502 .clone()
503 .execute_as::<VarBinViewArray>("values", ctx)?;
504 runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
505 }
506 _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
507 })
508}
509
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEnd;

    /// Primitive values: the length comes from the last run end and the runs expand in order.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEnd::new(ends, values);

        assert_eq!(arr.len(), 10);
        let want_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &want_dtype);

        let expected = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// UTF-8 values are supported and keep their dtype.
    #[test]
    fn test_runend_utf8() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let arr = RunEnd::new(ends, values);

        assert_eq!(arr.len(), 10);
        assert_eq!(arr.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expanded = ["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"];
        let expected = VarBinViewArray::from_iter_str(expanded).into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }

    /// Dictionary-encoded values can be used as the values child.
    #[test]
    fn test_runend_dict() {
        let dict_values = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict_codes = buffer![0u32, 1, 2].into_array();
        let dict = DictArray::try_new(dict_codes, dict_values).unwrap();

        let ends = buffer![2u32, 5, 10].into_array();
        let arr = RunEnd::try_new(ends, dict.into_array()).unwrap();
        assert_eq!(arr.len(), 10);

        let expanded = ["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"];
        let expected = VarBinViewArray::from_iter_str(expanded).into_array();
        assert_arrays_eq!(arr.into_array(), expected);
    }
}