1use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::IntoArray;
7use vortex_array::ToCanonical;
8use vortex_array::arrays::BoolArray;
9use vortex_array::arrays::ConstantArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::arrays::VarBinViewArray;
12use vortex_array::buffer::BufferHandle;
13use vortex_array::dtype::NativePType;
14use vortex_array::dtype::Nullability;
15use vortex_array::expr::stats::Precision;
16use vortex_array::expr::stats::Stat;
17use vortex_array::match_each_native_ptype;
18use vortex_array::match_each_unsigned_integer_ptype;
19use vortex_array::scalar::Scalar;
20use vortex_array::validity::Validity;
21use vortex_array::vtable::ValidityHelper;
22use vortex_buffer::BitBuffer;
23use vortex_buffer::BitBufferMut;
24use vortex_buffer::Buffer;
25use vortex_buffer::BufferMut;
26use vortex_buffer::buffer;
27use vortex_error::VortexExpect;
28use vortex_error::VortexResult;
29use vortex_mask::Mask;
30
31use crate::iter::trimmed_ends_iter;
32
33pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) {
35 let validity = match array.validity() {
36 Validity::NonNullable => None,
37 Validity::AllValid => None,
38 Validity::AllInvalid => {
39 let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
41 ends.statistics()
42 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
43 return (
44 ends,
45 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
46 );
47 }
48 Validity::Array(a) => Some(a.to_bool().to_bit_buffer()),
49 };
50
51 let (ends, values) = match validity {
52 None => {
53 match_each_native_ptype!(array.ptype(), |P| {
54 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
55 (
56 PrimitiveArray::new(ends, Validity::NonNullable),
57 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
58 )
59 })
60 }
61 Some(validity) => {
62 match_each_native_ptype!(array.ptype(), |P| {
63 let (ends, values) =
64 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
65 (
66 PrimitiveArray::new(ends, Validity::NonNullable),
67 values.into_array(),
68 )
69 })
70 }
71 };
72
73 let ends = ends
74 .narrow()
75 .vortex_expect("Ends must succeed downcasting")
76 .to_primitive();
77
78 ends.statistics()
79 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
80
81 (ends, values)
82}
83
84fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
85 let mut ends = BufferMut::empty();
86 let mut values = BufferMut::empty();
87
88 if elements.is_empty() {
89 return (ends.freeze(), values.freeze());
90 }
91
92 let mut prev = elements[0];
94 let mut end = 1;
95 for &e in elements.iter().skip(1) {
96 if e != prev {
97 ends.push(end);
98 values.push(prev);
99 }
100 prev = e;
101 end += 1;
102 }
103 ends.push(end);
104 values.push(prev);
105
106 (ends.freeze(), values.freeze())
107}
108
109fn runend_encode_nullable_primitive<T: NativePType>(
110 elements: &[T],
111 element_validity: BitBuffer,
112) -> (Buffer<u64>, PrimitiveArray) {
113 let mut ends = BufferMut::empty();
114 let mut values = BufferMut::empty();
115 let mut validity = BitBufferMut::with_capacity(values.capacity());
116
117 if elements.is_empty() {
118 return (
119 ends.freeze(),
120 PrimitiveArray::new(
121 values,
122 Validity::Array(BoolArray::from(validity.freeze()).into_array()),
123 ),
124 );
125 }
126
127 let mut prev = element_validity.value(0).then(|| elements[0]);
129 let mut end = 1;
130 for e in elements
131 .iter()
132 .zip(element_validity.iter())
133 .map(|(&e, is_valid)| is_valid.then_some(e))
134 .skip(1)
135 {
136 if e != prev {
137 ends.push(end);
138 match prev {
139 None => {
140 validity.append(false);
141 values.push(T::default());
142 }
143 Some(p) => {
144 validity.append(true);
145 values.push(p);
146 }
147 }
148 }
149 prev = e;
150 end += 1;
151 }
152 ends.push(end);
153
154 match prev {
155 None => {
156 validity.append(false);
157 values.push(T::default());
158 }
159 Some(p) => {
160 validity.append(true);
161 values.push(p);
162 }
163 }
164
165 (
166 ends.freeze(),
167 PrimitiveArray::new(values, Validity::from(validity.freeze())),
168 )
169}
170
171pub fn runend_decode_primitive(
172 ends: PrimitiveArray,
173 values: PrimitiveArray,
174 offset: usize,
175 length: usize,
176) -> VortexResult<PrimitiveArray> {
177 let validity_mask = values.validity_mask()?;
178 Ok(match_each_native_ptype!(values.ptype(), |P| {
179 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
180 runend_decode_typed_primitive(
181 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
182 values.as_slice::<P>(),
183 validity_mask,
184 values.dtype().nullability(),
185 length,
186 )
187 })
188 }))
189}
190
191pub fn runend_decode_bools(
192 ends: PrimitiveArray,
193 values: BoolArray,
194 offset: usize,
195 length: usize,
196) -> VortexResult<BoolArray> {
197 let validity_mask = values.validity_mask()?;
198 Ok(match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
199 runend_decode_typed_bool(
200 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
201 &values.to_bit_buffer(),
202 validity_mask,
203 values.dtype().nullability(),
204 length,
205 )
206 }))
207}
208
209fn runend_decode_slice<T: Copy + Default>(
213 run_ends: impl Iterator<Item = usize>,
214 values: &[T],
215 values_validity: Mask,
216 values_nullability: Nullability,
217 length: usize,
218) -> (Buffer<T>, Validity) {
219 match values_validity {
220 Mask::AllTrue(_) => {
221 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
222 for (end, value) in run_ends.zip_eq(values) {
223 assert!(
224 end >= decoded.len(),
225 "Runend ends must be monotonic, got {end} after {}",
226 decoded.len()
227 );
228 assert!(end <= length, "Runend end must be less than overall length");
229 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
232 }
233 (decoded.into(), values_nullability.into())
234 }
235 Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
236 Mask::Values(mask) => {
237 let mut decoded = BufferMut::with_capacity(length);
238 let mut decoded_validity = BitBufferMut::with_capacity(length);
239 for (end, value) in run_ends.zip_eq(
240 values
241 .iter()
242 .zip(mask.bit_buffer().iter())
243 .map(|(&v, is_valid)| is_valid.then_some(v)),
244 ) {
245 assert!(
246 end >= decoded.len(),
247 "Runend ends must be monotonic, got {end} after {}",
248 decoded.len()
249 );
250 assert!(end <= length, "Runend end must be less than overall length");
251 match value {
252 None => {
253 decoded_validity.append_n(false, end - decoded.len());
254 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
257 }
258 Some(value) => {
259 decoded_validity.append_n(true, end - decoded.len());
260 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
263 }
264 }
265 }
266 (decoded.into(), Validity::from(decoded_validity.freeze()))
267 }
268 }
269}
270
271pub fn runend_decode_typed_primitive<T: NativePType>(
272 run_ends: impl Iterator<Item = usize>,
273 values: &[T],
274 values_validity: Mask,
275 values_nullability: Nullability,
276 length: usize,
277) -> PrimitiveArray {
278 let (decoded, validity) = runend_decode_slice(
279 run_ends,
280 values,
281 values_validity,
282 values_nullability,
283 length,
284 );
285 PrimitiveArray::new(decoded, validity)
286}
287
288pub fn runend_decode_typed_bool(
289 run_ends: impl Iterator<Item = usize>,
290 values: &BitBuffer,
291 values_validity: Mask,
292 values_nullability: Nullability,
293 length: usize,
294) -> BoolArray {
295 match values_validity {
296 Mask::AllTrue(_) => {
297 let mut decoded = BitBufferMut::with_capacity(length);
298 for (end, value) in run_ends.zip_eq(values.iter()) {
299 decoded.append_n(value, end - decoded.len());
300 }
301 BoolArray::new(decoded.freeze(), values_nullability.into())
302 }
303 Mask::AllFalse(_) => BoolArray::new(BitBuffer::new_unset(length), Validity::AllInvalid),
304 Mask::Values(mask) => {
305 let mut decoded = BitBufferMut::with_capacity(length);
306 let mut decoded_validity = BitBufferMut::with_capacity(length);
307 for (end, value) in run_ends.zip_eq(
308 values
309 .iter()
310 .zip(mask.bit_buffer().iter())
311 .map(|(v, is_valid)| is_valid.then_some(v)),
312 ) {
313 match value {
314 None => {
315 decoded_validity.append_n(false, end - decoded.len());
316 decoded.append_n(false, end - decoded.len());
317 }
318 Some(value) => {
319 decoded_validity.append_n(true, end - decoded.len());
320 decoded.append_n(value, end - decoded.len());
321 }
322 }
323 }
324 BoolArray::new(decoded.freeze(), Validity::from(decoded_validity.freeze()))
325 }
326 }
327}
328
329pub fn runend_decode_varbinview(
331 ends: PrimitiveArray,
332 values: VarBinViewArray,
333 offset: usize,
334 length: usize,
335) -> VortexResult<VarBinViewArray> {
336 let validity_mask = values.validity_mask()?;
337 let views = values.views();
338
339 let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
340 runend_decode_slice(
341 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
342 views,
343 validity_mask,
344 values.dtype().nullability(),
345 length,
346 )
347 });
348
349 let parts = values.into_parts();
350 let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
351
352 Ok(unsafe {
355 VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
356 })
357}
358
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// Runs of equal values collapse to one (end, value) pair each, with ends narrowed to `u8`.
    #[test]
    fn encode() {
        let expected_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]);
        let expected_values = PrimitiveArray::from_iter(vec![1i32, 2, 3]);

        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (ends, encoded_values) = runend_encode(&input);
        let values = encoded_values.to_primitive();

        assert_arrays_eq!(ends, expected_ends);
        assert_arrays_eq!(values, expected_values);
    }

    /// A change in validity starts a new run even when the underlying value is unchanged.
    #[test]
    fn encode_nullable() {
        let expected_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]);
        let expected_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);

        let validity_bits = BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(validity_bits),
        );
        let (ends, encoded_values) = runend_encode(&input);
        let values = encoded_values.to_primitive();

        assert_arrays_eq!(ends, expected_ends);
        assert_arrays_eq!(values, expected_values);
    }

    /// An entirely null input becomes a single null run whose end stays `u64`.
    #[test]
    fn encode_all_null() {
        let expected_ends = PrimitiveArray::from_iter(vec![5u64]);
        let expected_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);

        let input = PrimitiveArray::new(
            buffer![0i32, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );
        let (ends, encoded_values) = runend_encode(&input);
        let values = encoded_values.to_primitive();

        assert_arrays_eq!(ends, expected_ends);
        assert_arrays_eq!(values, expected_values);
    }

    /// Decoding repeats each value up to its run end.
    #[test]
    fn decode() -> VortexResult<()> {
        let expected = PrimitiveArray::from_iter(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let decoded = runend_decode_primitive(run_ends, run_values, 0, 10)?;

        assert_arrays_eq!(decoded, expected);
        Ok(())
    }
}