1use itertools::Itertools;
5use vortex_array::ArrayRef;
6use vortex_array::ArrayView;
7use vortex_array::IntoArray;
8use vortex_array::LEGACY_SESSION;
9use vortex_array::ToCanonical;
10use vortex_array::VortexSessionExecute;
11use vortex_array::arrays::BoolArray;
12use vortex_array::arrays::ConstantArray;
13use vortex_array::arrays::Primitive;
14use vortex_array::arrays::PrimitiveArray;
15use vortex_array::arrays::VarBinViewArray;
16use vortex_array::arrays::bool::BoolArrayExt;
17use vortex_array::arrays::primitive::PrimitiveArrayExt;
18use vortex_array::buffer::BufferHandle;
19use vortex_array::dtype::NativePType;
20use vortex_array::dtype::Nullability;
21use vortex_array::expr::stats::Precision;
22use vortex_array::expr::stats::Stat;
23use vortex_array::match_each_native_ptype;
24use vortex_array::match_each_unsigned_integer_ptype;
25use vortex_array::scalar::Scalar;
26use vortex_array::validity::Validity;
27use vortex_buffer::BitBuffer;
28use vortex_buffer::BitBufferMut;
29use vortex_buffer::Buffer;
30use vortex_buffer::BufferMut;
31use vortex_buffer::buffer;
32use vortex_error::VortexExpect;
33use vortex_error::VortexResult;
34use vortex_mask::Mask;
35
36use crate::iter::trimmed_ends_iter;
37
38pub fn runend_encode(array: ArrayView<Primitive>) -> (PrimitiveArray, ArrayRef) {
40 let validity = match array
41 .validity()
42 .vortex_expect("run-end validity should be derivable")
43 {
44 Validity::NonNullable => None,
45 Validity::AllValid => None,
46 Validity::AllInvalid => {
47 let ends = PrimitiveArray::new(buffer![array.len() as u64], Validity::NonNullable);
49 ends.statistics()
50 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
51 return (
52 ends,
53 ConstantArray::new(Scalar::null(array.dtype().clone()), 1).into_array(),
54 );
55 }
56 Validity::Array(a) => Some(a.to_bool().to_bit_buffer()),
57 };
58
59 let (ends, values) = match validity {
60 None => {
61 match_each_native_ptype!(array.ptype(), |P| {
62 let (ends, values) = runend_encode_primitive(array.as_slice::<P>());
63 (
64 PrimitiveArray::new(ends, Validity::NonNullable),
65 PrimitiveArray::new(values, array.dtype().nullability().into()).into_array(),
66 )
67 })
68 }
69 Some(validity) => {
70 match_each_native_ptype!(array.ptype(), |P| {
71 let (ends, values) =
72 runend_encode_nullable_primitive(array.as_slice::<P>(), validity);
73 (
74 PrimitiveArray::new(ends, Validity::NonNullable),
75 values.into_array(),
76 )
77 })
78 }
79 };
80
81 let ends = ends.narrow().vortex_expect("Ends must succeed downcasting");
82
83 ends.statistics()
84 .set(Stat::IsStrictSorted, Precision::Exact(true.into()));
85
86 (ends, values)
87}
88
89fn runend_encode_primitive<T: NativePType>(elements: &[T]) -> (Buffer<u64>, Buffer<T>) {
90 let mut ends = BufferMut::empty();
91 let mut values = BufferMut::empty();
92
93 if elements.is_empty() {
94 return (ends.freeze(), values.freeze());
95 }
96
97 let mut prev = elements[0];
99 let mut end = 1;
100 for &e in elements.iter().skip(1) {
101 if e != prev {
102 ends.push(end);
103 values.push(prev);
104 }
105 prev = e;
106 end += 1;
107 }
108 ends.push(end);
109 values.push(prev);
110
111 (ends.freeze(), values.freeze())
112}
113
114fn runend_encode_nullable_primitive<T: NativePType>(
115 elements: &[T],
116 element_validity: BitBuffer,
117) -> (Buffer<u64>, PrimitiveArray) {
118 let mut ends = BufferMut::empty();
119 let mut values = BufferMut::empty();
120 let mut validity = BitBufferMut::with_capacity(values.capacity());
121
122 if elements.is_empty() {
123 return (
124 ends.freeze(),
125 PrimitiveArray::new(
126 values,
127 Validity::Array(BoolArray::from(validity.freeze()).into_array()),
128 ),
129 );
130 }
131
132 let mut prev = element_validity.value(0).then(|| elements[0]);
134 let mut end = 1;
135 for e in elements
136 .iter()
137 .zip(element_validity.iter())
138 .map(|(&e, is_valid)| is_valid.then_some(e))
139 .skip(1)
140 {
141 if e != prev {
142 ends.push(end);
143 match prev {
144 None => {
145 validity.append(false);
146 values.push(T::default());
147 }
148 Some(p) => {
149 validity.append(true);
150 values.push(p);
151 }
152 }
153 }
154 prev = e;
155 end += 1;
156 }
157 ends.push(end);
158
159 match prev {
160 None => {
161 validity.append(false);
162 values.push(T::default());
163 }
164 Some(p) => {
165 validity.append(true);
166 values.push(p);
167 }
168 }
169
170 (
171 ends.freeze(),
172 PrimitiveArray::new(values, Validity::from(validity.freeze())),
173 )
174}
175
176pub fn runend_decode_primitive(
177 ends: PrimitiveArray,
178 values: PrimitiveArray,
179 offset: usize,
180 length: usize,
181) -> VortexResult<PrimitiveArray> {
182 let validity_mask = values.as_ref().validity()?.to_mask(
183 values.as_ref().len(),
184 &mut LEGACY_SESSION.create_execution_ctx(),
185 )?;
186 Ok(match_each_native_ptype!(values.ptype(), |P| {
187 match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
188 runend_decode_typed_primitive(
189 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
190 values.as_slice::<P>(),
191 validity_mask,
192 values.dtype().nullability(),
193 length,
194 )
195 })
196 }))
197}
198
199fn runend_decode_slice<T: Copy + Default>(
203 run_ends: impl Iterator<Item = usize>,
204 values: &[T],
205 values_validity: Mask,
206 values_nullability: Nullability,
207 length: usize,
208) -> (Buffer<T>, Validity) {
209 match values_validity {
210 Mask::AllTrue(_) => {
211 let mut decoded: BufferMut<T> = BufferMut::with_capacity(length);
212 for (end, value) in run_ends.zip_eq(values) {
213 assert!(
214 end >= decoded.len(),
215 "Runend ends must be monotonic, got {end} after {}",
216 decoded.len()
217 );
218 assert!(end <= length, "Runend end must be less than overall length");
219 unsafe { decoded.push_n_unchecked(*value, end - decoded.len()) };
222 }
223 (decoded.into(), values_nullability.into())
224 }
225 Mask::AllFalse(_) => (Buffer::<T>::zeroed(length), Validity::AllInvalid),
226 Mask::Values(mask) => {
227 let mut decoded = BufferMut::with_capacity(length);
228 let mut decoded_validity = BitBufferMut::with_capacity(length);
229 for (end, value) in run_ends.zip_eq(
230 values
231 .iter()
232 .zip(mask.bit_buffer().iter())
233 .map(|(&v, is_valid)| is_valid.then_some(v)),
234 ) {
235 assert!(
236 end >= decoded.len(),
237 "Runend ends must be monotonic, got {end} after {}",
238 decoded.len()
239 );
240 assert!(end <= length, "Runend end must be less than overall length");
241 match value {
242 None => {
243 decoded_validity.append_n(false, end - decoded.len());
244 unsafe { decoded.push_n_unchecked(T::default(), end - decoded.len()) };
247 }
248 Some(value) => {
249 decoded_validity.append_n(true, end - decoded.len());
250 unsafe { decoded.push_n_unchecked(value, end - decoded.len()) };
253 }
254 }
255 }
256 (decoded.into(), Validity::from(decoded_validity.freeze()))
257 }
258 }
259}
260
261pub fn runend_decode_typed_primitive<T: NativePType>(
262 run_ends: impl Iterator<Item = usize>,
263 values: &[T],
264 values_validity: Mask,
265 values_nullability: Nullability,
266 length: usize,
267) -> PrimitiveArray {
268 let (decoded, validity) = runend_decode_slice(
269 run_ends,
270 values,
271 values_validity,
272 values_nullability,
273 length,
274 );
275 PrimitiveArray::new(decoded, validity)
276}
277
278pub fn runend_decode_varbinview(
280 ends: PrimitiveArray,
281 values: VarBinViewArray,
282 offset: usize,
283 length: usize,
284) -> VortexResult<VarBinViewArray> {
285 let validity_mask = values.as_ref().validity()?.to_mask(
286 values.as_ref().len(),
287 &mut LEGACY_SESSION.create_execution_ctx(),
288 )?;
289 let views = values.views();
290
291 let (decoded_views, validity) = match_each_unsigned_integer_ptype!(ends.ptype(), |E| {
292 runend_decode_slice(
293 trimmed_ends_iter(ends.as_slice::<E>(), offset, length),
294 views,
295 validity_mask,
296 values.dtype().nullability(),
297 length,
298 )
299 });
300
301 let parts = values.into_data_parts();
302 let view_handle = BufferHandle::new_host(decoded_views.into_byte_buffer());
303
304 Ok(unsafe {
307 VarBinViewArray::new_handle_unchecked(view_handle, parts.buffers, parts.dtype, validity)
308 })
309}
310
#[cfg(test)]
mod test {
    use vortex_array::ToCanonical;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::validity::Validity;
    use vortex_buffer::BitBuffer;
    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::compress::runend_decode_primitive;
    use crate::compress::runend_encode;

