1use std::fmt::Debug;
2
3use vortex_array::arrays::PrimitiveArray;
4use vortex_array::compute::{
5 SearchSortedSide, scalar_at, search_sorted_usize, search_sorted_usize_many,
6};
7use vortex_array::stats::{ArrayStats, StatsSetRef};
8use vortex_array::variants::{BoolArrayTrait, PrimitiveArrayTrait};
9use vortex_array::vtable::{EncodingVTable, VTableRef};
10use vortex_array::{
11 Array, ArrayCanonicalImpl, ArrayImpl, ArrayRef, ArrayStatisticsImpl, ArrayValidityImpl,
12 ArrayVariantsImpl, Canonical, Encoding, EncodingId, IntoArray, SerdeMetadata, ToCanonical,
13 try_from_array_ref,
14};
15use vortex_buffer::Buffer;
16use vortex_dtype::DType;
17use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
18use vortex_mask::Mask;
19
20use crate::compress::{runend_decode_bools, runend_decode_primitive, runend_encode};
21use crate::serde::RunEndMetadata;
22
/// Run-end encoded array.
///
/// `values` holds one entry per run and `ends` holds, for each run, the
/// logical position at which that run stops. For example, ends `[2, 5, 10]`
/// with values `[1, 2, 3]` describe a logical array of length 10 whose
/// elements 0..2 are `1`, 2..5 are `2` and 5..10 are `3`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Run ends. The constructor requires a non-nullable unsigned integer
    /// dtype and rejects arrays whose statistics report them as not strictly
    /// sorted.
    ends: ArrayRef,
    /// Run values. The constructor only accepts `Bool` or `Primitive` dtypes.
    values: ArrayRef,
    /// Amount added to a logical index before it is searched for in `ends`.
    offset: usize,
    /// Logical length of the array, as reported by `_len`.
    length: usize,
    /// Statistics container handed out through `_stats_ref`.
    stats_set: ArrayStats,
}

// NOTE(review): presumably generates the `TryFrom` conversions from array
// references for `RunEndArray` — confirm against the macro definition.
try_from_array_ref!(RunEndArray);
33
/// Zero-sized type identifying the run-end encoding.
pub struct RunEndEncoding;

/// Ties the encoding to its array type and to the metadata type used for it.
impl Encoding for RunEndEncoding {
    /// Array type produced by this encoding.
    type Array = RunEndArray;
    /// Metadata type: `RunEndMetadata` wrapped in `SerdeMetadata`.
    type Metadata = SerdeMetadata<RunEndMetadata>;
}
39
40impl EncodingVTable for RunEndEncoding {
41 fn id(&self) -> EncodingId {
42 EncodingId::new_ref("vortex.runend")
43 }
44}
45
46impl RunEndArray {
47 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
48 let length = if ends.is_empty() {
49 0
50 } else {
51 scalar_at(&ends, ends.len() - 1)?.as_ref().try_into()?
52 };
53 Self::with_offset_and_length(ends, values, 0, length)
54 }
55
56 pub(crate) fn with_offset_and_length(
57 ends: ArrayRef,
58 values: ArrayRef,
59 offset: usize,
60 length: usize,
61 ) -> VortexResult<Self> {
62 if !matches!(values.dtype(), &DType::Bool(_) | &DType::Primitive(_, _)) {
63 vortex_bail!(
64 "RunEnd array can only have Bool or Primitive values, {} given",
65 values.dtype()
66 );
67 }
68
69 if offset != 0 {
70 let first_run_end: usize = scalar_at(&ends, 0)?.as_ref().try_into()?;
71 if first_run_end <= offset {
72 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
73 }
74 }
75
76 if !ends.dtype().is_unsigned_int() || ends.dtype().is_nullable() {
77 vortex_bail!(MismatchedTypes: "non-nullable unsigned int", ends.dtype());
78 }
79 if !ends.statistics().compute_is_strict_sorted().unwrap_or(true) {
80 vortex_bail!("Ends array must be strictly sorted");
81 }
82
83 Ok(Self {
84 ends,
85 values,
86 offset,
87 length,
88 stats_set: Default::default(),
89 })
90 }
91
92 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
94 search_sorted_usize(self.ends(), index + self.offset(), SearchSortedSide::Right)
95 .map(|s| s.to_ends_index(self.ends().len()))
96 }
97
98 pub fn find_physical_indices(&self, indices: &[usize]) -> VortexResult<Buffer<u64>> {
103 search_sorted_usize_many(self.ends(), indices, SearchSortedSide::Right).map(|results| {
104 results
105 .into_iter()
106 .map(|result| result.to_ends_index(self.ends().len()) as u64)
107 .collect()
108 })
109 }
110
111 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
113 if let Ok(parray) = PrimitiveArray::try_from(array) {
114 let (ends, values) = runend_encode(&parray)?;
115 Self::try_new(ends.into_array(), values)
116 } else {
117 vortex_bail!("REE can only encode primitive arrays")
118 }
119 }
120
121 #[inline]
125 pub fn offset(&self) -> usize {
126 self.offset
127 }
128
129 #[inline]
134 pub fn ends(&self) -> &ArrayRef {
135 &self.ends
136 }
137
138 #[inline]
143 pub fn values(&self) -> &ArrayRef {
144 &self.values
145 }
146}
147
148impl ArrayImpl for RunEndArray {
149 type Encoding = RunEndEncoding;
150
151 fn _len(&self) -> usize {
152 self.length
153 }
154
155 fn _dtype(&self) -> &DType {
156 self.values.dtype()
157 }
158
159 fn _vtable(&self) -> VTableRef {
160 VTableRef::new_ref(&RunEndEncoding)
161 }
162}
163
164impl ArrayVariantsImpl for RunEndArray {
165 fn _as_bool_typed(&self) -> Option<&dyn BoolArrayTrait> {
166 Some(self)
167 }
168
169 fn _as_primitive_typed(&self) -> Option<&dyn PrimitiveArrayTrait> {
170 Some(self)
171 }
172}
173
// Empty impl: no trait items are overridden here. It is what allows
// `_as_primitive_typed` to hand out `self` as a `&dyn PrimitiveArrayTrait`.
impl PrimitiveArrayTrait for RunEndArray {}

