1use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use itertools::Itertools;
11use prost::Message;
12use vortex_array::Array;
13use vortex_array::ArrayEq;
14use vortex_array::ArrayHash;
15use vortex_array::ArrayId;
16use vortex_array::ArrayParts;
17use vortex_array::ArrayRef;
18use vortex_array::ArrayView;
19use vortex_array::Canonical;
20use vortex_array::ExecutionCtx;
21use vortex_array::ExecutionResult;
22use vortex_array::IntoArray;
23use vortex_array::LEGACY_SESSION;
24use vortex_array::Precision;
25use vortex_array::TypedArrayRef;
26use vortex_array::VortexSessionExecute;
27use vortex_array::arrays::Primitive;
28use vortex_array::arrays::PrimitiveArray;
29use vortex_array::buffer::BufferHandle;
30use vortex_array::dtype::DType;
31use vortex_array::dtype::Nullability;
32use vortex_array::dtype::PType;
33use vortex_array::patches::Patches;
34use vortex_array::patches::PatchesMetadata;
35use vortex_array::require_child;
36use vortex_array::require_patches;
37use vortex_array::serde::ArrayChildren;
38use vortex_array::validity::Validity;
39use vortex_array::vtable::VTable;
40use vortex_array::vtable::ValidityChild;
41use vortex_array::vtable::ValidityVTableFromChild;
42use vortex_buffer::Buffer;
43use vortex_error::VortexExpect;
44use vortex_error::VortexResult;
45use vortex_error::vortex_bail;
46use vortex_error::vortex_ensure;
47use vortex_error::vortex_err;
48use vortex_error::vortex_panic;
49use vortex_session::VortexSession;
50use vortex_session::registry::CachedId;
51
52use crate::alp_rd::kernel::PARENT_KERNELS;
53use crate::alp_rd::rules::RULES;
54use crate::alp_rd_decode;
55
/// An [`Array`] specialized to the [`ALPRD`] encoding (registered under the id `vortex.alprd`).
pub type ALPRDArray = Array<ALPRD>;
58
/// Protobuf-encoded metadata produced by `serialize` and consumed by `deserialize` for an
/// [`ALPRD`] array.
#[derive(Clone, prost::Message)]
pub struct ALPRDMetadata {
    /// Bit width of the right parts; held as a `u8` in `ALPRDData` and widened here.
    #[prost(uint32, tag = "1")]
    right_bit_width: u32,
    /// Number of leading entries of `dict` that make up the left-parts dictionary.
    #[prost(uint32, tag = "2")]
    dict_len: u32,
    /// Left-parts dictionary codes; each one is a `u16` widened to `u32`.
    #[prost(uint32, repeated, tag = "3")]
    dict: Vec<u32>,
    /// Primitive type of the `left_parts` child array.
    #[prost(enumeration = "PType", tag = "4")]
    left_parts_ptype: i32,
    /// Metadata describing the left-parts patches, present only when the array has patches.
    #[prost(message, tag = "5")]
    patches: Option<PatchesMetadata>,
}
72
73impl ArrayHash for ALPRDData {
74 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
75 self.left_parts_dictionary.array_hash(state, precision);
76 self.right_bit_width.hash(state);
77 self.patch_offset.hash(state);
78 self.patch_offset_within_chunk.hash(state);
79 }
80}
81
82impl ArrayEq for ALPRDData {
83 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
84 self.left_parts_dictionary
85 .array_eq(&other.left_parts_dictionary, precision)
86 && self.right_bit_width == other.right_bit_width
87 && self.patch_offset == other.patch_offset
88 && self.patch_offset_within_chunk == other.patch_offset_within_chunk
89 }
90}
91
92impl VTable for ALPRD {
93 type ArrayData = ALPRDData;
94
95 type OperationsVTable = Self;
96 type ValidityVTable = ValidityVTableFromChild;
97
98 fn id(&self) -> ArrayId {
99 static ID: CachedId = CachedId::new("vortex.alprd");
100 *ID
101 }
102
103 fn validate(
104 &self,
105 data: &ALPRDData,
106 dtype: &DType,
107 len: usize,
108 slots: &[Option<ArrayRef>],
109 ) -> VortexResult<()> {
110 validate_parts(
111 dtype,
112 len,
113 left_parts_from_slots(slots),
114 right_parts_from_slots(slots),
115 patches_from_slots(
116 slots,
117 data.patch_offset,
118 data.patch_offset_within_chunk,
119 len,
120 )
121 .as_ref(),
122 )
123 }
124
125 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
126 0
127 }
128
129 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
130 vortex_panic!("ALPRDArray buffer index {idx} out of bounds")
131 }
132
133 fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option<String> {
134 None
135 }
136
137 fn serialize(
138 array: ArrayView<'_, Self>,
139 _session: &VortexSession,
140 ) -> VortexResult<Option<Vec<u8>>> {
141 let dict = array
142 .left_parts_dictionary()
143 .iter()
144 .map(|&i| i as u32)
145 .collect::<Vec<_>>();
146
147 Ok(Some(
148 ALPRDMetadata {
149 right_bit_width: array.right_bit_width() as u32,
150 dict_len: array.left_parts_dictionary().len() as u32,
151 dict,
152 left_parts_ptype: array.left_parts().dtype().as_ptype() as i32,
153 patches: array
154 .left_parts_patches()
155 .map(|p| p.to_metadata(array.len(), p.dtype()))
156 .transpose()?,
157 }
158 .encode_to_vec(),
159 ))
160 }
161
162 fn deserialize(
163 &self,
164 dtype: &DType,
165 len: usize,
166 metadata: &[u8],
167 _buffers: &[BufferHandle],
168 children: &dyn ArrayChildren,
169 _session: &VortexSession,
170 ) -> VortexResult<ArrayParts<Self>> {
171 let metadata = ALPRDMetadata::decode(metadata)?;
172 if children.len() < 2 {
173 vortex_bail!(
174 "Expected at least 2 children for ALPRD encoding, found {}",
175 children.len()
176 );
177 }
178
179 let left_parts_dtype = DType::Primitive(metadata.left_parts_ptype(), dtype.nullability());
180 let left_parts = children.get(0, &left_parts_dtype, len)?;
181 let left_parts_dictionary: Buffer<u16> = metadata.dict.as_slice()
182 [0..metadata.dict_len as usize]
183 .iter()
184 .map(|&i| {
185 u16::try_from(i)
186 .map_err(|_| vortex_err!("left_parts_dictionary code {i} does not fit in u16"))
187 })
188 .try_collect()?;
189
190 let right_parts_dtype = match &dtype {
191 DType::Primitive(PType::F32, _) => {
192 DType::Primitive(PType::U32, Nullability::NonNullable)
193 }
194 DType::Primitive(PType::F64, _) => {
195 DType::Primitive(PType::U64, Nullability::NonNullable)
196 }
197 _ => vortex_bail!("Expected f32 or f64 dtype, got {:?}", dtype),
198 };
199 let right_parts = children.get(1, &right_parts_dtype, len)?;
200
201 let left_parts_patches = metadata
202 .patches
203 .map(|p| {
204 let indices = children.get(2, &p.indices_dtype()?, p.len()?)?;
205 let values = children.get(3, &left_parts_dtype.as_nonnullable(), p.len()?)?;
206
207 Patches::new(
208 len,
209 p.offset()?,
210 indices,
211 values,
212 None,
214 )
215 })
216 .transpose()?;
217 let left_parts_patches = ALPRDData::canonicalize_patches(
220 &left_parts,
221 left_parts_patches,
222 &mut LEGACY_SESSION.create_execution_ctx(),
223 )?;
224 let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
225 let data = ALPRDData::new(
226 left_parts_dictionary,
227 u8::try_from(metadata.right_bit_width).map_err(|_| {
228 vortex_err!(
229 "right_bit_width {} out of u8 range",
230 metadata.right_bit_width
231 )
232 })?,
233 left_parts_patches,
234 );
235 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
236 }
237
238 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
239 SLOT_NAMES[idx].to_string()
240 }
241
242 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
243 let array = require_child!(array, array.left_parts(), 0 => Primitive);
244 let array = require_child!(array, array.right_parts(), 1 => Primitive);
245 require_patches!(
246 array,
247 LP_PATCH_INDICES_SLOT,
248 LP_PATCH_VALUES_SLOT,
249 LP_PATCH_CHUNK_OFFSETS_SLOT
250 );
251
252 let dtype = array.dtype().clone();
253 let right_bit_width = array.right_bit_width();
254 let ALPRDDataParts {
255 left_parts,
256 right_parts,
257 left_parts_dictionary,
258 left_parts_patches,
259 } = ALPRDArrayOwnedExt::into_data_parts(array);
260 let ptype = dtype.as_ptype();
261
262 let left_parts = left_parts
263 .try_downcast::<Primitive>()
264 .ok()
265 .vortex_expect("ALPRD execute: left_parts is primitive");
266 let right_parts = right_parts
267 .try_downcast::<Primitive>()
268 .ok()
269 .vortex_expect("ALPRD execute: right_parts is primitive");
270
271 let left_parts_dict = left_parts_dictionary;
273 let validity = left_parts
274 .as_ref()
275 .validity()?
276 .execute_mask(left_parts.as_ref().len(), ctx)?;
277
278 let decoded_array = if ptype == PType::F32 {
279 PrimitiveArray::new(
280 alp_rd_decode::<f32>(
281 left_parts.into_buffer_mut::<u16>(),
282 &left_parts_dict,
283 right_bit_width,
284 right_parts.into_buffer_mut::<u32>(),
285 left_parts_patches,
286 ctx,
287 )?,
288 Validity::from_mask(validity, dtype.nullability()),
289 )
290 } else {
291 PrimitiveArray::new(
292 alp_rd_decode::<f64>(
293 left_parts.into_buffer_mut::<u16>(),
294 &left_parts_dict,
295 right_bit_width,
296 right_parts.into_buffer_mut::<u64>(),
297 left_parts_patches,
298 ctx,
299 )?,
300 Validity::from_mask(validity, dtype.nullability()),
301 )
302 };
303
304 Ok(ExecutionResult::done(decoded_array.into_array()))
305 }
306
307 fn reduce_parent(
308 array: ArrayView<'_, Self>,
309 parent: &ArrayRef,
310 child_idx: usize,
311 ) -> VortexResult<Option<ArrayRef>> {
312 RULES.evaluate(array, parent, child_idx)
313 }
314
315 fn execute_parent(
316 array: ArrayView<'_, Self>,
317 parent: &ArrayRef,
318 child_idx: usize,
319 ctx: &mut ExecutionCtx,
320 ) -> VortexResult<Option<ArrayRef>> {
321 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
322 }
323}
324
/// Slot index of the `left_parts` child array.
pub(super) const LEFT_PARTS_SLOT: usize = 0;
/// Slot index of the `right_parts` child array.
pub(super) const RIGHT_PARTS_SLOT: usize = 1;
/// Slot index of the left-parts patch indices; the slot is empty when there are no patches.
pub(super) const LP_PATCH_INDICES_SLOT: usize = 2;
/// Slot index of the left-parts patch values; the slot is empty when there are no patches.
pub(super) const LP_PATCH_VALUES_SLOT: usize = 3;
/// Slot index of the left-parts patch chunk offsets; the slot may be empty.
pub(super) const LP_PATCH_CHUNK_OFFSETS_SLOT: usize = 4;
/// Total number of slots of an ALPRD array.
pub(super) const NUM_SLOTS: usize = 5;
/// Human-readable slot names, indexed by the slot constants above.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = [
    "left_parts",
    "right_parts",
    "patch_indices",
    "patch_values",
    "patch_chunk_offsets",
];
343
/// Non-child state of an [`ALPRD`] array; the child arrays themselves are kept in slots.
#[derive(Clone, Debug)]
pub struct ALPRDData {
    /// Offset of the left-parts patches; `None` when the array was built without patches.
    patch_offset: Option<usize>,
    /// Value of `Patches::offset_within_chunk` for the left-parts patches, if any.
    patch_offset_within_chunk: Option<usize>,
    /// Dictionary of left-part values.
    left_parts_dictionary: Buffer<u16>,
    /// Bit width of the right parts.
    right_bit_width: u8,
}
351
352impl Display for ALPRDData {
353 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
354 write!(f, "right_bit_width: {}", self.right_bit_width)?;
355 if let Some(offset) = self.patch_offset {
356 write!(f, ", patch_offset: {offset}")?;
357 }
358 Ok(())
359 }
360}
361
/// The owned components of an ALPRD array, as returned by `into_parts` / `into_data_parts`.
#[derive(Clone, Debug)]
pub struct ALPRDDataParts {
    /// The left parts child array.
    pub left_parts: ArrayRef,
    /// Patches applying to the left parts, if any.
    pub left_parts_patches: Option<Patches>,
    /// Dictionary of left-part values.
    pub left_parts_dictionary: Buffer<u16>,
    /// The right parts child array.
    pub right_parts: ArrayRef,
}
369
/// Marker type for the ALPRD encoding; it implements `VTable` with id `vortex.alprd`.
#[derive(Clone, Debug)]
pub struct ALPRD;
372
373impl ALPRD {
374 pub fn try_new(
375 dtype: DType,
376 left_parts: ArrayRef,
377 left_parts_dictionary: Buffer<u16>,
378 right_parts: ArrayRef,
379 right_bit_width: u8,
380 left_parts_patches: Option<Patches>,
381 ctx: &mut ExecutionCtx,
382 ) -> VortexResult<ALPRDArray> {
383 let len = left_parts.len();
384 let left_parts_patches =
385 ALPRDData::canonicalize_patches(&left_parts, left_parts_patches, ctx)?;
386 let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
387 let data = ALPRDData::new(left_parts_dictionary, right_bit_width, left_parts_patches);
388 Array::try_from_parts(ArrayParts::new(ALPRD, dtype, len, data).with_slots(slots))
389 }
390
391 pub unsafe fn new_unchecked(
394 dtype: DType,
395 left_parts: ArrayRef,
396 left_parts_dictionary: Buffer<u16>,
397 right_parts: ArrayRef,
398 right_bit_width: u8,
399 left_parts_patches: Option<Patches>,
400 ) -> ALPRDArray {
401 let len = left_parts.len();
402 let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
403 let data = unsafe {
404 ALPRDData::new_unchecked(left_parts_dictionary, right_bit_width, left_parts_patches)
405 };
406 unsafe {
407 Array::from_parts_unchecked(ArrayParts::new(ALPRD, dtype, len, data).with_slots(slots))
408 }
409 }
410}
411
412impl ALPRDData {
413 fn canonicalize_patches(
414 left_parts: &ArrayRef,
415 left_parts_patches: Option<Patches>,
416 ctx: &mut ExecutionCtx,
417 ) -> VortexResult<Option<Patches>> {
418 left_parts_patches
419 .map(|patches| {
420 if !patches.values().all_valid(ctx)? {
421 vortex_bail!("patches must be all valid: {}", patches.values());
422 }
423 let mut patches = patches.cast_values(&left_parts.dtype().as_nonnullable())?;
426 let canonical = patches.values().clone().execute::<Canonical>(ctx)?;
429 *patches.values_mut() = canonical.into_array();
430 Ok(patches)
431 })
432 .transpose()
433 }
434
435 pub fn new(
437 left_parts_dictionary: Buffer<u16>,
438 right_bit_width: u8,
439 left_parts_patches: Option<Patches>,
440 ) -> Self {
441 let (patch_offset, patch_offset_within_chunk) = match &left_parts_patches {
442 Some(patches) => (Some(patches.offset()), patches.offset_within_chunk()),
443 None => (None, None),
444 };
445
446 Self {
447 patch_offset,
448 patch_offset_within_chunk,
449 left_parts_dictionary,
450 right_bit_width,
451 }
452 }
453
454 pub(crate) unsafe fn new_unchecked(
457 left_parts_dictionary: Buffer<u16>,
458 right_bit_width: u8,
459 left_parts_patches: Option<Patches>,
460 ) -> Self {
461 Self::new(left_parts_dictionary, right_bit_width, left_parts_patches)
462 }
463
464 fn make_slots(
465 left_parts: &ArrayRef,
466 right_parts: &ArrayRef,
467 patches: Option<&Patches>,
468 ) -> Vec<Option<ArrayRef>> {
469 let (pi, pv, pco) = match patches {
470 Some(p) => (
471 Some(p.indices().clone()),
472 Some(p.values().clone()),
473 p.chunk_offsets().clone(),
474 ),
475 None => (None, None, None),
476 };
477 vec![
478 Some(left_parts.clone()),
479 Some(right_parts.clone()),
480 pi,
481 pv,
482 pco,
483 ]
484 }
485
486 pub fn into_parts(self, left_parts: ArrayRef, right_parts: ArrayRef) -> ALPRDDataParts {
488 ALPRDDataParts {
489 left_parts,
490 left_parts_patches: None,
491 left_parts_dictionary: self.left_parts_dictionary,
492 right_parts,
493 }
494 }
495
496 #[inline]
497 pub fn right_bit_width(&self) -> u8 {
498 self.right_bit_width
499 }
500
501 #[inline]
503 pub fn left_parts_dictionary(&self) -> &Buffer<u16> {
504 &self.left_parts_dictionary
505 }
506}
507
508fn left_parts_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
509 slots[LEFT_PARTS_SLOT]
510 .as_ref()
511 .vortex_expect("ALPRDArray left_parts slot")
512}
513
514fn right_parts_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
515 slots[RIGHT_PARTS_SLOT]
516 .as_ref()
517 .vortex_expect("ALPRDArray right_parts slot")
518}
519
520fn patches_from_slots(
521 slots: &[Option<ArrayRef>],
522 patch_offset: Option<usize>,
523 patch_offset_within_chunk: Option<usize>,
524 len: usize,
525) -> Option<Patches> {
526 match (&slots[LP_PATCH_INDICES_SLOT], &slots[LP_PATCH_VALUES_SLOT]) {
527 (Some(indices), Some(values)) => {
528 let patch_offset = patch_offset.vortex_expect("ALPRDArray patch slots without offset");
529 Some(unsafe {
530 Patches::new_unchecked(
531 len,
532 patch_offset,
533 indices.clone(),
534 values.clone(),
535 slots[LP_PATCH_CHUNK_OFFSETS_SLOT].clone(),
536 patch_offset_within_chunk,
537 )
538 })
539 }
540 _ => None,
541 }
542}
543
544fn validate_parts(
545 dtype: &DType,
546 len: usize,
547 left_parts: &ArrayRef,
548 right_parts: &ArrayRef,
549 left_parts_patches: Option<&Patches>,
550) -> VortexResult<()> {
551 if !dtype.is_float() {
552 vortex_bail!("ALPRDArray given invalid DType ({dtype})");
553 }
554
555 vortex_ensure!(
556 left_parts.len() == len,
557 "left_parts len {} != outer len {len}",
558 left_parts.len(),
559 );
560 vortex_ensure!(
561 right_parts.len() == len,
562 "right_parts len {} != outer len {len}",
563 right_parts.len(),
564 );
565
566 if !left_parts.dtype().is_unsigned_int() {
567 vortex_bail!("left_parts dtype must be uint");
568 }
569 if dtype.is_nullable() != left_parts.dtype().is_nullable() {
570 vortex_bail!(
571 "ALPRDArray dtype nullability ({}) must match left_parts dtype nullability ({})",
572 dtype,
573 left_parts.dtype()
574 );
575 }
576
577 let expected_right_parts_dtype = match dtype {
578 DType::Primitive(PType::F32, _) => DType::Primitive(PType::U32, Nullability::NonNullable),
579 DType::Primitive(PType::F64, _) => DType::Primitive(PType::U64, Nullability::NonNullable),
580 _ => vortex_bail!("Expected f32 or f64 dtype, got {:?}", dtype),
581 };
582 vortex_ensure!(
583 right_parts.dtype() == &expected_right_parts_dtype,
584 "right_parts dtype {} does not match expected {}",
585 right_parts.dtype(),
586 expected_right_parts_dtype,
587 );
588
589 if let Some(patches) = left_parts_patches {
590 vortex_ensure!(
591 patches.array_len() == len,
592 "patches array_len {} != outer len {len}",
593 patches.array_len(),
594 );
595 vortex_ensure!(
596 patches.dtype().eq_ignore_nullability(left_parts.dtype()),
597 "patches dtype {} does not match left_parts dtype {}",
598 patches.dtype(),
599 left_parts.dtype(),
600 );
601 vortex_ensure!(
602 patches
603 .values()
604 .all_valid(&mut LEGACY_SESSION.create_execution_ctx())?,
605 "patches must be all valid: {}",
606 patches.values()
607 );
608 }
609
610 Ok(())
611}
612
/// Accessors for the components of an ALPRD array, available on every
/// `TypedArrayRef<ALPRD>` through the blanket impl below.
pub trait ALPRDArrayExt: TypedArrayRef<ALPRD> {
    /// The left parts child array. Panics if the slot is empty.
    fn left_parts(&self) -> &ArrayRef {
        left_parts_from_slots(self.as_ref().slots())
    }

