1use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::IntoArray;
7use vortex_array::ToCanonical;
8use vortex_array::arrays::BoolArray;
9use vortex_array::arrays::ConstantArray;
10use vortex_array::arrays::PrimitiveArray;
11use vortex_array::validity::Validity;
12use vortex_array::vtable::ValidityHelper;
13use vortex_buffer::BitBuffer;
14use vortex_buffer::BitBufferMut;
15use vortex_buffer::Buffer;
16use vortex_buffer::BufferMut;
17use vortex_buffer::buffer;
18use vortex_dtype::NativePType;
19use vortex_dtype::Nullability;
20use vortex_dtype::match_each_native_ptype;
21use vortex_dtype::match_each_unsigned_integer_ptype;
22use vortex_error::VortexExpect;
23use vortex_mask::Mask;
24use vortex_scalar::Scalar;
25
26use crate::iter::trimmed_ends_iter;
27
28pub fn runend_encode(array: &PrimitiveArray) -> (PrimitiveArray, ArrayRef) {
30 let validity = match array.validity() {
31 Validity::NonNullable => None,
32 Validity::AllValid => None,
33 Validity::AllInvalid => {
34 return (
36 PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable),
37 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
38 );
39 }
40 Validity::Array(a) => Some(a.to_bool().bit_buffer().clone()),
41 };
42
43 let (ends, values) = match validity {
44 None => {
45 match_each_native_ptype!(array.ptype(), |P| {
46 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
47 (
48 PrimitiveArray::new(ends, Validity::NonNullable),
49 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
50 )
51 })
52 }
53 Some(validity) => {
54 match_each_native_ptype!(array.ptype(), |P| {
55 let (ends, values) =
56 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
57 (
58 PrimitiveArray::new(ends, Validity::NonNullable),
59 values.into_array(),
60 )
61 })
62 }
63 };
64
65 let ends = ends
66 .narrow()
67 .vortex_expect("Ends must succeed downcasting")
68 .to_primitive();
69
70 (ends, values)
71}
72
73fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
74 let mut ends = BufferMut::empty();
75 let mut values = BufferMut::empty();
76
77 if elements.is_empty() {
78 return (ends.freeze(), values.freeze());
79 }
80
81 let mut prev = elements[0];
83 let mut end = 1;
84 for &e in elements.iter().skip(1) {
85 if e != prev {
86 ends.push(end);
87 values.push(prev);
88 }
89 prev = e;
90 end += 1;
91 }
92 ends.push(end);
93 values.push(prev);
94
95 (ends.freeze(), values.freeze())
96}
97
98fn runend_encode_nullable_primitive<T: NativePType>(
99 elements: &[T],
100 element_validity: BitBuffer,
101) -> (Buffer<u64>, PrimitiveArray) {
102 let mut ends = BufferMut::empty();
103 let mut values = BufferMut::empty();
104 let mut validity = BitBufferMut::with_capacity(values.capacity());
105
106 if elements.is_empty() {
107 return (
108 ends.freeze(),
109 PrimitiveArray::new(
110 values,
111 Validity::Array(BoolArray::from(validity.freeze()).into_array()),
112 ),
113 );
114 }
115
116 let mut prev = element_validity.value(0).then(|| elements[0]);
118 let mut end = 1;
119 for e in elements
120 .iter()
121 .zip(element_validity.iter())
122 .map(|(&e, is_valid)| is_valid.then_some(e))
123 .skip(1)
124 {
125 if e != prev {
126 ends.push(end);
127 match prev {
128 None => {
129 validity.append(false);
130 values.push(T::default());
131 }
132 Some(p) => {
133 validity.append(true);
134 values.push(p);
135 }
136 }
137 }
138 prev = e;
139 end += 1;
140 }
141 ends.push(end);
142
143 match prev {
144 None => {
145 validity.append(false);
146 values.push(T::default());
147 }
148 Some(p) => {
149 validity.append(true);
150 values.push(p);
151 }
152 }
153
154 (
155 ends.freeze(),
156 PrimitiveArray::new(values, Validity::from(validity.freeze())),
157 )
158}
159
160pub fn runend_decode_primitive(
161 ends: PrimitiveArray,
162 values: PrimitiveArray,
163 offset: usize,
164 length: usize,
165) -> PrimitiveArray {
166 match_each_native_ptype!(values.ptype(), |P| {
167 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
168 runend_decode_typed_primitive(
169 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
170 values.as_slice::<P>(),
171 values.validity_mask(),
172 values.dtype().nullability(),
173 length,
174 )
175 })
176 })
177}
178
179pub fn runend_decode_bools(
180 ends: PrimitiveArray,
181 values: BoolArray,
182 offset: usize,
183 length: usize,
184) -> BoolArray {
185 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
186 runend_decode_typed_bool(
187 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
188 values.bit_buffer(),
189 values.validity_mask(),
190 values.dtype().nullability(),
191 length,
192 )
193 })
194}
195
196pub fn runend_decode_typed_primitive<T: NativePType>(
197 run_ends: impl Iterator<Item = usize>,
198 values: &[T],
199 values_validity: Mask,
200 values_nullability: Nullability,
201 length: usize,
202) -> PrimitiveArray {
203 match values_validity {
204 Mask::AllTrue(_) => {
205 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
206 for (end, value) in run_ends.zip_eq(values) {
207 assert!(end <= length, "Runend end must be less than overall length");
208 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
211 }
212 PrimitiveArray::new(decoded, values_nullability.into())
213 }
214 Mask::AllFalse(_) => PrimitiveArray::new(Buffer::<T>::zeroed(length), Validity::AllInvalid),
215 Mask::Values(mask) => {
216 let mut decoded = BufferMut::with_capacity(length);
217 let mut decoded_validity = BitBufferMut::with_capacity(length);
218 for (end, value) in run_ends.zip_eq(
219 values
220 .iter()
221 .zip(mask.bit_buffer().iter())
222 .map(|(&v, is_valid)| is_valid.then_some(v)),
223 ) {
224 assert!(end <= length, "Runend end must be less than overall length");
225 match value {
226 None => {
227 decoded_validity.append_n(false, end - decoded.len());
228 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
231 }
232 Some(value) => {
233 decoded_validity.append_n(true, end - decoded.len());
234 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
237 }
238 }
239 }
240 PrimitiveArray::new(decoded, Validity::from(decoded_validity.freeze()))
241 }
242 }
243}
244
245pub fn runend_decode_typed_bool(
246 run_ends: impl Iterator<Item = usize>,
247 values: &BitBuffer,
248 values_validity: Mask,
249 values_nullability: Nullability,
250 length: usize,
251) -> BoolArray {
252 match values_validity {
253 Mask::AllTrue(_) => {
254 let mut decoded = BitBufferMut::with_capacity(length);
255 for (end, value) in run_ends.zip_eq(values.iter()) {
256 decoded.append_n(value, end - decoded.len());
257 }
258 BoolArray::from_bit_buffer(decoded.freeze(), values_nullability.into())
259 }
260 Mask::AllFalse(_) => {
261 BoolArray::from_bit_buffer(BitBuffer::new_unset(length), Validity::AllInvalid)
262 }
263 Mask::Values(mask) => {
264 let mut decoded = BitBufferMut::with_capacity(length);
265 let mut decoded_validity = BitBufferMut::with_capacity(length);
266 for (end, value) in run_ends.zip_eq(
267 values
268 .iter()
269 .zip(mask.bit_buffer().iter())
270 .map(|(v, is_valid)| is_valid.then_some(v)),
271 ) {
272 match value {
273 None => {
274 decoded_validity.append_n(false, end - decoded.len());
275 decoded.append_n(false, end - decoded.len());
276 }
277 Some(value) => {
278 decoded_validity.append_n(true, end - decoded.len());
279 decoded.append_n(value, end - decoded.len());
280 }
281 }
282 }
283 BoolArray::from_bit_buffer(decoded.freeze(), Validity::from(decoded_validity.freeze()))
284 }
285 }
286}
287
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// A non-nullable input produces one end/value pair per run of equal values.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (run_ends, run_values) = runend_encode(&input);

        let want_ends = PrimitiveArray::from_iter([2u8, 5, 10]);
        assert_arrays_eq!(run_ends, want_ends);

        let run_values = run_values.to_primitive();
        let want_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Consecutive invalid elements form a single null run, whatever their stored values are.
    #[test]
    fn encode_nullable() {
        let element_validity = Validity::from(BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]));
        let input = PrimitiveArray::new(buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3], element_validity);
        let (run_ends, run_values) = runend_encode(&input);

        let want_ends = PrimitiveArray::from_iter([2u8, 4, 5, 8, 10]);
        assert_arrays_eq!(run_ends, want_ends);

        let run_values = run_values.to_primitive();
        let want_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(run_values, want_values);
    }

    /// An all-invalid input collapses to one null run spanning the whole array.
    #[test]
    fn encode_all_null() {
        let all_invalid = Validity::from(BitBuffer::new_unset(5));
        let input = PrimitiveArray::new(buffer![0, 0, 0, 0, 0], all_invalid);
        let (run_ends, run_values) = runend_encode(&input);

        let want_ends = PrimitiveArray::from_iter([5u64]);
        assert_arrays_eq!(run_ends, want_ends);

        let run_values = run_values.to_primitive();
        let want_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Decoding expands every run back to its repeated value.
    #[test]
    fn decode() {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let dense = runend_decode_primitive(run_ends, run_values, 0, 10);

        let want = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(dense, want);
    }
}