1use std::fmt::Debug;
2
3use vortex_array::arrays::PrimitiveArray;
4use vortex_array::compute::{
5 SearchSortedSide, scalar_at, search_sorted_usize, search_sorted_usize_many,
6};
7use vortex_array::stats::{ArrayStats, StatsSetRef};
8use vortex_array::variants::{BoolArrayTrait, DecimalArrayTrait, PrimitiveArrayTrait};
9use vortex_array::vtable::VTableRef;
10use vortex_array::{
11 Array, ArrayCanonicalImpl, ArrayImpl, ArrayRef, ArrayStatisticsImpl, ArrayValidityImpl,
12 ArrayVariantsImpl, Canonical, Encoding, IntoArray, ProstMetadata, ToCanonical,
13 try_from_array_ref,
14};
15use vortex_buffer::Buffer;
16use vortex_dtype::DType;
17use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
18use vortex_mask::Mask;
19
20use crate::compress::{runend_decode_bools, runend_decode_primitive, runend_encode};
21use crate::serde::RunEndMetadata;
22
/// Run-end encoded array: a pair of child arrays (`ends`, `values`) viewed through a logical
/// window described by `offset` and `length`.
#[derive(Clone, Debug)]
pub struct RunEndArray {
    /// Run ends. The constructors require a non-nullable unsigned integer dtype and reject
    /// arrays whose statistics report that they are not strictly sorted.
    ends: ArrayRef,
    /// One value per run. The constructors require a `Bool` or `Primitive` dtype; this is also
    /// the dtype reported for the whole array.
    values: ArrayRef,
    /// Amount added to a logical index before it is looked up in `ends`.
    offset: usize,
    /// Logical length reported by the array.
    length: usize,
    /// Statistics storage for this array.
    stats_set: ArrayStats,
}

// NOTE(review): presumably generates the `ArrayRef` -> `RunEndArray` conversion impls; confirm
// against the macro definition in `vortex_array`.
try_from_array_ref!(RunEndArray);
33
/// Marker type identifying the run-end encoding.
#[derive(Debug)]
pub struct RunEndEncoding;

/// Ties the encoding marker to its array type and its serialized metadata type.
impl Encoding for RunEndEncoding {
    /// Array type produced by this encoding.
    type Array = RunEndArray;
    /// Metadata type for this encoding: `RunEndMetadata` wrapped in `ProstMetadata`.
    type Metadata = ProstMetadata<RunEndMetadata>;
}
40
41impl RunEndArray {
42 pub fn try_new(ends: ArrayRef, values: ArrayRef) -> VortexResult<Self> {
43 let length = if ends.is_empty() {
44 0
45 } else {
46 scalar_at(&ends, ends.len() - 1)?.as_ref().try_into()?
47 };
48 Self::with_offset_and_length(ends, values, 0, length)
49 }
50
51 pub(crate) fn with_offset_and_length(
52 ends: ArrayRef,
53 values: ArrayRef,
54 offset: usize,
55 length: usize,
56 ) -> VortexResult<Self> {
57 if !matches!(values.dtype(), &DType::Bool(_) | &DType::Primitive(_, _)) {
58 vortex_bail!(
59 "RunEnd array can only have Bool or Primitive values, {} given",
60 values.dtype()
61 );
62 }
63
64 if offset != 0 {
65 let first_run_end: usize = scalar_at(&ends, 0)?.as_ref().try_into()?;
66 if first_run_end <= offset {
67 vortex_bail!("First run end {first_run_end} must be bigger than offset {offset}");
68 }
69 }
70
71 if !ends.dtype().is_unsigned_int() || ends.dtype().is_nullable() {
72 vortex_bail!(MismatchedTypes: "non-nullable unsigned int", ends.dtype());
73 }
74 if !ends.statistics().compute_is_strict_sorted().unwrap_or(true) {
75 vortex_bail!("Ends array must be strictly sorted");
76 }
77
78 Ok(Self {
79 ends,
80 values,
81 offset,
82 length,
83 stats_set: Default::default(),
84 })
85 }
86
87 pub fn find_physical_index(&self, index: usize) -> VortexResult<usize> {
89 search_sorted_usize(self.ends(), index + self.offset(), SearchSortedSide::Right)
90 .map(|s| s.to_ends_index(self.ends().len()))
91 }
92
93 pub fn find_physical_indices(&self, indices: &[usize]) -> VortexResult<Buffer<u64>> {
98 search_sorted_usize_many(self.ends(), indices, SearchSortedSide::Right).map(|results| {
99 results
100 .into_iter()
101 .map(|result| result.to_ends_index(self.ends().len()) as u64)
102 .collect()
103 })
104 }
105
106 pub fn encode(array: ArrayRef) -> VortexResult<Self> {
108 if let Ok(parray) = PrimitiveArray::try_from(array) {
109 let (ends, values) = runend_encode(&parray)?;
110 Self::try_new(ends.into_array(), values)
111 } else {
112 vortex_bail!("REE can only encode primitive arrays")
113 }
114 }
115
116 #[inline]
120 pub fn offset(&self) -> usize {
121 self.offset
122 }
123
124 #[inline]
129 pub fn ends(&self) -> &ArrayRef {
130 &self.ends
131 }
132
133 #[inline]
138 pub fn values(&self) -> &ArrayRef {
139 &self.values
140 }
141}
142
143impl ArrayImpl for RunEndArray {
144 type Encoding = RunEndEncoding;
145
146 fn _len(&self) -> usize {
147 self.length
148 }
149
150 fn _dtype(&self) -> &DType {
151 self.values.dtype()
152 }
153
154 fn _vtable(&self) -> VTableRef {
155 VTableRef::new_ref(&RunEndEncoding)
156 }
157
158 fn _with_children(&self, children: &[ArrayRef]) -> VortexResult<Self> {
159 let ends = children[0].clone();
160 let values = children[1].clone();
161
162 Self::try_new(ends, values)
163 }
164}
165
166impl ArrayVariantsImpl for RunEndArray {
167 fn _as_bool_typed(&self) -> Option<&dyn BoolArrayTrait> {
168 Some(self)
169 }
170
171 fn _as_primitive_typed(&self) -> Option<&dyn PrimitiveArrayTrait> {
172 Some(self)
173 }
174
175 fn _as_decimal_typed(&self) -> Option<&dyn DecimalArrayTrait> {
176 Some(self)
177 }
178}
179
// The typed-variant traits below are implemented with empty bodies; they are what allow
// `RunEndArray` to be returned as a trait object from its `ArrayVariantsImpl` methods.
impl PrimitiveArrayTrait for RunEndArray {}

