1use arrow_buffer::BooleanBufferBuilder;
5use itertools::Itertools;
6use vortex_array::arrays::{BoolArray, BooleanBuffer, ConstantArray, PrimitiveArray};
7use vortex_array::validity::Validity;
8use vortex_array::vtable::ValidityHelper;
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, buffer};
11use vortex_dtype::{
12 NativePType, Nullability, match_each_native_ptype, match_each_unsigned_integer_ptype,
13};
14use vortex_error::VortexExpect;
15use vortex_mask::Mask;
16use vortex_scalar::Scalar;
17
18use crate::iter::trimmed_ends_iter;
19
20pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) {
22 let validity = match array.validity() {
23 Validity::NonNullable => None,
24 Validity::AllValid => None,
25 Validity::AllInvalid => {
26 return (
28 PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
29 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
30 );
31 }
32 Validity::Array(a) => Some(a.to_bool().boolean_buffer().clone()),
33 };
34
35 let (ends, values) = match validity {
36 None => {
37 match_each_native_ptype!(array.ptype(), |P| {
38 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
39 (
40 PrimitiveArray::new(ends, Validity::NonNullable),
41 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
42 )
43 })
44 }
45 Some(validity) => {
46 match_each_native_ptype!(array.ptype(), |P| {
47 let (ends, values) =
48 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
49 (
50 PrimitiveArray::new(ends, Validity::NonNullable),
51 values.into_array(),
52 )
53 })
54 }
55 };
56
57 let ends = ends
58 .downcast()
59 .vortex_expect("Ends must succeed downcasting")
60 .to_primitive();
61
62 (ends, values)
63}
64
65fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
66 let mut ends = BufferMut::empty();
67 let mut values = BufferMut::empty();
68
69 if elements.is_empty() {
70 return (ends.freeze(), values.freeze());
71 }
72
73 let mut prev = elements[0];
75 let mut end = 1;
76 for &e in elements.iter().skip(1) {
77 if e != prev {
78 ends.push(end);
79 values.push(prev);
80 }
81 prev = e;
82 end += 1;
83 }
84 ends.push(end);
85 values.push(prev);
86
87 (ends.freeze(), values.freeze())
88}
89
90fn runend_encode_nullable_primitive<T: NativePType>(
91 elements: &[T],
92 element_validity: BooleanBuffer,
93) -> (Buffer<u64>, PrimitiveArray) {
94 let mut ends = BufferMut::empty();
95 let mut values = BufferMut::empty();
96 let mut validity = BooleanBufferBuilder::new(values.capacity());
97
98 if elements.is_empty() {
99 return (
100 ends.freeze(),
101 PrimitiveArray::new(
102 values,
103 Validity::Array(BoolArray::from(validity.finish()).into_array()),
104 ),
105 );
106 }
107
108 let mut prev = element_validity.value(0).then(|| elements[0]);
110 let mut end = 1;
111 for e in elements
112 .iter()
113 .zip(element_validity.iter())
114 .map(|(&e, is_valid)| is_valid.then_some(e))
115 .skip(1)
116 {
117 if e != prev {
118 ends.push(end);
119 match prev {
120 None => {
121 validity.append(false);
122 values.push(T::default());
123 }
124 Some(p) => {
125 validity.append(true);
126 values.push(p);
127 }
128 }
129 }
130 prev = e;
131 end += 1;
132 }
133 ends.push(end);
134
135 match prev {
136 None => {
137 validity.append(false);
138 values.push(T::default());
139 }
140 Some(p) => {
141 validity.append(true);
142 values.push(p);
143 }
144 }
145
146 (
147 ends.freeze(),
148 PrimitiveArray::new(values, Validity::from(validity.finish())),
149 )
150}
151
152pub fn runend_decode_primitive(
153 ends: PrimitiveArray,
154 values: PrimitiveArray,
155 offset: usize,
156 length: usize,
157) -> PrimitiveArray {
158 match_each_native_ptype!(values.ptype(), |P| {
159 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
160 runend_decode_typed_primitive(
161 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
162 values.as_slice::<P>(),
163 values.validity_mask(),
164 values.dtype().nullability(),
165 length,
166 )
167 })
168 })
169}
170
171pub fn runend_decode_bools(
172 ends: PrimitiveArray,
173 values: BoolArray,
174 offset: usize,
175 length: usize,
176) -> BoolArray {
177 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
178 runend_decode_typed_bool(
179 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
180 values.boolean_buffer().clone(),
181 values.validity_mask(),
182 values.dtype().nullability(),
183 length,
184 )
185 })
186}
187
188pub fn runend_decode_typed_primitive<T: NativePType>(
189 run_ends: impl Iterator<Item = usize>,
190 values: &[T],
191 values_validity: Mask,
192 values_nullability: Nullability,
193 length: usize,
194) -> PrimitiveArray {
195 match values_validity {
196 Mask::AllTrue(_) => {
197 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
198 for (end, value) in run_ends.zip_eq(values) {
199 assert!(end <= length, "Runend end must be less than overall length");
200 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
203 }
204 PrimitiveArray::new(decoded, values_nullability.into())
205 }
206 Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
207 Mask::Values(mask) => {
208 let mut decoded = BufferMut::with_capacity(length);
209 let mut decoded_validity = BooleanBufferBuilder::new(length);
210 for (end, value) in run_ends.zip_eq(
211 values
212 .iter()
213 .zip(mask.boolean_buffer().iter())
214 .map(|(&v, is_valid)| is_valid.then_some(v)),
215 ) {
216 assert!(end <= length, "Runend end must be less than overall length");
217 match value {
218 None => {
219 decoded_validity.append_n(end - decoded.len(), false);
220 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
223 }
224 Some(value) => {
225 decoded_validity.append_n(end - decoded.len(), true);
226 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
229 }
230 }
231 }
232 PrimitiveArray::new(decoded, Validity::from(decoded_validity.finish()))
233 }
234 }
235}
236
237pub fn runend_decode_typed_bool(
238 run_ends: impl Iterator<Item = usize>,
239 values: BooleanBuffer,
240 values_validity: Mask,
241 values_nullability: Nullability,
242 length: usize,
243) -> BoolArray {
244 match values_validity {
245 Mask::AllTrue(_) => {
246 let mut decoded = BooleanBufferBuilder::new(length);
247 for (end, value) in run_ends.zip_eq(values.iter()) {
248 decoded.append_n(end - decoded.len(), value);
249 }
250 BoolArray::new(decoded.finish(), values_nullability.into())
251 }
252 Mask::AllFalse(_) => BoolArray::new(BooleanBuffer::new_unset(length), Validity::AllInvalid),
253 Mask::Values(mask) => {
254 let mut decoded = BooleanBufferBuilder::new(length);
255 let mut decoded_validity = BooleanBufferBuilder::new(length);
256 for (end, value) in run_ends.zip_eq(
257 values
258 .iter()
259 .zip(mask.boolean_buffer().iter())
260 .map(|(v, is_valid)| is_valid.then_some(v)),
261 ) {
262 match value {
263 None => {
264 decoded_validity.append_n(end - decoded.len(), false);
265 decoded.append_n(end - decoded.len(), false);
266 }
267 Some(value) => {
268 decoded_validity.append_n(end - decoded.len(), true);
269 decoded.append_n(end - decoded.len(), value);
270 }
271 }
272 }
273 BoolArray::new(decoded.finish(), Validity::from(decoded_validity.finish()))
274 }
275 }
276}
277
#[cfg(test)]
mod test {
    use arrow_buffer::BooleanBuffer;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::compress::{runend_decode_primitive, runend_encode};