    /// The right parts child array. Panics if the slot is empty.
    fn right_parts(&self) -> &ArrayRef {
        right_parts_from_slots(self.as_ref().slots())
    }

    /// Bit width of the right parts.
    fn right_bit_width(&self) -> u8 {
        // The fully-qualified path names the inherent `ALPRDData` accessor rather than this
        // trait method of the same name.
        ALPRDData::right_bit_width(self)
    }

    /// The left-parts patches rebuilt from the slots, or `None` when the patch indices or
    /// patch values slot is empty.
    fn left_parts_patches(&self) -> Option<Patches> {
        patches_from_slots(
            self.as_ref().slots(),
            self.patch_offset,
            self.patch_offset_within_chunk,
            self.as_ref().len(),
        )
    }

    /// Dictionary of left-part values.
    fn left_parts_dictionary(&self) -> &Buffer<u16> {
        // Fully-qualified for the same reason as in `right_bit_width`.
        ALPRDData::left_parts_dictionary(self)
    }
}
impl<T: TypedArrayRef<ALPRD>> ALPRDArrayExt for T {}
640
/// Extension trait for consuming an ALPRD array into its components.
pub trait ALPRDArrayOwnedExt {
    /// Consumes `self` and returns its components as an [`ALPRDDataParts`].
    fn into_data_parts(self) -> ALPRDDataParts;
}
644
645impl ALPRDArrayOwnedExt for Array<ALPRD> {
646 fn into_data_parts(self) -> ALPRDDataParts {
647 let left_parts_patches = self.left_parts_patches();
648 let left_parts = self.left_parts().clone();
649 let right_parts = self.right_parts().clone();
650 let mut parts = ALPRDDataParts {
651 left_parts,
652 left_parts_patches: None,
653 left_parts_dictionary: self.left_parts_dictionary().clone(),
654 right_parts,
655 };
656 parts.left_parts_patches = left_parts_patches;
657 parts
658 }
659}
660
661impl ValidityChild<ALPRD> for ALPRD {
662 fn validity_child(array: ArrayView<'_, ALPRD>) -> ArrayRef {
663 array.left_parts().clone()
664 }
665}
666
#[cfg(test)]
mod test {
    use prost::Message;
    use rstest::rstest;
    use vortex_array::LEGACY_SESSION;
    use vortex_array::VortexSessionExecute;
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::assert_arrays_eq;
    use vortex_array::dtype::PType;
    use vortex_array::patches::PatchesMetadata;
    use vortex_array::test_harness::check_metadata;

