1use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use prost::Message;
11use vortex_array::Array;
12use vortex_array::ArrayEq;
13use vortex_array::ArrayHash;
14use vortex_array::ArrayId;
15use vortex_array::ArrayParts;
16use vortex_array::ArrayRef;
17use vortex_array::ArrayView;
18use vortex_array::ExecutionCtx;
19use vortex_array::ExecutionResult;
20use vortex_array::IntoArray;
21use vortex_array::Precision;
22use vortex_array::TypedArrayRef;
23use vortex_array::arrays::Primitive;
24use vortex_array::arrays::VarBinViewArray;
25use vortex_array::buffer::BufferHandle;
26use vortex_array::dtype::DType;
27use vortex_array::dtype::Nullability;
28use vortex_array::dtype::PType;
29use vortex_array::scalar::PValue;
30use vortex_array::search_sorted::SearchSorted;
31use vortex_array::search_sorted::SearchSortedSide;
32use vortex_array::serde::ArrayChildren;
33use vortex_array::validity::Validity;
34use vortex_array::vtable::VTable;
35use vortex_array::vtable::ValidityVTable;
36use vortex_error::VortexExpect as _;
37use vortex_error::VortexResult;
38use vortex_error::vortex_bail;
39use vortex_error::vortex_ensure;
40use vortex_error::vortex_panic;
41use vortex_session::VortexSession;
42
43use crate::compress::runend_decode_primitive;
44use crate::compress::runend_decode_varbinview;
45use crate::compress::runend_encode;
46use crate::decompress_bool::runend_decode_bools;
47use crate::kernel::PARENT_KERNELS;
48use crate::rules::RULES;
49
/// A run-end encoded array: the generic [`Array`] wrapper specialised to the [`RunEnd`] encoding.
pub type RunEndArray = Array<RunEnd>;
52
/// Protobuf metadata written when a [`RunEndArray`] is serialized and read back when it is
/// deserialized.
#[derive(Clone, prost::Message)]
pub struct RunEndMetadata {
    /// Primitive type of the `ends` child, stored as the `i32` value of the `PType` enum.
    #[prost(enumeration = "PType", tag = "1")]
    pub ends_ptype: i32,
    /// Number of runs; on serialization this is the length of the `ends` child.
    #[prost(uint64, tag = "2")]
    pub num_runs: u64,
    /// Offset of the array, as reported by `RunEndArrayExt::offset` at serialization time.
    #[prost(uint64, tag = "3")]
    pub offset: u64,
}
62
63impl ArrayHash for RunEndData {
64 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
65 self.offset.hash(state);
66 }
67}
68
69impl ArrayEq for RunEndData {
70 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
71 self.offset == other.offset
72 }
73}
74
75impl VTable for RunEnd {
76 type ArrayData = RunEndData;
77
78 type OperationsVTable = Self;
79 type ValidityVTable = Self;
80
81 fn id(&self) -> ArrayId {
82 Self::ID
83 }
84
85 fn validate(
86 &self,
87 data: &Self::ArrayData,
88 dtype: &DType,
89 len: usize,
90 slots: &[Option<ArrayRef>],
91 ) -> VortexResult<()> {
92 let ends = slots[ENDS_SLOT]
93 .as_ref()
94 .vortex_expect("RunEndArray ends slot");
95 let values = slots[VALUES_SLOT]
96 .as_ref()
97 .vortex_expect("RunEndArray values slot");
98 RunEndData::validate_parts(ends, values, data.offset, len)?;
99 vortex_ensure!(
100 values.dtype() == dtype,
101 "expected dtype {}, got {}",
102 dtype,
103 values.dtype()
104 );
105 Ok(())
106 }
107
108 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
109 0
110 }
111
112 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
113 vortex_panic!("RunEndArray buffer index {idx} out of bounds")
114 }
115
116 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
117 vortex_panic!("RunEndArray buffer_name index {idx} out of bounds")
118 }
119
120 fn serialize(
121 array: ArrayView<'_, Self>,
122 _session: &VortexSession,
123 ) -> VortexResult<Option<Vec<u8>>> {
124 Ok(Some(
125 RunEndMetadata {
126 ends_ptype: PType::try_from(array.ends().dtype())
127 .vortex_expect("Must be a valid PType") as i32,
128 num_runs: array.ends().len() as u64,
129 offset: array.offset() as u64,
130 }
131 .encode_to_vec(),
132 ))
133 }
134
135 fn deserialize(
136 &self,
137 dtype: &DType,
138 len: usize,
139 metadata: &[u8],
140 _buffers: &[BufferHandle],
141 children: &dyn ArrayChildren,
142 _session: &VortexSession,
143 ) -> VortexResult<ArrayParts<Self>> {
144 let metadata = RunEndMetadata::decode(metadata)?;
145 let ends_dtype = DType::Primitive(metadata.ends_ptype(), Nullability::NonNullable);
146 let runs = usize::try_from(metadata.num_runs).vortex_expect("Must be a valid usize");
147 let ends = children.get(0, &ends_dtype, runs)?;
148
149 let values = children.get(1, dtype, runs)?;
150 let offset = usize::try_from(metadata.offset).vortex_expect("Offset must be a valid usize");
151 let slots = vec![Some(ends), Some(values)];
152 let data = RunEndData::new(offset);
153 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
154 }
155
156 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
157 SLOT_NAMES[idx].to_string()
158 }
159
160 fn reduce_parent(
161 array: ArrayView<'_, Self>,
162 parent: &ArrayRef,
163 child_idx: usize,
164 ) -> VortexResult<Option<ArrayRef>> {
165 RULES.evaluate(array, parent, child_idx)
166 }
167
168 fn execute_parent(
169 array: ArrayView<'_, Self>,
170 parent: &ArrayRef,
171 child_idx: usize,
172 ctx: &mut ExecutionCtx,
173 ) -> VortexResult<Option<ArrayRef>> {
174 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
175 }
176
177 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
178 run_end_canonicalize(&array, ctx).map(ExecutionResult::done)
179 }
180}
181
/// Slot index of the `ends` child.
pub(super) const ENDS_SLOT: usize = 0;
/// Slot index of the `values` child.
pub(super) const VALUES_SLOT: usize = 1;
/// Total number of child slots of a run-end array.
pub(super) const NUM_SLOTS: usize = 2;
/// Slot names, indexed by slot position.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["ends", "values"];
188
/// Encoding-specific state of a run-end array.
///
/// Only the offset lives here; the `ends` and `values` children are kept in the array's slots.
#[derive(Clone, Debug)]
pub struct RunEndData {
    /// Amount added to an index before it is looked up in the run ends.
    offset: usize,
}
193
194impl Display for RunEndData {
195 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
196 write!(f, "offset: {}", self.offset)
197 }
198}
199
/// The components of a run-end array, as produced by `RunEndData::into_parts`.
pub struct RunEndDataParts {
    /// The run ends child array.
    pub ends: ArrayRef,
    /// The run values child array.
    pub values: ArrayRef,
    /// The offset taken from the `RunEndData`.
    pub offset: usize,
}
205
206pub trait RunEndArrayExt: TypedArrayRef<RunEnd> {
207 fn offset(&self) -> usize {
208 self.offset
209 }
210
211 fn ends(&self) -> &ArrayRef {
212 self.as_ref().slots()[ENDS_SLOT]
213 .as_ref()
214 .vortex_expect("RunEndArray ends slot")
215 }
216
217 fn values(&self) -> &ArrayRef {
218 self.as_ref().slots()[VALUES_SLOT]
219 .as_ref()
220 .vortex_expect("RunEndArray values slot")
221 }
222
223 fn dtype(&self) -> &DType {
224 self.values().dtype()
225 }
226
227 fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
228 Ok(self
229 .ends()
230 .as_primitive_typed()
231 .search_sorted(
232 &PValue::from(index + self.offset()),
233 SearchSortedSide::Right,
234 )?
235 .to_ends_index(self.ends().len()))
236 }
237}
238impl<T: TypedArrayRef<RunEnd>> RunEndArrayExt for T {}
239
/// Marker type for the run-end encoding; it carries the encoding's vtable implementations
/// and the constructors for [`RunEndArray`].
#[derive(Clone, Debug)]
pub struct RunEnd;
242
243impl RunEnd {
244 pub const ID: ArrayId = ArrayId::new_ref("vortex.runend");
245
246 pub unsafe fn new_unchecked(
251 ends: ArrayRef,
252 values: ArrayRef,
253 offset: usize,
254 length: usize,
255 ) -> RunEndArray {
256 let dtype = values.dtype().clone();
257 let slots = vec![Some(ends.clone()), Some(values.clone())];
258 RunEndData::validate_parts(&ends, &values, offset, length)
259 .vortex_expect("RunEndArray validation failed");
260 let data = unsafe { RunEndData::new_unchecked(offset) };
261 unsafe {
262 Array::from_parts_unchecked(
263 ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots),
264 )
265 }
266 }
267
268 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<RunEndArray> {
270 let len = RunEndData::logical_len_from_ends(&ends)?;
271 let dtype = values.dtype().clone();
272 let slots = vec![Some(ends), Some(values)];
273 let data = RunEndData::new(0);
274 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
275 }
276
277 pub fn try_new_offset_length(
279 ends: ArrayRef,
280 values: ArrayRef,
281 offset: usize,
282 length: usize,
283 ) -> VortexResult<RunEndArray> {
284 let dtype = values.dtype().clone();
285 let slots = vec![Some(ends), Some(values)];
286 let data = RunEndData::new(offset);
287 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, length, data).with_slots(slots))
288 }
289
290 pub fn new(ends: ArrayRef, values: ArrayRef) -> RunEndArray {
292 Self::try_new(ends, values).vortex_expect("RunEndData is always valid")
293 }
294
295 pub fn encode(array: ArrayRef) -> VortexResult<RunEndArray> {
297 if let Some(parray) = array.as_opt::<Primitive>() {
298 let (ends, values) = runend_encode(parray);
299 let ends = ends.into_array();
300 let len = array.len();
301 let dtype = values.dtype().clone();
302 let slots = vec![Some(ends), Some(values)];
303 let data = unsafe { RunEndData::new_unchecked(0) };
304 Array::try_from_parts(ArrayParts::new(RunEnd, dtype, len, data).with_slots(slots))
305 } else {
306 vortex_bail!("REE can only encode primitive arrays")
307 }
308 }
309}
310
311impl RunEndData {
312 fn logical_len_from_ends(ends: &ArrayRef) -> VortexResult<usize> {
313 if ends.is_empty() {
314 Ok(0)
315 } else {
316 usize::try_from(&ends.scalar_at(ends.len() - 1)?)
317 }
318 }
319
320 pub(crate) fn validate_parts(
321 ends: &ArrayRef,
322 values: &ArrayRef,
323 offset: usize,
324 length: usize,
325 ) -> VortexResult<()> {
326 vortex_ensure!(
328 ends.dtype().is_unsigned_int(),
329 "run ends must be unsigned integers, was {}",
330 ends.dtype(),
331 );
332 vortex_ensure!(
333 ends.len() == values.len(),
334 "run ends len != run values len, {} != {}",
335 ends.len(),
336 values.len()
337 );
338
339 if ends.is_empty() {
341 vortex_ensure!(
342 offset == 0,
343 "non-zero offset provided for empty RunEndArray"
344 );
345 return Ok(());
346 }
347
348 if length == 0 {
350 return Ok(());
351 }
352
353 debug_assert!({
354 let pre_validation = ends.statistics().to_owned();
356
357 let is_sorted = ends
358 .statistics()
359 .compute_is_strict_sorted()
360 .unwrap_or(false);
361
362 ends.statistics().inherit(pre_validation.iter());
365 is_sorted
366 });
367
368 if !ends.is_host() {
370 return Ok(());
371 }
372
373 if offset != 0 && length != 0 {
375 let first_run_end = usize::try_from(&ends.scalar_at(0)?)?;
376 if first_run_end < offset {
377 vortex_bail!("First run end {first_run_end} must be >= offset {offset}");
378 }
379 }
380
381 let last_run_end = usize::try_from(&ends.scalar_at(ends.len() - 1)?)?;
382 let min_required_end = offset + length;
383 if last_run_end < min_required_end {
384 vortex_bail!("Last run end {last_run_end} must be >= offset+length {min_required_end}");
385 }
386
387 Ok(())
388 }
389}
390
391impl RunEndData {
392 pub fn new(offset: usize) -> Self {
418 Self { offset }
419 }
420
421 pub unsafe fn new_unchecked(offset: usize) -> Self {
431 Self { offset }
432 }
433
434 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
436 if let Some(parray) = array.as_opt::<Primitive>() {
437 let (_ends, _values) = runend_encode(parray);
438 unsafe { Ok(Self::new_unchecked(0)) }
440 } else {
441 vortex_bail!("REE can only encode primitive arrays")
442 }
443 }
444
445 pub fn into_parts(self, ends: ArrayRef, values: ArrayRef) -> RunEndDataParts {
446 RunEndDataParts {
447 ends,
448 values,
449 offset: self.offset,
450 }
451 }
452}
453
454impl ValidityVTable<RunEnd> for RunEnd {
455 fn validity(array: ArrayView<'_, RunEnd>) -> VortexResult<Validity> {
456 Ok(match array.values().validity()? {
457 Validity::NonNullable | Validity::AllValid => Validity::AllValid,
458 Validity::AllInvalid => Validity::AllInvalid,
459 Validity::Array(values_validity) => Validity::Array(unsafe {
460 RunEnd::new_unchecked(
461 array.ends().clone(),
462 values_validity,
463 array.offset(),
464 array.len(),
465 )
466 .into_array()
467 }),
468 })
469 }
470}
471
472pub(super) fn run_end_canonicalize(
473 array: &RunEndArray,
474 ctx: &mut ExecutionCtx,
475) -> VortexResult<ArrayRef> {
476 let pends = array.ends().clone().execute_as("ends", ctx)?;
477
478 Ok(match array.dtype() {
479 DType::Bool(_) => {
480 let bools = array.values().clone().execute_as("values", ctx)?;
481 runend_decode_bools(pends, bools, array.offset(), array.len())?
482 }
483 DType::Primitive(..) => {
484 let pvalues = array.values().clone().execute_as("values", ctx)?;
485 runend_decode_primitive(pends, pvalues, array.offset(), array.len())?.into_array()
486 }
487 DType::Utf8(_) | DType::Binary(_) => {
488 let values = array
489 .values()
490 .clone()
491 .execute_as::<VarBinViewArray>("values", ctx)?;
492 runend_decode_varbinview(pends, values, array.offset(), array.len())?.into_array()
493 }
494 _ => vortex_bail!("Unsupported RunEnd value type: {}", array.dtype()),
495 })
496}
497
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_array::arrays::DictArray;
    use vortex_array::arrays::VarBinViewArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_buffer::buffer;

    use crate::RunEnd;

    /// Primitive values: length and dtype come from the parts, and the array compares equal to
    /// the expanded values.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let ree = RunEnd::new(ends, values);

        assert_eq!(ree.len(), 10);
        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(ree.dtype(), &expected_dtype);

        let expanded = buffer![1, 1, 2, 2, 2, 3, 3, 3, 3, 3].into_array();
        assert_arrays_eq!(ree.into_array(), expanded);
    }

    /// UTF-8 values are supported as run values.
    #[test]
    fn test_runend_utf8() {
        let ends = buffer![2u32, 5, 10].into_array();
        let strings = VarBinViewArray::from_iter_str(["a", "b", "c"]).into_array();
        let ree = RunEnd::new(ends, strings);

        assert_eq!(ree.len(), 10);
        assert_eq!(ree.dtype(), &DType::Utf8(Nullability::NonNullable));

        let expanded =
            VarBinViewArray::from_iter_str(["a", "a", "b", "b", "b", "c", "c", "c", "c", "c"])
                .into_array();
        assert_arrays_eq!(ree.into_array(), expanded);
    }

    /// The values child may itself be an encoded (dictionary) array.
    #[test]
    fn test_runend_dict() {
        let codes = buffer![0u32, 1, 2].into_array();
        let dictionary = VarBinViewArray::from_iter_str(["x", "y", "z"]).into_array();
        let dict_array = DictArray::try_new(codes, dictionary).unwrap();

        let ends = buffer![2u32, 5, 10].into_array();
        let ree = RunEnd::try_new(ends, dict_array.into_array()).unwrap();
        assert_eq!(ree.len(), 10);

        let expanded =
            VarBinViewArray::from_iter_str(["x", "x", "y", "y", "y", "z", "z", "z", "z", "z"])
                .into_array();
        assert_arrays_eq!(ree.into_array(), expanded);
    }
}