vortex_fastlanes/rle/array/
mod.rs1use std::fmt::Display;
5use std::fmt::Formatter;
6
7use vortex_array::ArrayRef;
8use vortex_array::ExecutionCtx;
9use vortex_array::TypedArrayRef;
10use vortex_error::VortexExpect as _;
11use vortex_error::VortexResult;
12use vortex_error::vortex_ensure;
13
14pub mod rle_compress;
15pub mod rle_decompress;
16
17pub(super) const VALUES_SLOT: usize = 0;
19pub(super) const INDICES_SLOT: usize = 1;
21pub(super) const VALUES_IDX_OFFSETS_SLOT: usize = 2;
31pub(super) const NUM_SLOTS: usize = 3;
32pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["values", "indices", "values_idx_offsets"];
33
34#[derive(Clone, Debug)]
35pub struct RLEData {
36 pub(super) offset: usize,
38}
39
40impl Display for RLEData {
41 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
42 write!(f, "offset: {}", self.offset)
43 }
44}
45
46impl RLEData {
47 pub fn try_new(offset: usize) -> VortexResult<Self> {
57 vortex_ensure!(
58 offset < 1024,
59 "Offset must be smaller than 1024, got {}",
60 offset
61 );
62 Ok(Self { offset })
63 }
64
65 pub unsafe fn new_unchecked(offset: usize) -> Self {
73 Self { offset }
74 }
75
76 #[inline]
77 pub fn offset(&self) -> usize {
78 self.offset
79 }
80}
81
82pub trait RLEArrayExt: TypedArrayRef<crate::RLE> {
83 #[inline]
84 fn values(&self) -> &ArrayRef {
85 self.as_ref().slots()[VALUES_SLOT]
86 .as_ref()
87 .vortex_expect("RLEArray values slot must be populated")
88 }
89
90 #[inline]
91 fn indices(&self) -> &ArrayRef {
92 self.as_ref().slots()[INDICES_SLOT]
93 .as_ref()
94 .vortex_expect("RLEArray indices slot must be populated")
95 }
96
97 #[inline]
98 fn values_idx_offsets(&self) -> &ArrayRef {
99 self.as_ref().slots()[VALUES_IDX_OFFSETS_SLOT]
100 .as_ref()
101 .vortex_expect("RLEArray values_idx_offsets slot must be populated")
102 }
103
104 #[expect(
110 clippy::expect_used,
111 reason = "expect is safe here as scalar_at returns a valid primitive"
112 )]
113 fn values_idx_offset(&self, chunk_idx: usize, ctx: &mut ExecutionCtx) -> usize {
114 self.values_idx_offsets()
115 .execute_scalar(chunk_idx, ctx)
116 .expect("index must be in bounds")
117 .as_primitive()
118 .as_::<usize>()
119 .expect("index must be of type usize")
120 - self
121 .values_idx_offsets()
122 .execute_scalar(0, ctx)
123 .expect("index must be in bounds")
124 .as_primitive()
125 .as_::<usize>()
126 .expect("index must be of type usize")
127 }
128
129 #[inline]
131 fn offset(&self) -> usize {
132 self.offset
133 }
134}
135
136impl<T: TypedArrayRef<crate::RLE>> RLEArrayExt for T {}
137
#[cfg(test)]
mod tests {
    use vortex_array::ArrayContext;
    use vortex_array::Canonical;
    use vortex_array::IntoArray;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::arrays::primitive::PrimitiveArrayExt;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::DType;
    use vortex_array::dtype::Nullability;
    use vortex_array::dtype::PType;
    use vortex_array::serde::SerializeOptions;
    use vortex_array::serde::SerializedArray;
    use vortex_array::validity::Validity;
    use vortex_buffer::Buffer;
    use vortex_buffer::ByteBufferMut;
    use vortex_error::VortexExpect;
    use vortex_error::VortexResult;
    use vortex_session::registry::ReadContext;

    use crate::FL_CHUNK_SIZE;
    use crate::RLE;
    use crate::RLEData;
    use crate::rle::array::RLEArrayExt;
    use crate::test::SESSION;

    /// Builds a `u16` indices array of 1024 elements.
    ///
    /// Values are produced by cycling `pattern`, and the validity mask by cycling `validity`.
    fn indices_with_validity(pattern: &[u16], validity: &[bool]) -> vortex_array::ArrayRef {
        PrimitiveArray::new(
            pattern
                .iter()
                .cycle()
                .take(1024)
                .copied()
                .collect::<Buffer<u16>>(),
            Validity::from_iter(validity.iter().cycle().take(1024).copied()),
        )
        .into_array()
    }

    #[test]
    fn test_try_new() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();