    use super::ALPRDMetadata;
    use crate::ALPRDFloat;
    use crate::alp_rd;

    /// Encodes a 1024-element float array containing three nulls with an encoder built from
    /// a different sample value, then checks that decoding reproduces the input.
    #[rstest]
    #[case(vec![0.1f32.next_up(); 1024], 1.123_848_f32)]
    #[case(vec![0.1f64.next_up(); 1024], 1.123_848_591_110_992_f64)]
    fn test_array_encode_with_nulls_and_patches<T: ALPRDFloat>(
        #[case] reals: Vec<T>,
        #[case] seed: T,
    ) {
        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        assert_eq!(reals.len(), 1024, "test expects 1024-length fixture");

        // Wrap every value in `Some`, then null out a few positions.
        let mut expected: Vec<Option<T>> = reals.into_iter().map(Some).collect();
        for null_idx in [1, 5, 900] {
            expected[null_idx] = None;
        }

        let input = PrimitiveArray::from_option_iter(expected.iter().cloned());

        // The encoder is built from a single sample that differs from the array's values.
        let encoder: alp_rd::RDEncoder = alp_rd::RDEncoder::new(&[seed.powi(-2)]);
        let encoded = encoder.encode(input.as_view(), &mut ctx);

        let decoded = encoded
            .as_array()
            .clone()
            .execute::<PrimitiveArray>(&mut ctx)
            .unwrap();

        assert_arrays_eq!(decoded, PrimitiveArray::from_option_iter(expected));
    }

    /// Checks the encoded metadata bytes against the `alprd.metadata` fixture.
    #[cfg_attr(miri, ignore)]
    #[test]
    fn test_alprd_metadata() {
        let metadata = ALPRDMetadata {
            right_bit_width: u32::MAX,
            dict_len: 8,
            dict: Vec::new(),
            left_parts_ptype: PType::U64 as i32,
            patches: Some(PatchesMetadata::new(
                usize::MAX,
                usize::MAX,
                PType::U64,
                None,
                None,
                None,
            )),
        };

        check_metadata("alprd.metadata", &metadata.encode_to_vec());
    }
}