// Empty impl: no trait items are overridden here. It is what allows
// `_as_bool_typed` to hand out `self` as a `&dyn BoolArrayTrait`.
impl BoolArrayTrait for RunEndArray {}
177
178impl ArrayValidityImpl for RunEndArray {
179 fn _is_valid(&self, index: usize) -> VortexResult<bool> {
180 let physical_idx = self
181 .find_physical_index(index)
182 .vortex_expect("Invalid index");
183 self.values().is_valid(physical_idx)
184 }
185
186 fn _all_valid(&self) -> VortexResult<bool> {
187 self.values().all_valid()
188 }
189
190 fn _all_invalid(&self) -> VortexResult<bool> {
191 self.values().all_invalid()
192 }
193
194 fn _validity_mask(&self) -> VortexResult<Mask> {
195 Ok(match self.values().validity_mask()? {
196 Mask::AllTrue(_) => Mask::AllTrue(self.len()),
197 Mask::AllFalse(_) => Mask::AllFalse(self.len()),
198 Mask::Values(values) => {
199 let ree_validity = RunEndArray::with_offset_and_length(
200 self.ends().clone(),
201 values.into_array(),
202 self.offset(),
203 self.len(),
204 )
205 .vortex_expect("invalid array")
206 .into_array();
207 Mask::from_buffer(ree_validity.to_bool()?.boolean_buffer().clone())
208 }
209 })
210 }
211}
212
213impl ArrayCanonicalImpl for RunEndArray {
214 fn _to_canonical(&self) -> VortexResult<Canonical> {
215 let pends = self.ends().to_primitive()?;
216 match self.dtype() {
217 DType::Bool(_) => {
218 let bools = self.values().to_bool()?;
219 runend_decode_bools(pends, bools, self.offset(), self.len()).map(Canonical::Bool)
220 }
221 DType::Primitive(..) => {
222 let pvalues = self.values().to_primitive()?;
223 runend_decode_primitive(pends, pvalues, self.offset(), self.len())
224 .map(Canonical::Primitive)
225 }
226 _ => vortex_bail!("Only Primitive and Bool values are supported"),
227 }
228 }
229}
230
231impl ArrayStatisticsImpl for RunEndArray {
232 fn _stats_ref(&self) -> StatsSetRef<'_> {
233 self.stats_set.to_ref(self)
234 }
235}
236
#[cfg(test)]
mod tests {
    use vortex_array::compute::scalar_at;
    use vortex_array::{Array, IntoArray};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::RunEndArray;

    /// Runs ending at 2, 5 and 10 with values 1, 2 and 3 must produce a
    /// 10-element i32 array whose elements follow the run boundaries.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::try_new(ends, values).unwrap();

        assert_eq!(arr.len(), 10);
        assert_eq!(
            arr.dtype(),
            &DType::Primitive(PType::I32, Nullability::NonNullable)
        );

        // (logical index, expected value) pairs, checked in order.
        let expectations: [(usize, i32); 4] = [(0, 1), (2, 2), (5, 3), (9, 3)];
        for (index, expected) in expectations {
            assert_eq!(scalar_at(&arr, index).unwrap(), expected.into());
        }
    }
}