1use std::fmt::Debug;
5use std::fmt::Display;
6use std::fmt::Formatter;
7use std::hash::Hash;
8use std::hash::Hasher;
9
10use itertools::Itertools;
11use prost::Message;
12use vortex_array::Array;
13use vortex_array::ArrayEq;
14use vortex_array::ArrayHash;
15use vortex_array::ArrayId;
16use vortex_array::ArrayParts;
17use vortex_array::ArrayRef;
18use vortex_array::ArraySlots;
19use vortex_array::ArrayView;
20use vortex_array::Canonical;
21use vortex_array::ExecutionCtx;
22use vortex_array::ExecutionResult;
23use vortex_array::IntoArray;
24use vortex_array::LEGACY_SESSION;
25use vortex_array::Precision;
26use vortex_array::TypedArrayRef;
27use vortex_array::VortexSessionExecute;
28use vortex_array::arrays::Primitive;
29use vortex_array::arrays::PrimitiveArray;
30use vortex_array::buffer::BufferHandle;
31use vortex_array::dtype::DType;
32use vortex_array::dtype::Nullability;
33use vortex_array::dtype::PType;
34use vortex_array::patches::PatchSlotIndices;
35use vortex_array::patches::Patches;
36use vortex_array::patches::PatchesData;
37use vortex_array::patches::PatchesMetadata;
38use vortex_array::require_child;
39use vortex_array::require_patches;
40use vortex_array::serde::ArrayChildren;
41use vortex_array::smallvec::smallvec;
42use vortex_array::validity::Validity;
43use vortex_array::vtable::VTable;
44use vortex_array::vtable::ValidityChild;
45use vortex_array::vtable::ValidityVTableFromChild;
46use vortex_buffer::Buffer;
47use vortex_error::VortexExpect;
48use vortex_error::VortexResult;
49use vortex_error::vortex_bail;
50use vortex_error::vortex_ensure;
51use vortex_error::vortex_err;
52use vortex_error::vortex_panic;
53use vortex_session::VortexSession;
54use vortex_session::registry::CachedId;
55
56use crate::alp_rd::kernel::PARENT_KERNELS;
57use crate::alp_rd::rules::RULES;
58use crate::alp_rd_decode;
59
/// Convenience alias for an [`Array`] whose encoding is [`ALPRD`].
pub type ALPRDArray = Array<ALPRD>;
62
/// Protobuf metadata for an ALPRD array, written by `serialize` and read back by `deserialize`.
#[derive(Clone, prost::Message)]
pub struct ALPRDMetadata {
    /// Right-part bit width; must fit in a `u8` when deserialized.
    #[prost(uint32, tag = "1")]
    right_bit_width: u32,
    /// Number of entries of `dict` that make up the left-parts dictionary.
    #[prost(uint32, tag = "2")]
    dict_len: u32,
    /// Left-parts dictionary codes widened to `u32`; each must fit in a `u16` when deserialized.
    #[prost(uint32, repeated, tag = "3")]
    dict: Vec<u32>,
    /// Primitive type of the `left_parts` child array.
    #[prost(enumeration = "PType", tag = "4")]
    left_parts_ptype: i32,
    /// Metadata describing the left-parts patches, if the array has any.
    #[prost(message, tag = "5")]
    patches: Option<PatchesMetadata>,
}
76
77impl ArrayHash for ALPRDData {
78 fn array_hash<H: Hasher>(&self, state: &mut H, precision: Precision) {
79 self.left_parts_dictionary.array_hash(state, precision);
80 self.right_bit_width.hash(state);
81 self.patches_data.hash(state);
82 }
83}
84
85impl ArrayEq for ALPRDData {
86 fn array_eq(&self, other: &Self, precision: Precision) -> bool {
87 self.left_parts_dictionary
88 .array_eq(&other.left_parts_dictionary, precision)
89 && self.right_bit_width == other.right_bit_width
90 && self.patches_data == other.patches_data
91 }
92}
93
94impl VTable for ALPRD {
95 type TypedArrayData = ALPRDData;
96
97 type OperationsVTable = Self;
98 type ValidityVTable = ValidityVTableFromChild;
99
    /// Returns the encoding identifier `"vortex.alprd"`, held in a function-local static.
    fn id(&self) -> ArrayId {
        static ID: CachedId = CachedId::new("vortex.alprd");
        *ID
    }
104
105 fn validate(
106 &self,
107 data: &ALPRDData,
108 dtype: &DType,
109 len: usize,
110 slots: &[Option<ArrayRef>],
111 ) -> VortexResult<()> {
112 validate_parts(
113 dtype,
114 len,
115 left_parts_from_slots(slots),
116 right_parts_from_slots(slots),
117 patches_from_slots(slots, data.patches_data.as_ref(), len).as_ref(),
118 )
119 }
120
    /// ALPRD arrays report zero buffers.
    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
        0
    }
