1use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::ArrayView;
7use vortex_array::ExecutionCtx;
8use vortex_array::IntoArray;
9use vortex_array::arrays::BoolArray;
10use vortex_array::arrays::ConstantArray;
11use vortex_array::arrays::Primitive;
12use vortex_array::arrays::PrimitiveArray;
13use vortex_array::arrays::VarBinViewArray;
14use vortex_array::arrays::bool::BoolArrayExt;
15use vortex_array::arrays::primitive::PrimitiveArrayExt;
16use vortex_array::buffer::BufferHandle;
17use vortex_array::dtype::NativePType;
18use vortex_array::dtype::Nullability;
19use vortex_array::expr::stats::Precision;
20use vortex_array::expr::stats::Stat;
21use vortex_array::match_each_native_ptype;
22use vortex_array::match_each_unsigned_integer_ptype;
23use vortex_array::scalar::Scalar;
24use vortex_array::validity::Validity;
25use vortex_buffer::BitBuffer;
26use vortex_buffer::BitBufferMut;
27use vortex_buffer::Buffer;
28use vortex_buffer::BufferMut;
29use vortex_buffer::buffer;
30use vortex_error::VortexExpect;
31use vortex_error::VortexResult;
32use vortex_mask::Mask;
33
34use crate::iter::trimmed_ends_iter;
35
36pub fn runend_encode(
38 array: ArrayView<Primitive>,
39 ctx: &mut ExecutionCtx,
40) -> (PrimitiveArray, ArrayRef) {
41 let validity = match array
42 .validity()
43 .vortex_expect("run-end validity should be derivable")
44 {
45 Validity::NonNullable => None,
46 Validity::AllValid => None,
47 Validity::AllInvalid => {
48 let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
50 ends.statistics()
51 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
52 return (
53 ends,
54 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
55 );
56 }
57 Validity::Array(a) => {
58 let bool_array = a
59 .execute::<BoolArray>(ctx)
60 .vortex_expect("validity array must be convertible to bool");
61 Some(bool_array.to_bit_buffer())
62 }
63 };
64
65 let (ends, values) = match validity {
66 None => {
67 match_each_native_ptype!(array.ptype(), |P| {
68 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
69 (
70 PrimitiveArray::new(ends, Validity::NonNullable),
71 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
72 )
73 })
74 }
75 Some(validity) => {
76 match_each_native_ptype!(array.ptype(), |P| {
77 let (ends, values) =
78 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
79 (
80 PrimitiveArray::new(ends, Validity::NonNullable),
81 values.into_array(),
82 )
83 })
84 }
85 };
86
87 let ends = ends
88 .narrow(ctx)
89 .vortex_expect("Ends must succeed downcasting");
90
91 ends.statistics()
92 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
93
94 (ends, values)
95}
96
97fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
98 let mut ends = BufferMut::empty();
99 let mut values = BufferMut::empty();
100
101 if elements.is_empty() {
102 return (ends.freeze(), values.freeze());
103 }
104
105 let mut prev = elements[0];
107 let mut end = 1;
108 for &e in elements.iter().skip(1) {
109 if e != prev {
110 ends.push(end);
111 values.push(prev);
112 }
113 prev = e;
114 end += 1;
115 }
116 ends.push(end);
117 values.push(prev);
118
119 (ends.freeze(), values.freeze())
120}
121
122fn runend_encode_nullable_primitive<T: NativePType>(
123 elements: &[T],
124 element_validity: BitBuffer,
125) -> (Buffer<u64>, PrimitiveArray) {
126 let mut ends = BufferMut::empty();
127 let mut values = BufferMut::empty();
128 let mut validity = BitBufferMut::with_capacity(values.capacity());
129
130 if elements.is_empty() {
131 return (
132 ends.freeze(),
133 PrimitiveArray::new(
134 values,
135 Validity::Array(BoolArray::from(validity.freeze()).into_array()),
136 ),
137 );
138 }
139
140 let mut prev = element_validity.value(0).then(|| elements[0]);
142 let mut end = 1;
143 for e in elements
144 .iter()
145 .zip(element_validity.iter())
146 .map(|(&e, is_valid)| is_valid.then_some(e))
147 .skip(1)
148 {
149 if e != prev {
150 ends.push(end);
151 match prev {
152 None => {
153 validity.append(false);
154 values.push(T::default());
155 }
156 Some(p) => {
157 validity.append(true);
158 values.push(p);
159 }
160 }
161 }
162 prev = e;
163 end += 1;
164 }
165 ends.push(end);
166
167 match prev {
168 None => {
169 validity.append(false);
170 values.push(T::default());
171 }
172 Some(p) => {
173 validity.append(true);
174 values.push(p);
175 }
176 }
177
178 (
179 ends.freeze(),
180 PrimitiveArray::new(values, Validity::from(validity.freeze())),
181 )
182}
183
184pub fn runend_decode_primitive(
185 ends: PrimitiveArray,
186 values: PrimitiveArray,
187 offset: usize,
188 length: usize,
189 ctx: &mut ExecutionCtx,
190) -> VortexResult<PrimitiveArray> {
191 let validity_mask = values
192 .as_ref()
193 .validity()?
194 .execute_mask(values.as_ref().len(), ctx)?;
195 Ok(match_each_native_ptype!(values.ptype(), |P| {
196 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
197 runend_decode_typed_primitive(
198 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
199 values.as_slice::<P>(),
200 validity_mask,
201 values.dtype().nullability(),
202 length,
203 )
204 })
205 }))
206}
207
208fn runend_decode_slice<T: Copy + Default>(
212 run_ends: impl Iterator<Item = usize>,
213 values: &[T],
214 values_validity: Mask,
215 values_nullability: Nullability,
216 length: usize,
217) -> (Buffer<T>, Validity) {
218 match values_validity {
219 Mask::AllTrue(_) => {
220 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
221 for (end, value) in run_ends.zip_eq(values) {
222 assert!(
223 end >= decoded.len(),
224 "Runend ends must be monotonic, got {end} after {}",
225 decoded.len()
226 );
227 assert!(end <= length, "Runend end must be less than overall length");
228 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
231 }
232 (decoded.into(), values_nullability.into())
233 }
234 Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
235 Mask::Values(mask) => {
236 let mut decoded = BufferMut::with_capacity(length);
237 let mut decoded_validity = BitBufferMut::with_capacity(length);
238 for (end, value) in run_ends.zip_eq(
239 values
240 .iter()
241 .zip(mask.bit_buffer().iter())
242 .map(|(&v, is_valid)| is_valid.then_some(v)),
243 ) {
244 assert!(
245 end >= decoded.len(),
246 "Runend ends must be monotonic, got {end} after {}",
247 decoded.len()
248 );
249 assert!(end <= length, "Runend end must be less than overall length");
250 match value {
251 None => {
252 decoded_validity.append_n(false, end - decoded.len());
253 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
256 }
257 Some(value) => {
258 decoded_validity.append_n(true, end - decoded.len());
259 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
262 }
263 }
264 }
265 (decoded.into(), Validity::from(decoded_validity.freeze()))
266 }
267 }
268}
269
270pub fn runend_decode_typed_primitive<T: NativePType>(
271 run_ends: impl Iterator<Item = usize>,
272 values: &[T],
273 values_validity: Mask,
274 values_nullability: Nullability,
275 length: usize,
276) -> PrimitiveArray {
277 let (decoded, validity) = runend_decode_slice(
278 run_ends,
279 values,
280 values_validity,
281 values_nullability,
282 length,
283 );
284 PrimitiveArray::new(decoded, validity)
285}
286
287pub fn runend_decode_varbinview(
289 ends: PrimitiveArray,
290 values: VarBinViewArray,
291 offset: usize,
292 length: usize,
293 ctx: &mut ExecutionCtx,
294) -> VortexResult<VarBinViewArray> {
295 let validity_mask = values
296 .as_ref()
297 .validity()?
298 .execute_mask(values.as_ref().len(), ctx)?;
299 let views = values.views();
300
301 let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
302 runend_decode_slice(
303 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
304 views,
305 validity_mask,
306 values.dtype().nullability(),
307 length,
308 )
309 });
310
311 let parts = values.into_data_parts();
312 let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
313
314 Ok(unsafe {
317 VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
318 })
319}
320
#[cfg(test)]
mod tests {
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// Three runs of non-null values yield three narrowed ends and three values.
    #[test]
    fn encode() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let (run_ends, run_values) = runend_encode(input.as_view(), &mut ctx);
        let run_values = run_values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values = PrimitiveArray::from_iter(vec![1i32, 2, 3]);
        assert_arrays_eq!(run_values, want_values);
        Ok(())
    }

    /// Null stretches form their own runs, independent of the values stored beneath them.
    #[test]
    fn encode_nullable() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let valid_bits = BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]);
        let input = PrimitiveArray::new(
            buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3],
            Validity::from(valid_bits),
        );

        let (run_ends, run_values) = runend_encode(input.as_view(), &mut ctx);
        let run_values = run_values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(run_values, want_values);
        Ok(())
    }

    /// An all-null input collapses to one un-narrowed (`u64`) end and a single null value.
    #[test]
    fn encode_all_null() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );

        let (run_ends, run_values) = runend_encode(input.as_view(), &mut ctx);
        let run_values = run_values.execute::<PrimitiveArray>(&mut ctx)?;

        let want_ends = PrimitiveArray::from_iter(vec![5u64]);
        assert_arrays_eq!(run_ends, want_ends);
        let want_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);
        assert_arrays_eq!(run_values, want_values);
        Ok(())
    }

    /// Decoding three runs reproduces the flat ten-element array.
    #[test]
    fn decode() -> VortexResult<()> {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);

        let flat = runend_decode_primitive(run_ends, run_values, 0, 10, &mut ctx)?;

        let want = PrimitiveArray::from_iter(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        assert_arrays_eq!(flat, want);
        Ok(())
    }
}