        // The indices span a whole 1024-element chunk although the array length is only 5.
        let indices =
            PrimitiveArray::from_iter([0u16, 0, 1, 1, 2].iter().cycle().take(1024).copied())
                .into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 5)
            .vortex_expect("RLEData is always valid");

        assert_eq!(rle_array.len(), 5);
        assert_eq!(rle_array.values().len(), 3);
        assert_eq!(rle_array.values().dtype().as_ptype(), PType::U32);
    }

    #[test]
    fn test_try_new_with_validity() {
        let values = PrimitiveArray::from_iter([10u32, 20]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices = indices_with_validity(&[0, 1, 0], &[true, false, true]);

        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 3)
            .vortex_expect("RLEData is always valid");

        assert_eq!(rle_array.len(), 3);
        assert_eq!(rle_array.values().len(), 2);
        let mut ctx = SESSION.create_execution_ctx();
        assert!(rle_array.is_valid(0, &mut ctx).unwrap());
        assert!(!rle_array.is_valid(1, &mut ctx).unwrap());
        assert!(rle_array.is_valid(2, &mut ctx).unwrap());
    }

    #[test]
    fn test_all_valid() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices =
            indices_with_validity(&[0, 1, 2, 0, 1], &[true, true, true, false, false]);

        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 5)
            .vortex_expect("RLEData is always valid");

        let mut ctx = SESSION.create_execution_ctx();
        // Positions 0..3 are all valid in the validity pattern.
        let valid_slice = rle_array
            .slice(0..3)
            .unwrap()
            .execute::<PrimitiveArray>(&mut ctx)
            .unwrap();
        assert!(valid_slice.all_valid(&mut ctx).unwrap());

        let mixed_slice = rle_array.slice(1..5).unwrap();
        assert!(!mixed_slice.all_valid(&mut ctx).unwrap());
    }

    #[test]
    fn test_all_invalid() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices =
            indices_with_validity(&[0, 1, 2, 0, 1], &[true, true, false, false, false]);

        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 5)
            .vortex_expect("RLEData is always valid");

        // Positions 2..5 are all null in the validity pattern.
        let invalid_slice = rle_array
            .slice(2..5)
            .unwrap()
            .execute::<Canonical>(&mut LEGACY_SESSION.create_execution_ctx())
            .unwrap()
            .into_primitive();
        let mut ctx = SESSION.create_execution_ctx();
        assert!(invalid_slice.all_invalid(&mut ctx).unwrap());

        let mixed_slice = rle_array.slice(1..4).unwrap();
        assert!(!mixed_slice.all_invalid(&mut ctx).unwrap());
    }

    #[test]
    fn test_validity_mask() {
        let values = PrimitiveArray::from_iter([10u32, 20, 30]).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64]).into_array();
        let indices = indices_with_validity(&[0, 1, 2, 0], &[true, false, true, false]);

        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 4)
            .vortex_expect("RLEData is always valid");

        // Slicing 1..4 of [valid, null, valid, null] should leave [null, valid, null].
        let sliced_array = rle_array.slice(1..4).unwrap();
        let validity_mask = sliced_array
            .validity()
            .unwrap()
            .execute_mask(
                sliced_array.len(),
                &mut LEGACY_SESSION.create_execution_ctx(),
            )
            .unwrap();

        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let expected_mask = Validity::from_iter([false, true, false])
            .execute_mask(3, &mut ctx)
            .unwrap();
        assert_eq!(validity_mask.len(), expected_mask.len());
        assert_eq!(validity_mask, expected_mask);
    }

    #[test]
    fn test_try_new_empty() {
        let values = PrimitiveArray::from_iter(Vec::<u32>::new()).into_array();
        let indices = PrimitiveArray::from_iter(Vec::<u16>::new()).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter(Vec::<u64>::new()).into_array();
        let rle_array = RLE::try_new(
            values,
            indices.clone(),
            values_idx_offsets,
            0,
            indices.len(),
        )
        .vortex_expect("RLEData is always valid");

        assert_eq!(rle_array.len(), 0);
        assert_eq!(rle_array.values().len(), 0);
    }

    #[test]
    fn test_multi_chunk_two_chunks() {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        let values = PrimitiveArray::from_iter([10u32, 20, 30, 40]).into_array();
        let indices = PrimitiveArray::from_iter([0u16, 1].repeat(1024)).into_array();
        let values_idx_offsets = PrimitiveArray::from_iter([0u64, 2]).into_array();
        let rle_array = RLE::try_new(values, indices, values_idx_offsets, 0, 2048)
            .vortex_expect("RLEData is always valid");

        assert_eq!(rle_array.len(), 2048);
        assert_eq!(rle_array.values().len(), 4);

        assert_eq!(rle_array.values_idx_offset(0, &mut ctx), 0);
        assert_eq!(rle_array.values_idx_offset(1, &mut ctx), 2);
    }

    #[test]
    fn test_rle_serialization() -> VortexResult<()> {
        let mut exec_ctx = SESSION.create_execution_ctx();
        let primitive = PrimitiveArray::from_iter((0..2048).map(|i| (i / 100) as u32));
        let rle_array = RLEData::encode(primitive.as_view(), &mut exec_ctx)?;
        assert_eq!(rle_array.len(), 2048);

        let original_data = rle_array
            .as_array()
            .clone()
            .execute::<PrimitiveArray>(&mut exec_ctx)?;

        let ctx = ArrayContext::empty();
        let serialized =
            rle_array
                .into_array()
                .serialize(&ctx, &SESSION, &SerializeOptions::default())?;

        let mut concat = ByteBufferMut::empty();
        for buf in serialized {
            concat.extend_from_slice(buf.as_ref());
        }
        let concat = concat.freeze();

        let parts = SerializedArray::try_from(concat)?;
        let decoded = parts.decode(
            &DType::Primitive(PType::U32, Nullability::NonNullable),
            2048,
            &ReadContext::new(ctx.to_ids()),
            &SESSION,
        )?;

        let decoded_data = decoded.execute::<PrimitiveArray>(&mut exec_ctx)?;

        assert_arrays_eq!(original_data, decoded_data);
        Ok(())
    }

    #[test]
    fn test_rle_serialization_slice() -> VortexResult<()> {
        let mut exec_ctx = SESSION.create_execution_ctx();
        let primitive = PrimitiveArray::from_iter((0..2048).map(|i| (i / 100) as u32));
        let rle_array = RLEData::encode(primitive.as_view(), &mut exec_ctx)?;

        // Rebuild the array with a non-zero offset to emulate a slice of 100 elements.
        let sliced = RLE::try_new(
            rle_array.values().clone(),
            rle_array.indices().clone(),
            rle_array.values_idx_offsets().clone(),
            100,
            100,
        )
        .vortex_expect("RLEData is always valid");
        assert_eq!(sliced.len(), 100);

        let ctx = ArrayContext::empty();
        let serialized =
            sliced
                .clone()
                .into_array()
                .serialize(&ctx, &SESSION, &SerializeOptions::default())?;

        let mut concat = ByteBufferMut::empty();
        for buf in serialized {
            concat.extend_from_slice(buf.as_ref());
        }
        let concat = concat.freeze();

        let parts = SerializedArray::try_from(concat)?;
        let decoded = parts.decode(
            sliced.dtype(),
            sliced.len(),
            &ReadContext::new(ctx.to_ids()),
            &SESSION,
        )?;

        let original_data = sliced
            .as_array()
            .clone()
            .execute::<PrimitiveArray>(&mut exec_ctx)?;
        let decoded_data = decoded.execute::<PrimitiveArray>(&mut exec_ctx)?;

        assert_arrays_eq!(original_data, decoded_data);
        Ok(())
    }

    /// Re-encodes the indices of a multi-chunk RLE array with RLE itself.
    ///
    /// The reassembled array must still decode to the original values.
    #[test]
    fn test_recompress_indices_no_cross_chunk_leak() -> VortexResult<()> {
        let mut ctx = SESSION.create_execution_ctx();
        // One full chunk followed by a partial second chunk.
        let len = FL_CHUNK_SIZE + 100;
        let mut values: Vec<Option<i16>> = vec![None; len];
        values[0] = Some(10);
        values[500] = Some(20);
        let original = PrimitiveArray::from_option_iter(values);
        let rle = RLEData::encode(original.as_view(), &mut ctx)?;

        let indices_prim = rle
            .indices()
            .clone()
            .execute::<PrimitiveArray>(&mut ctx)?
            .narrow(&mut ctx)?;
        let re_encoded = RLEData::encode(indices_prim.as_view(), &mut ctx)?;

        // SAFETY: values, values_idx_offsets, offset and len are taken unchanged from `rle`,
        // which `RLEData::encode` produced. Only the indices child is replaced, by an RLE
        // re-encoding of the same indices. Narrowing is assumed to preserve every index value
        // and the length.
        let reconstructed = unsafe {
            RLE::new_unchecked(
                rle.values().clone(),
                re_encoded.into_array(),
                rle.values_idx_offsets().clone(),
                rle.offset(),
                rle.len(),
            )
        };

        let decoded = reconstructed
            .as_array()
            .clone()
            .execute::<PrimitiveArray>(&mut ctx)?;
        assert_arrays_eq!(decoded, original);
        Ok(())
    }
}