124
    /// Always panics: `nbuffers` reports zero buffers, so every index is out of bounds.
    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
        vortex_panic!("ALPRDArray buffer index {idx} out of bounds")
    }
128
    /// Returns `None` for every index, as there are no buffers to name.
    fn buffer_name(_array: ArrayView<'_, Self>, _idx: usize) -> Option<String> {
        None
    }
132
133 fn serialize(
134 array: ArrayView<'_, Self>,
135 _session: &VortexSession,
136 ) -> VortexResult<Option<Vec<u8>>> {
137 let dict = array
138 .left_parts_dictionary()
139 .iter()
140 .map(|&i| i as u32)
141 .collect::<Vec<_>>();
142
143 Ok(Some(
144 ALPRDMetadata {
145 right_bit_width: array.right_bit_width() as u32,
146 dict_len: array.left_parts_dictionary().len() as u32,
147 dict,
148 left_parts_ptype: array.left_parts().dtype().as_ptype() as i32,
149 patches: array
150 .left_parts_patches()
151 .map(|p| p.to_metadata(array.len(), p.dtype()))
152 .transpose()?,
153 }
154 .encode_to_vec(),
155 ))
156 }
157
158 fn deserialize(
159 &self,
160 dtype: &DType,
161 len: usize,
162 metadata: &[u8],
163 _buffers: &[BufferHandle],
164 children: &dyn ArrayChildren,
165 _session: &VortexSession,
166 ) -> VortexResult<ArrayParts<Self>> {
167 let metadata = ALPRDMetadata::decode(metadata)?;
168 if children.len() < 2 {
169 vortex_bail!(
170 "Expected at least 2 children for ALPRD encoding, found {}",
171 children.len()
172 );
173 }
174
175 let left_parts_dtype = DType::Primitive(metadata.left_parts_ptype(), dtype.nullability());
176 let left_parts = children.get(0, &left_parts_dtype, len)?;
177 let left_parts_dictionary: Buffer<u16> = metadata.dict.as_slice()
178 [0..metadata.dict_len as usize]
179 .iter()
180 .map(|&i| {
181 u16::try_from(i)
182 .map_err(|_| vortex_err!("left_parts_dictionary code {i} does not fit in u16"))
183 })
184 .try_collect()?;
185
186 let right_parts_dtype = match &dtype {
187 DType::Primitive(PType::F32, _) => {
188 DType::Primitive(PType::U32, Nullability::NonNullable)
189 }
190 DType::Primitive(PType::F64, _) => {
191 DType::Primitive(PType::U64, Nullability::NonNullable)
192 }
193 _ => vortex_bail!("Expected f32 or f64 dtype, got {:?}", dtype),
194 };
195 let right_parts = children.get(1, &right_parts_dtype, len)?;
196
197 let left_parts_patches = metadata
198 .patches
199 .map(|p| {
200 let indices = children.get(2, &p.indices_dtype()?, p.len()?)?;
201 let values = children.get(3, &left_parts_dtype.as_nonnullable(), p.len()?)?;
202
203 Patches::new(
204 len,
205 p.offset()?,
206 indices,
207 values,
208 None,
210 )
211 })
212 .transpose()?;
213 let left_parts_patches = ALPRDData::canonicalize_patches(
216 &left_parts,
217 left_parts_patches,
218 &mut LEGACY_SESSION.create_execution_ctx(),
219 )?;
220 let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
221 let data = ALPRDData::new(
222 left_parts_dictionary,
223 u8::try_from(metadata.right_bit_width).map_err(|_| {
224 vortex_err!(
225 "right_bit_width {} out of u8 range",
226 metadata.right_bit_width
227 )
228 })?,
229 left_parts_patches,
230 );
231 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
232 }
233
    /// Returns the name of slot `idx` from `SLOT_NAMES`.
    ///
    /// # Panics
    ///
    /// Panics if `idx` is not a valid index into `SLOT_NAMES`.
    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
        SLOT_NAMES[idx].to_string()
    }