impl BoolArrayTrait for RunEndArray {}

impl DecimalArrayTrait for RunEndArray {}
185
186impl ArrayValidityImpl for RunEndArray {
187 fn _is_valid(&self, index: usize) -> VortexResult<bool> {
188 let physical_idx = self
189 .find_physical_index(index)
190 .vortex_expect("Invalid index");
191 self.values().is_valid(physical_idx)
192 }
193
194 fn _all_valid(&self) -> VortexResult<bool> {
195 self.values().all_valid()
196 }
197
198 fn _all_invalid(&self) -> VortexResult<bool> {
199 self.values().all_invalid()
200 }
201
202 fn _validity_mask(&self) -> VortexResult<Mask> {
203 Ok(match self.values().validity_mask()? {
204 Mask::AllTrue(_) => Mask::AllTrue(self.len()),
205 Mask::AllFalse(_) => Mask::AllFalse(self.len()),
206 Mask::Values(values) => {
207 let ree_validity = RunEndArray::with_offset_and_length(
208 self.ends().clone(),
209 values.into_array(),
210 self.offset(),
211 self.len(),
212 )
213 .vortex_expect("invalid array")
214 .into_array();
215 Mask::from_buffer(ree_validity.to_bool()?.boolean_buffer().clone())
216 }
217 })
218 }
219}
220
221impl ArrayCanonicalImpl for RunEndArray {
222 fn _to_canonical(&self) -> VortexResult<Canonical> {
223 let pends = self.ends().to_primitive()?;
224 match self.dtype() {
225 DType::Bool(_) => {
226 let bools = self.values().to_bool()?;
227 runend_decode_bools(pends, bools, self.offset(), self.len()).map(Canonical::Bool)
228 }
229 DType::Primitive(..) => {
230 let pvalues = self.values().to_primitive()?;
231 runend_decode_primitive(pends, pvalues, self.offset(), self.len())
232 .map(Canonical::Primitive)
233 }
234 _ => vortex_bail!("Only Primitive and Bool values are supported"),
235 }
236 }
237}
238
impl ArrayStatisticsImpl for RunEndArray {
    /// Returns a reference to this array's statistics set, bound to the array itself.
    fn _stats_ref(&self) -> StatsSetRef<'_> {
        self.stats_set.to_ref(self)
    }
}
244
#[cfg(test)]
mod tests {
    use vortex_array::compute::scalar_at;
    use vortex_array::{Array, IntoArray};
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};

    use crate::RunEndArray;

    /// Constructing from ends `[2, 5, 10]` and values `[1, 2, 3]` yields a length-10 `i32` array
    /// whose elements resolve to the value of the run they fall in.
    #[test]
    fn test_runend_constructor() {
        let ends = buffer![2u32, 5, 10].into_array();
        let values = buffer![1i32, 2, 3].into_array();
        let arr = RunEndArray::try_new(ends, values).unwrap();

        assert_eq!(arr.len(), 10);

        let expected_dtype = DType::Primitive(PType::I32, Nullability::NonNullable);
        assert_eq!(arr.dtype(), &expected_dtype);

        // Runs: indices 0..2 hold 1, 2..5 hold 2, 5..10 hold 3.
        for (index, expected) in [(0usize, 1i32), (2, 2), (5, 3), (9, 3)] {
            assert_eq!(scalar_at(&arr, index).unwrap(), expected.into());
        }
    }
}