    /// Three runs of lengths 2, 3 and 5 encode to ends 2, 5 and 10.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        assert_eq!(run_ends.as_slice::<u8>(), [2u8, 5, 10]);
        assert_eq!(run_values.as_slice::<i32>(), [1i32, 2, 3]);
    }

    /// Nulls split runs of equal stored values, and null runs store 0.
    #[test]
    fn encode_nullable() {
        let valid_bits = BooleanBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(valid_bits),
        );

        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        assert_eq!(run_ends.as_slice::<u8>(), [2u8, 4, 5, 8, 10]);
        assert_eq!(run_values.as_slice::<i32>(), [1i32, 0, 2, 3, 0]);
    }

    /// An entirely null input becomes a single run spanning the whole array.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::from(BooleanBuffer::new_unset(5)),
        );

        let (run_ends, run_values) = runend_encode(&input);
        let run_values = run_values.to_primitive();

        assert_eq!(run_ends.as_slice::<u64>(), [5u64]);
        assert_eq!(run_values.as_slice::<i32>(), [0i32]);
    }

    /// Decoding expands every run back to its original length.
    #[test]
    fn decode() {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);

        let expanded = runend_decode_primitive(run_ends, run_values, 0, 10);

        assert_eq!(
            expanded.as_slice::<i32>(),
            [1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]
        );
    }
}