237
238 fn execute(array: Array<Self>, ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
239 let array = require_child!(array, array.left_parts(), 0 => Primitive);
240 let array = require_child!(array, array.right_parts(), 1 => Primitive);
241 require_patches!(
242 array,
243 LP_PATCH_INDICES_SLOT,
244 LP_PATCH_VALUES_SLOT,
245 LP_PATCH_CHUNK_OFFSETS_SLOT
246 );
247
248 let dtype = array.dtype().clone();
249 let right_bit_width = array.right_bit_width();
250 let ALPRDDataParts {
251 left_parts,
252 right_parts,
253 left_parts_dictionary,
254 left_parts_patches,
255 } = ALPRDArrayOwnedExt::into_data_parts(array);
256 let ptype = dtype.as_ptype();
257
258 let left_parts = left_parts
259 .try_downcast::<Primitive>()
260 .ok()
261 .vortex_expect("ALPRD execute: left_parts is primitive");
262 let right_parts = right_parts
263 .try_downcast::<Primitive>()
264 .ok()
265 .vortex_expect("ALPRD execute: right_parts is primitive");
266
267 let left_parts_dict = left_parts_dictionary;
269 let validity = left_parts
270 .as_ref()
271 .validity()?
272 .execute_mask(left_parts.as_ref().len(), ctx)?;
273
274 let decoded_array = if ptype == PType::F32 {
275 PrimitiveArray::new(
276 alp_rd_decode::<f32>(
277 left_parts.into_buffer_mut::<u16>(),
278 &left_parts_dict,
279 right_bit_width,
280 right_parts.into_buffer_mut::<u32>(),
281 left_parts_patches,
282 ctx,
283 )?,
284 Validity::from_mask(validity, dtype.nullability()),
285 )
286 } else {
287 PrimitiveArray::new(
288 alp_rd_decode::<f64>(
289 left_parts.into_buffer_mut::<u16>(),
290 &left_parts_dict,
291 right_bit_width,
292 right_parts.into_buffer_mut::<u64>(),
293 left_parts_patches,
294 ctx,
295 )?,
296 Validity::from_mask(validity, dtype.nullability()),
297 )
298 };
299
300 Ok(ExecutionResult::done(decoded_array.into_array()))
301 }
302
    /// Delegates parent reduction to the ALPRD `RULES` set.
    fn reduce_parent(
        array: ArrayView<'_, Self>,
        parent: &ArrayRef,
        child_idx: usize,
    ) -> VortexResult<Option<ArrayRef>> {
        RULES.evaluate(array, parent, child_idx)
    }
