1use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::ArrayView;
7use vortex_array::IntoArray;
8use vortex_array::ToCanonical;
9use vortex_array::arrays::BoolArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Primitive;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::VarBinViewArray;
14use vortex_array::arrays::bool::BoolArrayExt;
15use vortex_array::arrays::primitive::PrimitiveArrayExt;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::dtype::NativePType;
18use vortex_array::dtype::Nullability;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::match_each_native_ptype;
22use vortex_array::match_each_unsigned_integer_ptype;
23use vortex_array::scalar::Scalar;
24use vortex_array::validity::Validity;
25use vortex_buffer::BitBuffer;
26use vortex_buffer::BitBufferMut;
27use vortex_buffer::Buffer;
28use vortex_buffer::BufferMut;
29use vortex_buffer::buffer;
30use vortex_error::VortexExpect;
31use vortex_error::VortexResult;
32use vortex_mask::Mask;
33
34use crate::iter::trimmed_ends_iter;
35
36pub fn runend_encode(array: ArrayView<Primitive>) -> (PrimitiveArray, ArrayRef) {
38 let validity = match array
39 .validity()
40 .vortex_expect("run-end validity should be derivable")
41 {
42 Validity::NonNullable => None,
43 Validity::AllValid => None,
44 Validity::AllInvalid => {
45 let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
47 ends.statistics()
48 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
49 return (
50 ends,
51 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
52 );
53 }
54 Validity::Array(a) => Some(a.to_bool().to_bit_buffer()),
55 };
56
57 let (ends, values) = match validity {
58 None => {
59 match_each_native_ptype!(array.ptype(), |P| {
60 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
61 (
62 PrimitiveArray::new(ends, Validity::NonNullable),
63 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
64 )
65 })
66 }
67 Some(validity) => {
68 match_each_native_ptype!(array.ptype(), |P| {
69 let (ends, values) =
70 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
71 (
72 PrimitiveArray::new(ends, Validity::NonNullable),
73 values.into_array(),
74 )
75 })
76 }
77 };
78
79 let ends = ends.narrow().vortex_expect("Ends must succeed downcasting");
80
81 ends.statistics()
82 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
83
84 (ends, values)
85}
86
87fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
88 let mut ends = BufferMut::empty();
89 let mut values = BufferMut::empty();
90
91 if elements.is_empty() {
92 return (ends.freeze(), values.freeze());
93 }
94
95 let mut prev = elements[0];
97 let mut end = 1;
98 for &e in elements.iter().skip(1) {
99 if e != prev {
100 ends.push(end);
101 values.push(prev);
102 }
103 prev = e;
104 end += 1;
105 }
106 ends.push(end);
107 values.push(prev);
108
109 (ends.freeze(), values.freeze())
110}
111
112fn runend_encode_nullable_primitive<T: NativePType>(
113 elements: &[T],
114 element_validity: BitBuffer,
115) -> (Buffer<u64>, PrimitiveArray) {
116 let mut ends = BufferMut::empty();
117 let mut values = BufferMut::empty();
118 let mut validity = BitBufferMut::with_capacity(values.capacity());
119
120 if elements.is_empty() {
121 return (
122 ends.freeze(),
123 PrimitiveArray::new(
124 values,
125 Validity::Array(BoolArray::from(validity.freeze()).into_array()),
126 ),
127 );
128 }
129
130 let mut prev = element_validity.value(0).then(|| elements[0]);
132 let mut end = 1;
133 for e in elements
134 .iter()
135 .zip(element_validity.iter())
136 .map(|(&e, is_valid)| is_valid.then_some(e))
137 .skip(1)
138 {
139 if e != prev {
140 ends.push(end);
141 match prev {
142 None => {
143 validity.append(false);
144 values.push(T::default());
145 }
146 Some(p) => {
147 validity.append(true);
148 values.push(p);
149 }
150 }
151 }
152 prev = e;
153 end += 1;
154 }
155 ends.push(end);
156
157 match prev {
158 None => {
159 validity.append(false);
160 values.push(T::default());
161 }
162 Some(p) => {
163 validity.append(true);
164 values.push(p);
165 }
166 }
167
168 (
169 ends.freeze(),
170 PrimitiveArray::new(values, Validity::from(validity.freeze())),
171 )
172}
173
174pub fn runend_decode_primitive(
175 ends: PrimitiveArray,
176 values: PrimitiveArray,
177 offset: usize,
178 length: usize,
179) -> VortexResult<PrimitiveArray> {
180 let validity_mask = values.validity_mask()?;
181 Ok(match_each_native_ptype!(values.ptype(), |P| {
182 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
183 runend_decode_typed_primitive(
184 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
185 values.as_slice::<P>(),
186 validity_mask,
187 values.dtype().nullability(),
188 length,
189 )
190 })
191 }))
192}
193
194fn runend_decode_slice<T: Copy + Default>(
198 run_ends: impl Iterator<Item = usize>,
199 values: &[T],
200 values_validity: Mask,
201 values_nullability: Nullability,
202 length: usize,
203) -> (Buffer<T>, Validity) {
204 match values_validity {
205 Mask::AllTrue(_) => {
206 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
207 for (end, value) in run_ends.zip_eq(values) {
208 assert!(
209 end >= decoded.len(),
210 "Runend ends must be monotonic, got {end} after {}",
211 decoded.len()
212 );
213 assert!(end <= length, "Runend end must be less than overall length");
214 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
217 }
218 (decoded.into(), values_nullability.into())
219 }
220 Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
221 Mask::Values(mask) => {
222 let mut decoded = BufferMut::with_capacity(length);
223 let mut decoded_validity = BitBufferMut::with_capacity(length);
224 for (end, value) in run_ends.zip_eq(
225 values
226 .iter()
227 .zip(mask.bit_buffer().iter())
228 .map(|(&v, is_valid)| is_valid.then_some(v)),
229 ) {
230 assert!(
231 end >= decoded.len(),
232 "Runend ends must be monotonic, got {end} after {}",
233 decoded.len()
234 );
235 assert!(end <= length, "Runend end must be less than overall length");
236 match value {
237 None => {
238 decoded_validity.append_n(false, end - decoded.len());
239 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
242 }
243 Some(value) => {
244 decoded_validity.append_n(true, end - decoded.len());
245 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
248 }
249 }
250 }
251 (decoded.into(), Validity::from(decoded_validity.freeze()))
252 }
253 }
254}
255
256pub fn runend_decode_typed_primitive<T: NativePType>(
257 run_ends: impl Iterator<Item = usize>,
258 values: &[T],
259 values_validity: Mask,
260 values_nullability: Nullability,
261 length: usize,
262) -> PrimitiveArray {
263 let (decoded, validity) = runend_decode_slice(
264 run_ends,
265 values,
266 values_validity,
267 values_nullability,
268 length,
269 );
270 PrimitiveArray::new(decoded, validity)
271}
272
273pub fn runend_decode_varbinview(
275 ends: PrimitiveArray,
276 values: VarBinViewArray,
277 offset: usize,
278 length: usize,
279) -> VortexResult<VarBinViewArray> {
280 let validity_mask = values.validity_mask()?;
281 let views = values.views();
282
283 let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
284 runend_decode_slice(
285 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
286 views,
287 validity_mask,
288 values.dtype().nullability(),
289 length,
290 )
291 });
292
293 let parts = values.into_data_parts();
294 let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
295
296 Ok(unsafe {
299 VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
300 })
301}
302
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// Non-nullable input: equal neighbours collapse into runs and the ends come back as `u8`.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let want_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]);
        let want_values = PrimitiveArray::from_iter(vec![1i32, 2, 3]);

        let (got_ends, got_values) = runend_encode(input.as_view());
        let got_values = got_values.to_primitive();

        assert_arrays_eq!(got_ends, want_ends);
        assert_arrays_eq!(got_values, want_values);
    }

    /// Mixed validity: nulls form their own runs, even when the underlying slots hold values
    /// equal to a neighbouring valid run.
    #[test]
    fn encode_nullable() {
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(BitBuffer::from(vec![
                true, true, false, false, true, true, true, true, false, false,
            ])),
        );
        let want_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]);
        let want_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);

        let (got_ends, got_values) = runend_encode(input.as_view());
        let got_values = got_values.to_primitive();

        assert_arrays_eq!(got_ends, want_ends);
        assert_arrays_eq!(got_values, want_values);
    }

    /// All-null input: a single null run whose end is the array length, kept as `u64`.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );
        let want_ends = PrimitiveArray::from_iter(vec![5u64]);
        let want_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);

        let (got_ends, got_values) = runend_encode(input.as_view());
        let got_values = got_values.to_primitive();

        assert_arrays_eq!(got_ends, want_ends);
        assert_arrays_eq!(got_values, want_values);
    }

    /// Decoding with no offset expands every run back to its full length.
    #[test]
    fn decode() -> VortexResult<()> {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let want = PrimitiveArray::from_iter(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let got = runend_decode_primitive(run_ends, run_values, 0, 10)?;

        assert_arrays_eq!(got, want);
        Ok(())
    }
}