1use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::ArrayView;
7use vortex_array::ExecutionCtx;
8use vortex_array::IntoArray;
9use vortex_array::arrays::BoolArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Primitive;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::VarBinViewArray;
14use vortex_array::arrays::bool::BoolArrayExt;
15use vortex_array::arrays::primitive::PrimitiveArrayExt;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::dtype::NativePType;
18use vortex_array::dtype::Nullability;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::match_each_native_ptype;
22use vortex_array::match_each_unsigned_integer_ptype;
23use vortex_array::scalar::Scalar;
24use vortex_array::validity::Validity;
25use vortex_buffer::BitBuffer;
26use vortex_buffer::BitBufferMut;
27use vortex_buffer::Buffer;
28use vortex_buffer::BufferMut;
29use vortex_buffer::buffer;
30use vortex_error::VortexExpect;
31use vortex_error::VortexResult;
32use vortex_mask::Mask;
33
34use crate::iter::trimmed_ends_iter;
35
36pub fn runend_encode(
38 array: ArrayView<Primitive>,
39 ctx: &mut ExecutionCtx,
40) -> (PrimitiveArray, ArrayRef) {
41 let validity = match array
42 .validity()
43 .vortex_expect("run-end validity should be derivable")
44 {
45 Validity::NonNullable => None,
46 Validity::AllValid => None,
47 Validity::AllInvalid => {
48 let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
50 ends.statistics()
51 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
52 return (
53 ends,
54 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
55 );
56 }
57 Validity::Array(a) => {
58 let bool_array = a
59 .execute::<BoolArray>(ctx)
60 .vortex_expect("validity array must be convertible to bool");
61 Some(bool_array.to_bit_buffer())
62 }
63 };
64
65 let (ends, values) = match validity {
66 None => {
67 match_each_native_ptype!(array.ptype(), |P| {
68 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
69 (
70 PrimitiveArray::new(ends, Validity::NonNullable),
71 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
72 )
73 })
74 }
75 Some(validity) => {
76 match_each_native_ptype!(array.ptype(), |P| {
77 let (ends, values) =
78 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
79 (
80 PrimitiveArray::new(ends, Validity::NonNullable),
81 values.into_array(),
82 )
83 })
84 }
85 };
86
87 let ends = ends.narrow().vortex_expect("Ends must succeed downcasting");
88
89 ends.statistics()
90 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
91
92 (ends, values)
93}
94
95fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
96 let mut ends = BufferMut::empty();
97 let mut values = BufferMut::empty();
98
99 if elements.is_empty() {
100 return (ends.freeze(), values.freeze());
101 }
102
103 let mut prev = elements[0];
105 let mut end = 1;
106 for &e in elements.iter().skip(1) {
107 if e != prev {
108 ends.push(end);
109 values.push(prev);
110 }
111 prev = e;
112 end += 1;
113 }
114 ends.push(end);
115 values.push(prev);
116
117 (ends.freeze(), values.freeze())
118}
119
120fn runend_encode_nullable_primitive<T: NativePType>(
121 elements: &[T],
122 element_validity: BitBuffer,
123) -> (Buffer<u64>, PrimitiveArray) {
124 let mut ends = BufferMut::empty();
125 let mut values = BufferMut::empty();
126 let mut validity = BitBufferMut::with_capacity(values.capacity());
127
128 if elements.is_empty() {
129 return (
130 ends.freeze(),
131 PrimitiveArray::new(
132 values,
133 Validity::Array(BoolArray::from(validity.freeze()).into_array()),
134 ),
135 );
136 }
137
138 let mut prev = element_validity.value(0).then(|| elements[0]);
140 let mut end = 1;
141 for e in elements
142 .iter()
143 .zip(element_validity.iter())
144 .map(|(&e, is_valid)| is_valid.then_some(e))
145 .skip(1)
146 {
147 if e != prev {
148 ends.push(end);
149 match prev {
150 None => {
151 validity.append(false);
152 values.push(T::default());
153 }
154 Some(p) => {
155 validity.append(true);
156 values.push(p);
157 }
158 }
159 }
160 prev = e;
161 end += 1;
162 }
163 ends.push(end);
164
165 match prev {
166 None => {
167 validity.append(false);
168 values.push(T::default());
169 }
170 Some(p) => {
171 validity.append(true);
172 values.push(p);
173 }
174 }
175
176 (
177 ends.freeze(),
178 PrimitiveArray::new(values, Validity::from(validity.freeze())),
179 )
180}
181
182pub fn runend_decode_primitive(
183 ends: PrimitiveArray,
184 values: PrimitiveArray,
185 offset: usize,
186 length: usize,
187 ctx: &mut ExecutionCtx,
188) -> VortexResult<PrimitiveArray> {
189 let validity_mask = values
190 .as_ref()
191 .validity()?
192 .execute_mask(values.as_ref().len(), ctx)?;
193 Ok(match_each_native_ptype!(values.ptype(), |P| {
194 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
195 runend_decode_typed_primitive(
196 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
197 values.as_slice::<P>(),
198 validity_mask,
199 values.dtype().nullability(),
200 length,
201 )
202 })
203 }))
204}
205
206fn runend_decode_slice<T: Copy + Default>(
210 run_ends: impl Iterator<Item = usize>,
211 values: &[T],
212 values_validity: Mask,
213 values_nullability: Nullability,
214 length: usize,
215) -> (Buffer<T>, Validity) {
216 match values_validity {
217 Mask::AllTrue(_) => {
218 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
219 for (end, value) in run_ends.zip_eq(values) {
220 assert!(
221 end >= decoded.len(),
222 "Runend ends must be monotonic, got {end} after {}",
223 decoded.len()
224 );
225 assert!(end <= length, "Runend end must be less than overall length");
226 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
229 }
230 (decoded.into(), values_nullability.into())
231 }
232 Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
233 Mask::Values(mask) => {
234 let mut decoded = BufferMut::with_capacity(length);
235 let mut decoded_validity = BitBufferMut::with_capacity(length);
236 for (end, value) in run_ends.zip_eq(
237 values
238 .iter()
239 .zip(mask.bit_buffer().iter())
240 .map(|(&v, is_valid)| is_valid.then_some(v)),
241 ) {
242 assert!(
243 end >= decoded.len(),
244 "Runend ends must be monotonic, got {end} after {}",
245 decoded.len()
246 );
247 assert!(end <= length, "Runend end must be less than overall length");
248 match value {
249 None => {
250 decoded_validity.append_n(false, end - decoded.len());
251 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
254 }
255 Some(value) => {
256 decoded_validity.append_n(true, end - decoded.len());
257 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
260 }
261 }
262 }
263 (decoded.into(), Validity::from(decoded_validity.freeze()))
264 }
265 }
266}
267
268pub fn runend_decode_typed_primitive<T: NativePType>(
269 run_ends: impl Iterator<Item = usize>,
270 values: &[T],
271 values_validity: Mask,
272 values_nullability: Nullability,
273 length: usize,
274) -> PrimitiveArray {
275 let (decoded, validity) = runend_decode_slice(
276 run_ends,
277 values,
278 values_validity,
279 values_nullability,
280 length,
281 );
282 PrimitiveArray::new(decoded, validity)
283}
284
285pub fn runend_decode_varbinview(
287 ends: PrimitiveArray,
288 values: VarBinViewArray,
289 offset: usize,
290 length: usize,
291 ctx: &mut ExecutionCtx,
292) -> VortexResult<VarBinViewArray> {
293 let validity_mask = values
294 .as_ref()
295 .validity()?
296 .execute_mask(values.as_ref().len(), ctx)?;
297 let views = values.views();
298
299 let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
300 runend_decode_slice(
301 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
302 views,
303 validity_mask,
304 values.dtype().nullability(),
305 length,
306 )
307 });
308
309 let parts = values.into_data_parts();
310 let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
311
312 Ok(unsafe {
315 VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
316 })
317}
318
#[cfg(test)]
mod tests {
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    // Non-nullable input: three runs, with ends small enough to come back as `u8`.
    #[test]
    fn encode() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let (ends, values) = runend_encode(input.as_view(), &mut ctx);
        let values = values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter([2u8, 5, 10]);
        let want_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        assert_arrays_eq!(ends, want_ends);
        assert_arrays_eq!(values, want_values);
        Ok(())
    }

    // Mixed validity: adjacent nulls form one run even when the stored values differ.
    #[test]
    fn encode_nullable() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let validity_bits = BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(validity_bits),
        );

        let (ends, values) = runend_encode(input.as_view(), &mut ctx);
        let values = values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter([2u8, 4, 5, 8, 10]);
        let want_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(ends, want_ends);
        assert_arrays_eq!(values, want_values);
        Ok(())
    }

    // All-null input: one `u64` end covering the whole array and a single null value.
    #[test]
    fn encode_all_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );

        let (ends, values) = runend_encode(input.as_view(), &mut ctx);
        let values = values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter([5u64]);
        let want_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);
        assert_arrays_eq!(ends, want_ends);
        assert_arrays_eq!(values, want_values);
        Ok(())
    }

    // Decoding three runs with no offset expands back to the ten original elements.
    #[test]
    fn decode() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);

        let decoded = runend_decode_primitive(run_ends, run_values, 0, 10, &mut ctx)?;

        let want = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(decoded, want);
        Ok(())
    }
}