1use std::fmt::Debug;
2
3use vortex_array::arrays::PrimitiveVTable;
4use vortex_array::search_sorted::{SearchSorted, SearchSortedSide};
5use vortex_array::stats::{ArrayStats, StatsSetRef};
6use vortex_array::vtable::{ArrayVTable, CanonicalVTable, NotSupported, VTable, ValidityVTable};
7use vortex_array::{
8 Array, ArrayRef, Canonical, EncodingId, EncodingRef, IntoArray, ToCanonical, vtable,
9};
10use vortex_dtype::DType;
11use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
12use vortex_mask::Mask;
13use vortex_scalar::PValue;
14
15use crate::compress::{runend_decode_bools, runend_decode_primitive, runend_encode};
16
17vtable!(RunEnd);
18
19impl VTable for RunEndVTable {
20 type Array = RunEndArray;
21 type Encoding = RunEndEncoding;
22
23 type ArrayVTable = Self;
24 type CanonicalVTable = Self;
25 type OperationsVTable = Self;
26 type ValidityVTable = Self;
27 type VisitorVTable = Self;
28 type ComputeVTable = NotSupported;
29 type EncodeVTable = Self;
30 type SerdeVTable = Self;
31
32 fn id(_encoding: &Self::Encoding) -> EncodingId {
33 EncodingId::new_ref("vortex.runend")
34 }
35
36 fn encoding(_array: &Self::Array) -> EncodingRef {
37 EncodingRef::new_ref(RunEndEncoding.as_ref())
38 }
39}
40
41#[derive(Clone, Debug)]
42pub struct RunEndArray {
43 ends: ArrayRef,
44 values: ArrayRef,
45 offset: usize,
46 length: usize,
47 stats_set: ArrayStats,
48}
49
/// Unit marker type for the run-end encoding; `RunEndVTable` names it as its
/// `Encoding` type.
#[derive(Debug, Clone)]
pub struct RunEndEncoding;
52
53impl RunEndArray {
54 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
55 let length = if ends.is_empty() {
56 0
57 } else {
58 ends.scalar_at(ends.len() - 1)?.as_ref().try_into()?
59 };
60 Self::with_offset_and_length(ends, values, 0, length)
61 }
62
63 pub(crate) fn with_offset_and_length(
64 ends: ArrayRef,
65 values: ArrayRef,
66 offset: usize,
67 length: usize,
68 ) -> VortexResult<Self> {
69 if !matches!(values.dtype(), &DType::Bool(_) | &DType::Primitive(_, _)) {
70 vortex_bail!(
71 "RunEnd array can only have Bool or Primitive values, {} given",
72 values.dtype()
73 );
74 }
75
76 if offset != 0 {
77 let first_run_end: usize = ends.scalar_at(0)?.as_ref().try_into()?;
78 if first_run_end <= offset {
79 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
80 }
81 }
82
83 if !ends.dtype().is_unsigned_int() || ends.dtype().is_nullable() {
84 vortex_bail!(MismatchedTypes: "non-nullable unsigned int", ends.dtype());
85 }
86 if !ends.statistics().compute_is_strict_sorted().unwrap_or(true) {
87 vortex_bail!("Ends array must be strictly sorted");
88 }
89
90 Ok(Self {
91 ends,
92 values,
93 offset,
94 length,
95 stats_set: Default::default(),
96 })
97 }
98
99 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
101 Ok(self
102 .ends()
103 .as_primitive_typed()
104 .search_sorted(
105 &PValue::from(index + self.offset()),
106 SearchSortedSide::Right,
107 )
108 .to_ends_index(self.ends().len()))
109 }
110
111 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
113 if let Some(parray) = array.as_opt::<PrimitiveVTable>() {
114 let (ends, values) = runend_encode(parray)?;
115 Self::try_new(ends.into_array(), values)
116 } else {
117 vortex_bail!("REE can only encode primitive arrays")
118 }
119 }
120
121 #[inline]
125 pub fn offset(&self) -> usize {
126 self.offset
127 }
128
129 #[inline]
134 pub fn ends(&self) -> &ArrayRef {
135 &self.ends
136 }
137
138 #[inline]
143 pub fn values(&self) -> &ArrayRef {
144 &self.values
145 }
146}
147
148impl ArrayVTable<RunEndVTable> for RunEndVTable {
149 fn len(array: &RunEndArray) -> usize {
150 array.length
151 }
152
153 fn dtype(array: &RunEndArray) -> &DType {
154 array.values.dtype()
155 }
156
157 fn stats(array: &RunEndArray) -> StatsSetRef<'_> {
158 array.stats_set.to_ref(array.as_ref())
159 }
160}
161
162impl ValidityVTable<RunEndVTable> for RunEndVTable {
163 fn is_valid(array: &RunEndArray, index: usize) -> VortexResult<bool> {
164 let physical_idx = array
165 .find_physical_index(index)
166 .vortex_expect("Invalid index");
167 array.values().is_valid(physical_idx)
168 }
169
170 fn all_valid(array: &RunEndArray) -> VortexResult<bool> {
171 array.values().all_valid()
172 }
173
174 fn all_invalid(array: &RunEndArray) -> VortexResult<bool> {
175 array.values().all_invalid()
176 }
177
178 fn validity_mask(array: &RunEndArray) -> VortexResult<Mask> {
179 Ok(match array.values().validity_mask()? {
180 Mask::AllTrue(_) => Mask::AllTrue(array.len()),
181 Mask::AllFalse(_) => Mask::AllFalse(array.len()),
182 Mask::Values(values) => {
183 let ree_validity = RunEndArray::with_offset_and_length(
184 array.ends().clone(),
185 values.into_array(),
186 array.offset(),
187 array.len(),
188 )
189 .vortex_expect("invalid array")
190 .into_array();
191 Mask::from_buffer(ree_validity.to_bool()?.boolean_buffer().clone())
192 }
193 })
194 }
195}
196
197impl CanonicalVTable<RunEndVTable> for RunEndVTable {
198 fn canonicalize(array: &RunEndArray) -> VortexResult<Canonical> {
199 let pends = array.ends().to_primitive()?;
200 match array.dtype() {
201 DType::Bool(_) => {
202 let bools = array.values().to_bool()?;
203 runend_decode_bools(pends, bools, array.offset(), array.len()).map(Canonical::Bool)
204 }
205 DType::Primitive(..) => {
206 let pvalues = array.values().to_primitive()?;
207 runend_decode_primitive(pends, pvalues, array.offset(), array.len())
208 .map(Canonical::Primitive)
209 }
210 _ => vortex_bail!("Only Primitive and Bool values are supported"),
211 }
212 }
213}
214
#[cfg(test)]
mod tests {
    use vortex_array::IntoArray;
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::RunEndArray;

    /// Constructing from ends/values yields the expected length, dtype and
    /// per-index values.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::try_new(ends, values).unwrap();

        // The logical length is the last run end.
        assert_eq!(arr.len(), 10);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        // Logical layout:
        //   0, 1          => 1
        //   2, 3, 4       => 2
        //   5, 6, 7, 8, 9 => 3
        for (index, expected) in [(0usize, 1i32), (2, 2), (5, 3), (9, 3)] {
            assert_eq!(arr.scalar_at(index).unwrap(), expected.into());
        }
    }
}