310
    /// Delegates parent execution to the ALPRD `PARENT_KERNELS` set.
    fn execute_parent(
        array: ArrayView<'_, Self>,
        parent: &ArrayRef,
        child_idx: usize,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<ArrayRef>> {
        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
    }
319}
320
/// Slot index of the `left_parts` child.
pub(super) const LEFT_PARTS_SLOT: usize = 0;
/// Slot index of the `right_parts` child.
pub(super) const RIGHT_PARTS_SLOT: usize = 1;
/// Slot indices of the three left-parts patch children, bundled for the patch helpers.
pub(super) const LP_PATCH_SLOTS: PatchSlotIndices = PatchSlotIndices {
    indices: LP_PATCH_INDICES_SLOT,
    values: LP_PATCH_VALUES_SLOT,
    chunk_offsets: LP_PATCH_CHUNK_OFFSETS_SLOT,
};
/// Slot index of the left-parts patch indices.
pub(super) const LP_PATCH_INDICES_SLOT: usize = 2;
/// Slot index of the left-parts patch values.
pub(super) const LP_PATCH_VALUES_SLOT: usize = 3;
/// Slot index of the left-parts patch chunk offsets.
pub(super) const LP_PATCH_CHUNK_OFFSETS_SLOT: usize = 4;
/// Total number of slots in an ALPRD array.
pub(super) const NUM_SLOTS: usize = 5;
/// Human-readable slot names, indexed by slot number.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = [
    "left_parts",
    "right_parts",
    "patch_indices",
    "patch_values",
    "patch_chunk_offsets",
];
344
/// Per-array data of an ALPRD array that is not stored in child slots.
#[derive(Clone, Debug)]
pub struct ALPRDData {
    /// Patch data derived from the left-parts patches; `None` when the array has no patches.
    patches_data: Option<PatchesData>,
    /// Dictionary of left-part values.
    left_parts_dictionary: Buffer<u16>,
    /// Bit width of the right parts.
    right_bit_width: u8,
}
351
352impl Display for ALPRDData {
353 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
354 write!(f, "right_bit_width: {}", self.right_bit_width)?;
355 if let Some(pd) = &self.patches_data {
356 write!(f, ", patch_offset: {}", pd.offset())?;
357 }
358 Ok(())
359 }
360}
361
/// The owned components of an ALPRD array, as produced by `into_data_parts` / `into_parts`.
#[derive(Clone, Debug)]
pub struct ALPRDDataParts {
    /// The left-parts child array.
    pub left_parts: ArrayRef,
    /// Patches for the left parts, if any.
    pub left_parts_patches: Option<Patches>,
    /// Dictionary of left-part values.
    pub left_parts_dictionary: Buffer<u16>,
    /// The right-parts child array.
    pub right_parts: ArrayRef,
}
369
/// Marker type for the ALPRD encoding; it carries the encoding's `VTable` implementation.
#[derive(Clone, Debug)]
pub struct ALPRD;
372
373impl ALPRD {
374 pub fn try_new(
375 dtype: DType,
376 left_parts: ArrayRef,
377 left_parts_dictionary: Buffer<u16>,
378 right_parts: ArrayRef,
379 right_bit_width: u8,
380 left_parts_patches: Option<Patches>,
381 ctx: &mut ExecutionCtx,
382 ) -> VortexResult<ALPRDArray> {
383 let len = left_parts.len();
384 let left_parts_patches =
385 ALPRDData::canonicalize_patches(&left_parts, left_parts_patches, ctx)?;
386 let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
387 let data = ALPRDData::new(left_parts_dictionary, right_bit_width, left_parts_patches);
388 Array::try_from_parts(ArrayParts::new(ALPRD, dtype, len, data).with_slots(slots))
389 }
390
391 pub unsafe fn new_unchecked(
394 dtype: DType,
395 left_parts: ArrayRef,
396 left_parts_dictionary: Buffer<u16>,
397 right_parts: ArrayRef,
398 right_bit_width: u8,
399 left_parts_patches: Option<Patches>,
400 ) -> ALPRDArray {
401 let len = left_parts.len();
402 let slots = ALPRDData::make_slots(&left_parts, &right_parts, left_parts_patches.as_ref());
403 let data = unsafe {
404 ALPRDData::new_unchecked(left_parts_dictionary, right_bit_width, left_parts_patches)
405 };
406 unsafe {
407 Array::from_parts_unchecked(ArrayParts::new(ALPRD, dtype, len, data).with_slots(slots))
408 }
409 }
410}
411
412impl ALPRDData {
413 fn canonicalize_patches(
414 left_parts: &ArrayRef,
415 left_parts_patches: Option<Patches>,
416 ctx: &mut ExecutionCtx,
417 ) -> VortexResult<Option<Patches>> {
418 left_parts_patches
419 .map(|patches| {
420 if !patches.values().all_valid(ctx)? {
421 vortex_bail!("patches must be all valid: {}", patches.values());
422 }
423 let mut patches = patches.cast_values(&left_parts.dtype().as_nonnullable())?;
426 let canonical = patches.values().clone().execute::<Canonical>(ctx)?;
429 *patches.values_mut() = canonical.into_array();
430 Ok(patches)
431 })
432 .transpose()
433 }
434
435 pub fn new(
437 left_parts_dictionary: Buffer<u16>,
438 right_bit_width: u8,
439 left_parts_patches: Option<Patches>,
440 ) -> Self {
441 Self {
442 patches_data: left_parts_patches.as_ref().map(PatchesData::from_patches),
443 left_parts_dictionary,
444 right_bit_width,
445 }
446 }
447
448 pub(crate) unsafe fn new_unchecked(
451 left_parts_dictionary: Buffer<u16>,
452 right_bit_width: u8,
453 left_parts_patches: Option<Patches>,
454 ) -> Self {
455 Self::new(left_parts_dictionary, right_bit_width, left_parts_patches)
456 }
457
458 fn make_slots(
459 left_parts: &ArrayRef,
460 right_parts: &ArrayRef,
461 patches: Option<&Patches>,
462 ) -> ArraySlots {
463 let mut slots: ArraySlots = smallvec![Some(left_parts.clone()), Some(right_parts.clone())];
464 PatchesData::push_slots(&mut slots, patches);
465 slots
466 }
467
468 pub fn into_parts(self, left_parts: ArrayRef, right_parts: ArrayRef) -> ALPRDDataParts {
470 ALPRDDataParts {
471 left_parts,
472 left_parts_patches: None,
473 left_parts_dictionary: self.left_parts_dictionary,
474 right_parts,
475 }
476 }
477
478 #[inline]
479 pub fn right_bit_width(&self) -> u8 {
480 self.right_bit_width
481 }
482
483 #[inline]
485 pub fn left_parts_dictionary(&self) -> &Buffer<u16> {
486 &self.left_parts_dictionary
487 }
488}
489
490fn left_parts_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
491 slots[LEFT_PARTS_SLOT]
492 .as_ref()
493 .vortex_expect("ALPRDArray left_parts slot")
494}
495
496fn right_parts_from_slots(slots: &[Option<ArrayRef>]) -> &ArrayRef {
497 slots[RIGHT_PARTS_SLOT]
498 .as_ref()
499 .vortex_expect("ALPRDArray right_parts slot")
500}
501
/// Reassembles the left-parts patches from `slots` using the `LP_PATCH_SLOTS` layout.
///
/// This is a thin wrapper over `PatchesData::patches_from_slots`.
fn patches_from_slots(
    slots: &[Option<ArrayRef>],
    patches_data: Option<&PatchesData>,
    len: usize,
) -> Option<Patches> {
    PatchesData::patches_from_slots(patches_data, len, slots, LP_PATCH_SLOTS)
}
509
510fn validate_parts(
511 dtype: &DType,
512 len: usize,
513 left_parts: &ArrayRef,
514 right_parts: &ArrayRef,
515 left_parts_patches: Option<&Patches>,
516) -> VortexResult<()> {
517 if !dtype.is_float() {
518 vortex_bail!("ALPRDArray given invalid DType ({dtype})");
519 }
520
521 vortex_ensure!(
522 left_parts.len() == len,
523 "left_parts len {} != outer len {len}",
524 left_parts.len(),
525 );
526 vortex_ensure!(
527 right_parts.len() == len,
528 "right_parts len {} != outer len {len}",
529 right_parts.len(),
530 );
531
532 if !left_parts.dtype().is_unsigned_int() {
533 vortex_bail!("left_parts dtype must be uint");
534 }
535 if dtype.is_nullable() != left_parts.dtype().is_nullable() {
536 vortex_bail!(
537 "ALPRDArray dtype nullability ({}) must match left_parts dtype nullability ({})",
538 dtype,
539 left_parts.dtype()
540 );
541 }
542
543 let expected_right_parts_dtype = match dtype {
544 DType::Primitive(PType::F32, _) => DType::Primitive(PType::U32, Nullability::NonNullable),
545 DType::Primitive(PType::F64, _) => DType::Primitive(PType::U64, Nullability::NonNullable),
546 _ => vortex_bail!("Expected f32 or f64 dtype, got {:?}", dtype),
547 };
548 vortex_ensure!(
549 right_parts.dtype() == &expected_right_parts_dtype,
550 "right_parts dtype {} does not match expected {}",
551 right_parts.dtype(),
552 expected_right_parts_dtype,
553 );
554
555 if let Some(patches) = left_parts_patches {
556 vortex_ensure!(
557 patches.array_len() == len,
558 "patches array_len {} != outer len {len}",
559 patches.array_len(),
560 );
561 vortex_ensure!(
562 patches.dtype().eq_ignore_nullability(left_parts.dtype()),
563 "patches dtype {} does not match left_parts dtype {}",
564 patches.dtype(),
565 left_parts.dtype(),
566 );
567 vortex_ensure!(
568 patches
569 .values()
570 .all_valid(&mut LEGACY_SESSION.create_execution_ctx())?,
571 "patches must be all valid: {}",
572 patches.values()
573 );
574 }
575
576 Ok(())
577}
578
/// Accessors available on any typed reference to an ALPRD array.
pub trait ALPRDArrayExt: TypedArrayRef<ALPRD> {
    /// The left-parts child, read from its slot.
    fn left_parts(&self) -> &ArrayRef {
        left_parts_from_slots(self.as_ref().slots())
    }