    /// Non-null input: one (narrowed) end and one value per run.
    #[test]
    fn encode() {
        let input = PrimitiveArray::from_iter([1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);
        let (run_ends, run_values) = runend_encode(input.as_view());
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 5, 10]);
        let want_values = PrimitiveArray::from_iter(vec![1i32, 2, 3]);
        assert_arrays_eq!(run_ends, want_ends);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Nullable input: null stretches become their own null runs.
    #[test]
    fn encode_nullable() {
        let validity = Validity::from(BitBuffer::from(vec![
            true, true, false, false, true, true, true, true, false, false,
        ]));
        let input = PrimitiveArray::new(buffer![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3], validity);
        let (run_ends, run_values) = runend_encode(input.as_view());
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![2u8, 4, 5, 8, 10]);
        let want_values =
            PrimitiveArray::from_option_iter(vec![Some(1i32), None, Some(2), Some(3), None]);
        assert_arrays_eq!(run_ends, want_ends);
        assert_arrays_eq!(run_values, want_values);
    }

    /// All-null input: a single `u64` end covering everything and a single null value.
    #[test]
    fn encode_all_null() {
        let input = PrimitiveArray::new(
            buffer![0, 0, 0, 0, 0],
            Validity::from(BitBuffer::new_unset(5)),
        );
        let (run_ends, run_values) = runend_encode(input.as_view());
        let run_values = run_values.to_primitive();

        let want_ends = PrimitiveArray::from_iter(vec![5u64]);
        let want_values = PrimitiveArray::from_option_iter(vec![Option::<i32>::None]);
        assert_arrays_eq!(run_ends, want_ends);
        assert_arrays_eq!(run_values, want_values);
    }

    /// Decoding the full range expands each value across its run.
    #[test]
    fn decode() -> VortexResult<()> {
        let run_ends = PrimitiveArray::from_iter([2u32, 5, 10]);
        let run_values = PrimitiveArray::from_iter([1i32, 2, 3]);
        let want = PrimitiveArray::from_iter(vec![1i32, 1, 2, 2, 2, 3, 3, 3, 3, 3]);

        let decoded = runend_decode_primitive(run_ends, run_values, 0, 10)?;
        assert_arrays_eq!(decoded, want);
        Ok(())
    }
}