1use arrow_buffer::BooleanBufferBuilder;
2use itertools::Itertools;
3use vortex_array::arrays::{BoolArray, BooleanBuffer, ConstantArray, PrimitiveArray};
4use vortex_array::compress::downscale_integer_array;
5use vortex_array::validity::Validity;
6use vortex_array::vtable::ValidityHelper;
7use vortex_array::{ArrayRef, IntoArray, ToCanonical};
8use vortex_buffer::{Buffer, BufferMut, buffer};
9use vortex_dtype::{NativePType, Nullability, match_each_integer_ptype, match_each_native_ptype};
10use vortex_error::VortexResult;
11use vortex_mask::Mask;
12use vortex_scalar::Scalar;
13
14use crate::iter::trimmed_ends_iter;
15
16pub fn runend_encode(array: &PrimitiveArray) -> VortexResult<(PrimitiveArray, ArrayRef)> {
18 let validity = match array.validity() {
19 Validity::NonNullable => None,
20 Validity::AllValid => None,
21 Validity::AllInvalid => {
22 return Ok((
24 PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
25 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
26 ));
27 }
28 Validity::Array(a) => Some(a.to_bool()?.boolean_buffer().clone()),
29 };
30
31 let (ends, values) = match validity {
32 None => {
33 match_each_native_ptype!(array.ptype(), |P| {
34 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
35 (
36 PrimitiveArray::new(ends, Validity::NonNullable),
37 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
38 )
39 })
40 }
41 Some(validity) => {
42 match_each_native_ptype!(array.ptype(), |P| {
43 let (ends, values) =
44 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
45 (
46 PrimitiveArray::new(ends, Validity::NonNullable),
47 values.into_array(),
48 )
49 })
50 }
51 };
52
53 let ends = downscale_integer_array(ends.to_array())?.to_primitive()?;
54
55 Ok((ends, values))
56}
57
58fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
59 let mut ends = BufferMut::empty();
60 let mut values = BufferMut::empty();
61
62 if elements.is_empty() {
63 return (ends.freeze(), values.freeze());
64 }
65
66 let mut prev = elements[0];
68 let mut end = 1;
69 for &e in elements.iter().skip(1) {
70 if e != prev {
71 ends.push(end);
72 values.push(prev);
73 }
74 prev = e;
75 end += 1;
76 }
77 ends.push(end);
78 values.push(prev);
79
80 (ends.freeze(), values.freeze())
81}
82
83fn runend_encode_nullable_primitive<T: NativePType>(
84 elements: &[T],
85 element_validity: BooleanBuffer,
86) -> (Buffer<u64>, PrimitiveArray) {
87 let mut ends = BufferMut::empty();
88 let mut values = BufferMut::empty();
89 let mut validity = BooleanBufferBuilder::new(values.capacity());
90
91 if elements.is_empty() {
92 return (
93 ends.freeze(),
94 PrimitiveArray::new(
95 values,
96 Validity::Array(BoolArray::from(validity.finish()).into_array()),
97 ),
98 );
99 }
100
101 let mut prev = element_validity.value(0).then(|| elements[0]);
103 let mut end = 1;
104 for e in elements
105 .iter()
106 .zip(element_validity.iter())
107 .map(|(&e, is_valid)| is_valid.then_some(e))
108 .skip(1)
109 {
110 if e != prev {
111 ends.push(end);
112 match prev {
113 None => {
114 validity.append(false);
115 values.push(T::default());
116 }
117 Some(p) => {
118 validity.append(true);
119 values.push(p);
120 }
121 }
122 }
123 prev = e;
124 end += 1;
125 }
126 ends.push(end);
127
128 match prev {
129 None => {
130 validity.append(false);
131 values.push(T::default());
132 }
133 Some(p) => {
134 validity.append(true);
135 values.push(p);
136 }
137 }
138
139 (
140 ends.freeze(),
141 PrimitiveArray::new(values, Validity::from(validity.finish())),
142 )
143}
144
145pub fn runend_decode_primitive(
146 ends: PrimitiveArray,
147 values: PrimitiveArray,
148 offset: usize,
149 length: usize,
150) -> VortexResult<PrimitiveArray> {
151 match_each_native_ptype!(values.ptype(), |P| {
152 match_each_integer_ptype!(ends.ptype(), |E| {
153 runend_decode_typed_primitive(
154 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
155 values.as_slice::<P>(),
156 values.validity_mask()?,
157 values.dtype().nullability(),
158 length,
159 )
160 })
161 })
162}
163
164pub fn runend_decode_bools(
165 ends: PrimitiveArray,
166 values: BoolArray,
167 offset: usize,
168 length: usize,
169) -> VortexResult<BoolArray> {
170 match_each_integer_ptype!(ends.ptype(), |E| {
171 runend_decode_typed_bool(
172 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
173 values.boolean_buffer().clone(),
174 values.validity_mask()?,
175 values.dtype().nullability(),
176 length,
177 )
178 })
179}
180
181pub fn runend_decode_typed_primitive<T: NativePType>(
182 run_ends: impl Iterator<Item = usize>,
183 values: &[T],
184 values_validity: Mask,
185 values_nullability: Nullability,
186 length: usize,
187) -> VortexResult<PrimitiveArray> {
188 Ok(match values_validity {
189 Mask::AllTrue(_) => {
190 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
191 for (end, value) in run_ends.zip_eq(values) {
192 assert!(end <= length, "Runend end must be less than overall length");
193 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
196 }
197 PrimitiveArray::new(decoded, values_nullability.into())
198 }
199 Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
200 Mask::Values(mask) => {
201 let mut decoded = BufferMut::with_capacity(length);
202 let mut decoded_validity = BooleanBufferBuilder::new(length);
203 for (end, value) in run_ends.zip_eq(
204 values
205 .iter()
206 .zip(mask.boolean_buffer().iter())
207 .map(|(&v, is_valid)| is_valid.then_some(v)),
208 ) {
209 assert!(end <= length, "Runend end must be less than overall length");
210 match value {
211 None => {
212 decoded_validity.append_n(end - decoded.len(), false);
213 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
216 }
217 Some(value) => {
218 decoded_validity.append_n(end - decoded.len(), true);
219 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
222 }
223 }
224 }
225 PrimitiveArray::new(decoded, Validity::from(decoded_validity.finish()))
226 }
227 })
228}
229
230pub fn runend_decode_typed_bool(
231 run_ends: impl Iterator<Item = usize>,
232 values: BooleanBuffer,
233 values_validity: Mask,
234 values_nullability: Nullability,
235 length: usize,
236) -> VortexResult<BoolArray> {
237 Ok(match values_validity {
238 Mask::AllTrue(_) => {
239 let mut decoded = BooleanBufferBuilder::new(length);
240 for (end, value) in run_ends.zip_eq(values.iter()) {
241 decoded.append_n(end - decoded.len(), value);
242 }
243 BoolArray::new(decoded.finish(), values_nullability.into())
244 }
245 Mask::AllFalse(_) => BoolArray::new(BooleanBuffer::new_unset(length), Validity::AllInvalid),
246 Mask::Values(mask) => {
247 let mut decoded = BooleanBufferBuilder::new(length);
248 let mut decoded_validity = BooleanBufferBuilder::new(length);
249 for (end, value) in run_ends.zip_eq(
250 values
251 .iter()
252 .zip(mask.boolean_buffer().iter())
253 .map(|(v, is_valid)| is_valid.then_some(v)),
254 ) {
255 match value {
256 None => {
257 decoded_validity.append_n(end - decoded.len(), false);
258 decoded.append_n(end - decoded.len(), false);
259 }
260 Some(value) => {
261 decoded_validity.append_n(end - decoded.len(), true);
262 decoded.append_n(end - decoded.len(), value);
263 }
264 }
265 }
266 BoolArray::new(decoded.finish(), Validity::from(decoded_validity.finish()))
267 }
268 })
269}
270
271#[cfg(test)]
272mod test {
273 use arrow_buffer::BooleanBuffer;
274 use vortex_array::ToCanonical;
275 use vortex_array::arrays::PrimitiveArray;
276 use vortex_array::validity::Validity;
277 use vortex_buffer::buffer;
278
279 use crate::compress::{runend_decode_primitive, runend_encode};
280
281 #[test]
282 fn encode() {
283 let arr = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
284 let (ends, values) = runend_encode(&arr).unwrap();
285 let values = values.to_primitive().unwrap();
286
287 assert_eq!(ends.as_slice::<u8>(), vec![2, 5, 10]);
288 assert_eq!(values.as_slice::<i32>(), vec![1, 2, 3]);
289 }
290
291 #[test]
292 fn encode_nullable() {
293 let arr = PrimitiveArray::new(
294 buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
295 Validity::from(BooleanBuffer::from(vec![
296 true, true, false, false, true, true, true, true, false, false,
297 ])),
298 );
299 let (ends, values) = runend_encode(&arr).unwrap();
300 let values = values.to_primitive().unwrap();
301
302 assert_eq!(ends.as_slice::<u8>(), vec![2, 4, 5, 8, 10]);
303 assert_eq!(values.as_slice::<i32>(), vec![1, 0, 2, 3, 0]);
304 }
305
306 #[test]
307 fn encode_all_null() {
308 let arr = PrimitiveArray::new(
309 buffer![0, 0, 0, 0, 0],
310 Validity::from(BooleanBuffer::new_unset(5)),
311 );
312 let (ends, values) = runend_encode(&arr).unwrap();
313 let values = values.to_primitive().unwrap();
314
315 assert_eq!(ends.as_slice::<u64>(), vec![5]);
316 assert_eq!(values.as_slice::<i32>(), vec![0]);
317 }
318
319 #[test]
320 fn decode() {
321 let ends = PrimitiveArray::from_iter([2, 5, 10]);
322 let values = PrimitiveArray::from_iter([1i32, 2, 3]);
323 let decoded = runend_decode_primitive(ends, values, 0, 10).unwrap();
324
325 assert_eq!(
326 decoded.as_slice::<i32>(),
327 vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]
328 );
329 }
330}