1use arrow_buffer::BooleanBufferBuilder;
5use itertools::Itertools;
6use vortex_array::arrays::{BoolArray, BooleanBuffer, ConstantArray, PrimitiveArray};
7use vortex_array::validity::Validity;
8use vortex_array::vtable::ValidityHelper;
9use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10use vortex_buffer::{Buffer, BufferMut, buffer};
11use vortex_dtype::{
12 NativePType, Nullability, match_each_native_ptype, match_each_unsigned_integer_ptype,
13};
14use vortex_error::VortexResult;
15use vortex_mask::Mask;
16use vortex_scalar::Scalar;
17
18use crate::iter::trimmed_ends_iter;
19
20pub fn runend_encode(array: &PrimitiveArray) -> VortexResult<(PrimitiveArray, ArrayRef)> {
22 let validity = match array.validity() {
23 Validity::NonNullable => None,
24 Validity::AllValid => None,
25 Validity::AllInvalid => {
26 return Ok((
28 PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
29 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
30 ));
31 }
32 Validity::Array(a) => Some(a.to_bool()?.boolean_buffer().clone()),
33 };
34
35 let (ends, values) = match validity {
36 None => {
37 match_each_native_ptype!(array.ptype(), |P| {
38 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
39 (
40 PrimitiveArray::new(ends, Validity::NonNullable),
41 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
42 )
43 })
44 }
45 Some(validity) => {
46 match_each_native_ptype!(array.ptype(), |P| {
47 let (ends, values) =
48 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
49 (
50 PrimitiveArray::new(ends, Validity::NonNullable),
51 values.into_array(),
52 )
53 })
54 }
55 };
56
57 let ends = ends.downcast()?.to_primitive()?;
58
59 Ok((ends, values))
60}
61
62fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
63 let mut ends = BufferMut::empty();
64 let mut values = BufferMut::empty();
65
66 if elements.is_empty() {
67 return (ends.freeze(), values.freeze());
68 }
69
70 let mut prev = elements[0];
72 let mut end = 1;
73 for &e in elements.iter().skip(1) {
74 if e != prev {
75 ends.push(end);
76 values.push(prev);
77 }
78 prev = e;
79 end += 1;
80 }
81 ends.push(end);
82 values.push(prev);
83
84 (ends.freeze(), values.freeze())
85}
86
87fn runend_encode_nullable_primitive<T: NativePType>(
88 elements: &[T],
89 element_validity: BooleanBuffer,
90) -> (Buffer<u64>, PrimitiveArray) {
91 let mut ends = BufferMut::empty();
92 let mut values = BufferMut::empty();
93 let mut validity = BooleanBufferBuilder::new(values.capacity());
94
95 if elements.is_empty() {
96 return (
97 ends.freeze(),
98 PrimitiveArray::new(
99 values,
100 Validity::Array(BoolArray::from(validity.finish()).into_array()),
101 ),
102 );
103 }
104
105 let mut prev = element_validity.value(0).then(|| elements[0]);
107 let mut end = 1;
108 for e in elements
109 .iter()
110 .zip(element_validity.iter())
111 .map(|(&e, is_valid)| is_valid.then_some(e))
112 .skip(1)
113 {
114 if e != prev {
115 ends.push(end);
116 match prev {
117 None => {
118 validity.append(false);
119 values.push(T::default());
120 }
121 Some(p) => {
122 validity.append(true);
123 values.push(p);
124 }
125 }
126 }
127 prev = e;
128 end += 1;
129 }
130 ends.push(end);
131
132 match prev {
133 None => {
134 validity.append(false);
135 values.push(T::default());
136 }
137 Some(p) => {
138 validity.append(true);
139 values.push(p);
140 }
141 }
142
143 (
144 ends.freeze(),
145 PrimitiveArray::new(values, Validity::from(validity.finish())),
146 )
147}
148
149pub fn runend_decode_primitive(
150 ends: PrimitiveArray,
151 values: PrimitiveArray,
152 offset: usize,
153 length: usize,
154) -> VortexResult<PrimitiveArray> {
155 match_each_native_ptype!(values.ptype(), |P| {
156 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
157 runend_decode_typed_primitive(
158 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
159 values.as_slice::<P>(),
160 values.validity_mask()?,
161 values.dtype().nullability(),
162 length,
163 )
164 })
165 })
166}
167
168pub fn runend_decode_bools(
169 ends: PrimitiveArray,
170 values: BoolArray,
171 offset: usize,
172 length: usize,
173) -> VortexResult<BoolArray> {
174 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
175 runend_decode_typed_bool(
176 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
177 values.boolean_buffer().clone(),
178 values.validity_mask()?,
179 values.dtype().nullability(),
180 length,
181 )
182 })
183}
184
185pub fn runend_decode_typed_primitive<T: NativePType>(
186 run_ends: impl Iterator<Item = usize>,
187 values: &[T],
188 values_validity: Mask,
189 values_nullability: Nullability,
190 length: usize,
191) -> VortexResult<PrimitiveArray> {
192 Ok(match values_validity {
193 Mask::AllTrue(_) => {
194 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
195 for (end, value) in run_ends.zip_eq(values) {
196 assert!(end <= length, "Runend end must be less than overall length");
197 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
200 }
201 PrimitiveArray::new(decoded, values_nullability.into())
202 }
203 Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
204 Mask::Values(mask) => {
205 let mut decoded = BufferMut::with_capacity(length);
206 let mut decoded_validity = BooleanBufferBuilder::new(length);
207 for (end, value) in run_ends.zip_eq(
208 values
209 .iter()
210 .zip(mask.boolean_buffer().iter())
211 .map(|(&v, is_valid)| is_valid.then_some(v)),
212 ) {
213 assert!(end <= length, "Runend end must be less than overall length");
214 match value {
215 None => {
216 decoded_validity.append_n(end - decoded.len(), false);
217 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
220 }
221 Some(value) => {
222 decoded_validity.append_n(end - decoded.len(), true);
223 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
226 }
227 }
228 }
229 PrimitiveArray::new(decoded, Validity::from(decoded_validity.finish()))
230 }
231 })
232}
233
234pub fn runend_decode_typed_bool(
235 run_ends: impl Iterator<Item = usize>,
236 values: BooleanBuffer,
237 values_validity: Mask,
238 values_nullability: Nullability,
239 length: usize,
240) -> VortexResult<BoolArray> {
241 Ok(match values_validity {
242 Mask::AllTrue(_) => {
243 let mut decoded = BooleanBufferBuilder::new(length);
244 for (end, value) in run_ends.zip_eq(values.iter()) {
245 decoded.append_n(end - decoded.len(), value);
246 }
247 BoolArray::new(decoded.finish(), values_nullability.into())
248 }
249 Mask::AllFalse(_) => BoolArray::new(BooleanBuffer::new_unset(length), Validity::AllInvalid),
250 Mask::Values(mask) => {
251 let mut decoded = BooleanBufferBuilder::new(length);
252 let mut decoded_validity = BooleanBufferBuilder::new(length);
253 for (end, value) in run_ends.zip_eq(
254 values
255 .iter()
256 .zip(mask.boolean_buffer().iter())
257 .map(|(v, is_valid)| is_valid.then_some(v)),
258 ) {
259 match value {
260 None => {
261 decoded_validity.append_n(end - decoded.len(), false);
262 decoded.append_n(end - decoded.len(), false);
263 }
264 Some(value) => {
265 decoded_validity.append_n(end - decoded.len(), true);
266 decoded.append_n(end - decoded.len(), value);
267 }
268 }
269 }
270 BoolArray::new(decoded.finish(), Validity::from(decoded_validity.finish()))
271 }
272 })
273}
274
#[cfg(test)]
mod test {
    use arrow_buffer::BooleanBuffer;
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::buffer;