    /// The right-parts child, read from its slot.
    fn right_parts(&self) -> &ArrayRef {
        right_parts_from_slots(self.as_ref().slots())
    }

    /// Bit width of the right parts.
    fn right_bit_width(&self) -> u8 {
        ALPRDData::right_bit_width(self)
    }

    /// The left-parts patches reassembled from the patch slots, if any.
    fn left_parts_patches(&self) -> Option<Patches> {
        patches_from_slots(
            self.as_ref().slots(),
            self.patches_data.as_ref(),
            self.as_ref().len(),
        )
    }

    /// Dictionary of left-part values.
    fn left_parts_dictionary(&self) -> &Buffer<u16> {
        ALPRDData::left_parts_dictionary(self)
    }
}
// Blanket implementation: every typed ALPRD array reference gets the accessors above.
impl<T: TypedArrayRef<ALPRD>> ALPRDArrayExt for T {}
605
/// Operations that consume an owned ALPRD array.
pub trait ALPRDArrayOwnedExt {
    /// Consumes the array and returns its components as [`ALPRDDataParts`].
    fn into_data_parts(self) -> ALPRDDataParts;
}
609
610impl ALPRDArrayOwnedExt for Array<ALPRD> {
611 fn into_data_parts(self) -> ALPRDDataParts {
612 let left_parts_patches = self.left_parts_patches();
613 let left_parts = self.left_parts().clone();
614 let right_parts = self.right_parts().clone();
615 let mut parts = ALPRDDataParts {
616 left_parts,
617 left_parts_patches: None,
618 left_parts_dictionary: self.left_parts_dictionary().clone(),
619 right_parts,
620 };
621 parts.left_parts_patches = left_parts_patches;
622 parts
623 }
624}
625
impl ValidityChild<ALPRD> for ALPRD {
    /// The array's validity is taken from its `left_parts` child.
    fn validity_child(array: ArrayView<'_, ALPRD>) -> ArrayRef {
        array.left_parts().clone()
    }
}
631
632#[cfg(test)]
633mod test {
634 use prost::Message;
635 use rstest::rstest;
636 use vortex_array::LEGACY_SESSION;
637 use vortex_array::VortexSessionExecute;
638 use vortex_array::arrays::PrimitiveArray;
639 use vortex_array::assert_arrays_eq;
640 use vortex_array::dtype::PType;
641 use vortex_array::patches::PatchesMetadata;
642 use vortex_array::test_harness::check_metadata;
643
644 use super::ALPRDMetadata;
645 use crate::ALPRDFloat;
646 use crate::alp_rd;
647
648 #[rstest]
649 #[case(vec![0.1f32.next_up(); 1024], 1.123_848_f32)]
650 #[case(vec![0.1f64.next_up(); 1024], 1.123_848_591_110_992_f64)]
651 fn test_array_encode_with_nulls_and_patches<T: ALPRDFloat>(
652 #[case] reals: Vec<T>,
653 #[case] seed: T,
654 ) {
655 let mut ctx = LEGACY_SESSION.create_execution_ctx();
656 assert_eq!(reals.len(), 1024, "test expects 1024-length fixture");
657 let mut reals: Vec<Option<T>> = reals.into_iter().map(Some).collect();
659 reals[1] = None;
660 reals[5] = None;
661 reals[900] = None;
662
663 let real_array = PrimitiveArray::from_option_iter(reals.iter().cloned());
665
666 let encoder: alp_rd::RDEncoder = alp_rd::RDEncoder::new(&[seed.powi(-2)]);
668
669 let rd_array = encoder.encode(real_array.as_view(), &mut ctx);
670
671 let decoded = rd_array
672 .as_array()
673 .clone()
674 .execute::<PrimitiveArray>(&mut ctx)
675 .unwrap();
676
677 assert_arrays_eq!(decoded, PrimitiveArray::from_option_iter(reals));
678 }
679
680 #[cfg_attr(miri, ignore)]
681 #[test]
682 fn test_alprd_metadata() {
683 check_metadata(
684 "alprd.metadata",
685 &ALPRDMetadata {
686 right_bit_width: u32::MAX,
687 patches: Some(PatchesMetadata::new(
688 usize::MAX,
689 usize::MAX,
690 PType::U64,
691 None,
692 None,
693 None,
694 )),
695 dict: Vec::new(),
696 left_parts_ptype: PType::U64 as i32,
697 dict_len: 8,
698 }
699 .encode_to_vec(),
700 );
701 }
702}