    use crate::compress::{runend_decode_primitive, runend_encode};

    /// Three runs of a non-nullable array; the ends are read back as `u8`.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let (run_ends, run_values) = runend_encode(&input).unwrap();
        let run_values = run_values.to_primitive().unwrap();

        assert_eq!(run_ends.as_slice::<u8>(), [2u8, 5, 10]);
        assert_eq!(run_values.as_slice::<i32>(), [1i32, 2, 3]);
    }

    /// Null stretches form their own runs and store `0` as the placeholder value.
    #[test]
    fn encode_nullable() {
        let element_validity = BooleanBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(element_validity),
        );

        let (run_ends, run_values) = runend_encode(&input).unwrap();
        let run_values = run_values.to_primitive().unwrap();

        assert_eq!(run_ends.as_slice::<u8>(), [2u8, 4, 5, 8, 10]);
        assert_eq!(run_values.as_slice::<i32>(), [1i32, 0, 2, 3, 0]);
    }

    /// An entirely null array becomes a single run; its end is read back as `u64`.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BooleanBuffer::new_unset(5)),
        );

        let (run_ends, run_values) = runend_encode(&input).unwrap();
        let run_values = run_values.to_primitive().unwrap();

        assert_eq!(run_ends.as_slice::<u64>(), [5u64]);
        assert_eq!(run_values.as_slice::<i32>(), [0i32]);
    }

    /// Decoding three runs with no offset reproduces the flat array.
    #[test]
    fn decode() {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);

        let decoded = runend_decode_primitive(run_ends, run_values, 0, 10).unwrap();

        assert_eq!(
            decoded.as_slice::<i32>(),
            [1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]
        );
    }
}