Skip to main content

vortex_array/arrays/patched/vtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use prost::Message;
5
6use crate::ArrayEq;
7use crate::ArrayHash;
8mod kernels;
9mod operations;
10mod slice;
11
12use std::hash::Hash;
13use std::hash::Hasher;
14
15use vortex_buffer::Buffer;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19use vortex_session::VortexSession;
20
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::ExecutionCtx;
24use crate::ExecutionResult;
25use crate::IntoArray;
26use crate::Precision;
27use crate::array::Array;
28use crate::array::ArrayId;
29use crate::array::ArrayParts;
30use crate::array::ArrayView;
31use crate::array::VTable;
32use crate::array::ValidityChild;
33use crate::array::ValidityVTableFromChild;
34use crate::arrays::Primitive;
35use crate::arrays::PrimitiveArray;
36use crate::arrays::patched::PatchedArrayExt;
37use crate::arrays::patched::PatchedArraySlotsExt;
38use crate::arrays::patched::PatchedData;
39use crate::arrays::patched::PatchedSlots;
40use crate::arrays::patched::PatchedSlotsView;
41use crate::arrays::patched::compute::rules::PARENT_RULES;
42use crate::arrays::patched::vtable::kernels::PARENT_KERNELS;
43use crate::arrays::primitive::PrimitiveDataParts;
44use crate::buffer::BufferHandle;
45use crate::builders::ArrayBuilder;
46use crate::builders::PrimitiveBuilder;
47use crate::dtype::DType;
48use crate::dtype::NativePType;
49use crate::dtype::PType;
50use crate::match_each_native_ptype;
51use crate::require_child;
52use crate::serde::ArrayChildren;
53
/// A [`Patched`]-encoded Vortex array.
///
/// This is an alias for the generic [`Array`] wrapper specialised to the [`Patched`] encoding.
pub type PatchedArray = Array<Patched>;
56
/// Marker type for the patched encoding.
///
/// It carries no state of its own; the encoding's behaviour is attached to it through the
/// [`VTable`] and [`ValidityChild`] implementations in this module.
#[derive(Clone, Debug)]
pub struct Patched;
59
/// Selects the child array that the [`Patched`] encoding's validity is derived from.
impl ValidityChild<Patched> for Patched {
    /// Returns a clone of the `inner` child of `array`.
    fn validity_child(array: ArrayView<'_, Patched>) -> ArrayRef {
        array.inner().clone()
    }
}
65
/// Protobuf message holding the metadata of a serialized [`Patched`] array.
///
/// It is produced by `Patched::serialize` and decoded again by `Patched::deserialize`, which
/// uses the fields to size the child arrays it requests.
#[derive(Clone, prost::Message)]
pub struct PatchedMetadata {
    /// The total number of patches, and the length of the indices and values child arrays.
    #[prost(uint32, tag = "1")]
    pub(crate) n_patches: u32,

    /// The number of lanes used for patch indexing. Must be a power of two between 1 and 128.
    #[prost(uint32, tag = "2")]
    pub(crate) n_lanes: u32,

    /// An offset into the first chunk's patches that should be considered in-view.
    ///
    /// Always between 0 and 1023.
    #[prost(uint32, tag = "3")]
    pub(crate) offset: u32,
}
82
83impl ArrayHash for PatchedData {
84    fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
85        self.offset.hash(state);
86        self.n_lanes.hash(state);
87    }
88}
89
90impl ArrayEq for PatchedData {
91    fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
92        self.offset == other.offset && self.n_lanes == other.n_lanes
93    }
94}
95
96impl VTable for Patched {
97    type ArrayData = PatchedData;
98    type OperationsVTable = Self;
99    type ValidityVTable = ValidityVTableFromChild;
100
101    fn id(&self) -> ArrayId {
102        ArrayId::new_ref("vortex.patched")
103    }
104
105    fn validate(
106        &self,
107        data: &PatchedData,
108        dtype: &DType,
109        len: usize,
110        slots: &[Option<ArrayRef>],
111    ) -> VortexResult<()> {
112        data.validate(dtype, len, &PatchedSlotsView::from_slots(slots))
113    }
114
115    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
116        0
117    }
118
119    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
120        vortex_panic!("invalid buffer index for PatchedArray: {idx}");
121    }
122
123    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
124        vortex_panic!("invalid buffer index for PatchedArray: {idx}");
125    }
126
127    fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef {
128        match idx {
129            PatchedSlots::INNER => array.inner().clone(),
130            PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(),
131            PatchedSlots::PATCH_INDICES => array.patch_indices().clone(),
132            PatchedSlots::PATCH_VALUES => array.patch_values().clone(),
133            _ => vortex_panic!("invalid child index for PatchedArray: {idx}"),
134        }
135    }
136
137    fn serialize(
138        array: ArrayView<'_, Self>,
139        _session: &VortexSession,
140    ) -> VortexResult<Option<Vec<u8>>> {
141        Ok(Some(
142            PatchedMetadata {
143                n_patches: u32::try_from(array.patch_indices().len())?,
144                n_lanes: u32::try_from(array.n_lanes())?,
145                offset: u32::try_from(array.offset())?,
146            }
147            .encode_to_vec(),
148        ))
149    }
150
151    fn deserialize(
152        &self,
153        dtype: &DType,
154        len: usize,
155        metadata: &[u8],
156        _buffers: &[BufferHandle],
157        children: &dyn ArrayChildren,
158        _session: &VortexSession,
159    ) -> VortexResult<ArrayParts<Self>> {
160        let metadata = PatchedMetadata::decode(metadata)?;
161        let n_patches = metadata.n_patches as usize;
162        let n_lanes = metadata.n_lanes as usize;
163        let offset = metadata.offset as usize;
164
165        // n_chunks should correspond to the chunk in the `inner`.
166        // After slicing when offset > 0, there may be additional chunks.
167        let n_chunks = (len + offset).div_ceil(1024);
168
169        let inner = children.get(0, dtype, len)?;
170        let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?;
171        let indices = children.get(2, PType::U16.into(), n_patches)?;
172        let values = children.get(3, dtype, n_patches)?;
173
174        let data = PatchedData { n_lanes, offset };
175        let slots = PatchedSlots {
176            inner,
177            lane_offsets,
178            patch_indices: indices,
179            patch_values: values,
180        }
181        .into_slots();
182        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
183    }
184
185    fn append_to_builder(
186        array: ArrayView<'_, Self>,
187        builder: &mut dyn ArrayBuilder,
188        ctx: &mut ExecutionCtx,
189    ) -> VortexResult<()> {
190        let dtype = array.array().dtype();
191
192        if !dtype.is_primitive() {
193            // Default pathway: canonicalize and propagate.
194            let canonical = array
195                .array()
196                .clone()
197                .execute::<Canonical>(ctx)?
198                .into_array();
199            builder.extend_from_array(&canonical);
200            return Ok(());
201        }
202
203        let ptype = dtype.as_ptype();
204
205        let len = array.len();
206
207        array.inner().append_to_builder(builder, ctx)?;
208
209        let offset = array.offset();
210        let lane_offsets = array
211            .lane_offsets()
212            .clone()
213            .execute::<PrimitiveArray>(ctx)?;
214        let indices = array
215            .patch_indices()
216            .clone()
217            .execute::<PrimitiveArray>(ctx)?;
218        let values = array
219            .patch_values()
220            .clone()
221            .execute::<PrimitiveArray>(ctx)?;
222
223        match_each_native_ptype!(ptype, |V| {
224            let typed_builder = builder
225                .as_any_mut()
226                .downcast_mut::<PrimitiveBuilder<V>>()
227                .vortex_expect("correctly typed builder");
228
229            // Overwrite the last `len` elements of the builder. These would have been
230            // populated by the inner.append_to_builder() call above.
231            let output = typed_builder.values_mut();
232            let trailer = output.len() - len;
233
234            apply_patches_primitive::<V>(
235                &mut output[trailer..],
236                offset,
237                len,
238                array.n_lanes(),
239                lane_offsets.as_slice::<u32>(),
240                indices.as_slice::<u16>(),
241                values.as_slice::<V>(),
242            );
243        });
244
245        Ok(())
246    }
247
248    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
249        PatchedSlots::NAMES[idx].to_string()
250    }
251
252    fn execute(array: Array<Self>, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
253        let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive);
254        let array =
255            require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive);
256        let array =
257            require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive);
258        let array =
259            require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive);
260
261        let len = array.len();
262
263        let n_lanes = array.n_lanes;
264        let offset = array.offset;
265        let slots = match array.try_into_parts() {
266            Ok(parts) => PatchedSlots::from_slots(parts.slots),
267            Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(),
268        };
269
270        // TODO(joe): use iterative execution
271        let PrimitiveDataParts {
272            buffer,
273            ptype,
274            validity,
275        } = slots.inner.downcast::<Primitive>().into_data_parts();
276
277        let values = slots.patch_values.downcast::<Primitive>();
278        let lane_offsets = slots.lane_offsets.downcast::<Primitive>();
279        let patch_indices = slots.patch_indices.downcast::<Primitive>();
280
281        let patched_values = match_each_native_ptype!(values.ptype(), |V| {
282            let mut output = Buffer::<V>::from_byte_buffer(buffer.unwrap_host()).into_mut();
283
284            apply_patches_primitive::<V>(
285                &mut output,
286                offset,
287                len,
288                n_lanes,
289                lane_offsets.as_slice::<u32>(),
290                patch_indices.as_slice::<u16>(),
291                values.as_slice::<V>(),
292            );
293
294            let output = output.freeze();
295
296            PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity)
297        });
298
299        Ok(ExecutionResult::done(patched_values.into_array()))
300    }
301
302    fn execute_parent(
303        array: ArrayView<'_, Self>,
304        parent: &ArrayRef,
305        child_idx: usize,
306        ctx: &mut ExecutionCtx,
307    ) -> VortexResult<Option<ArrayRef>> {
308        PARENT_KERNELS.execute(array, parent, child_idx, ctx)
309    }
310
311    fn reduce_parent(
312        array: ArrayView<'_, Self>,
313        parent: &ArrayRef,
314        child_idx: usize,
315    ) -> VortexResult<Option<ArrayRef>> {
316        PARENT_RULES.evaluate(array, parent, child_idx)
317    }
318}
319
320/// Apply patches on top of the existing value types.
321#[allow(clippy::too_many_arguments)]
322fn apply_patches_primitive<V: NativePType>(
323    output: &mut [V],
324    offset: usize,
325    len: usize,
326    n_lanes: usize,
327    lane_offsets: &[u32],
328    indices: &[u16],
329    values: &[V],
330) {
331    let n_chunks = (offset + len).div_ceil(1024);
332    for chunk in 0..n_chunks {
333        let start = lane_offsets[chunk * n_lanes] as usize;
334        let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize;
335
336        for idx in start..stop {
337            // the indices slice is measured as an offset into the 1024-value chunk.
338            let index = chunk * 1024 + indices[idx] as usize;
339            if index < offset || index >= offset + len {
340                continue;
341            }
342
343            let value = values[idx];
344            output[index - offset] = value;
345        }
346    }
347}
348
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;
    use vortex_buffer::buffer;
    use vortex_buffer::buffer_mut;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;
    use vortex_session::registry::ReadContext;

    use crate::ArrayContext;
    use crate::Canonical;
    use crate::ExecutionCtx;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::arrays::Patched;
    use crate::arrays::PatchedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::patched::PatchedArraySlotsExt;
    use crate::arrays::patched::PatchedSlots;
    use crate::arrays::patched::PatchedSlotsView;
    use crate::assert_arrays_eq;
    use crate::builders::builder_with_capacity;
    use crate::patches::Patches;
    use crate::serde::SerializeOptions;
    use crate::serde::SerializedArray;
    use crate::validity::Validity;

    /// Creates an execution context backed by an empty session.
    fn new_ctx() -> ExecutionCtx {
        ExecutionCtx::new(VortexSession::empty())
    }

    /// Builds a [`PatchedArray`] over the `u16` values in `inner`, with `patch_values[i]`
    /// patched in at position `patch_indices[i]`.
    fn make_patched_array(
        inner: impl IntoIterator<Item = u16>,
        patch_indices: &[u32],
        patch_values: &[u16],
    ) -> VortexResult<PatchedArray> {
        let values: Vec<u16> = inner.into_iter().collect();
        let len = values.len();
        let array = PrimitiveArray::from_iter(values).into_array();

        let indices = PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array();
        let patch_vals = PrimitiveArray::from_iter(patch_values.iter().copied()).into_array();

        let patches = Patches::new(len, 0, indices, patch_vals, None)?;

        let mut ctx = new_ctx();

        Patched::from_array_and_patches(array, &patches, &mut ctx)
    }

    /// Executing a patched array yields the inner values with the patches written over them.
    #[test]
    fn test_execute() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[1, 1, 1])?.into_array();

        let mut ctx = new_ctx();
        let executed = array
            .execute::<Canonical>(&mut ctx)?
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 1;
        expected[2] = 1;
        expected[3] = 1;

        assert_eq!(executed, expected.freeze());

        Ok(())
    }

    /// After slicing, only the patches that remain in view are applied, at shifted positions.
    #[test]
    fn test_execute_sliced() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[1, 1, 1])?
            .into_array()
            .slice(3..1024)?;

        let mut ctx = new_ctx();
        let executed = array
            .execute::<Canonical>(&mut ctx)?
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 1;

        assert_eq!(executed, expected.freeze());

        Ok(())
    }

    /// Appending a non-nullable patched array to a builder produces the patched values.
    #[test]
    fn test_append_to_builder_non_nullable() -> VortexResult<()> {
        let array =
            make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?.into_array();

        let mut ctx = new_ctx();
        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx)?;

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 10;
        expected[2] = 20;
        expected[3] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result);

        Ok(())
    }

    /// Appending a sliced patched array only applies the patches that are still in view.
    #[test]
    fn test_append_to_builder_sliced() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?
            .into_array()
            .slice(3..1024)?;

        let mut ctx = new_ctx();
        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx)?;

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result);

        Ok(())
    }

    /// Nulls of the inner array survive appending; patches only change the values.
    #[test]
    fn test_append_to_builder_with_validity() -> VortexResult<()> {
        // Create inner array with nulls at indices 0 and 5.
        let validity = Validity::from_iter((0..10).map(|i| i != 0 && i != 5));
        let values = PrimitiveArray::new(buffer![0u16; 10], validity).into_array();

        // Apply patches at indices 1, 2, 3.
        let patches = Patches::new(
            10,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )?;

        let mut ctx = new_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)?.into_array();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx)?;

        let result = builder.finish();

        // Expected: null at 0, patched 10/20/30 at 1/2/3, zero at 4, null at 5, zeros at 6-9.
        let expected = PrimitiveArray::from_option_iter([
            None,
            Some(10u16),
            Some(20),
            Some(30),
            Some(0),
            None,
            Some(0),
            Some(0),
            Some(0),
            Some(0),
        ])
        .into_array();

        assert_arrays_eq!(expected, result);

        Ok(())
    }

    /// Serializing and decoding a patched array yields a patched array with the same values.
    #[rstest]
    #[case::basic(
        make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap().into_array()
    )]
    #[case::multi_chunk(
        make_patched_array(vec![0u16; 4096], &[100, 1500, 2500, 3500], &[11, 22, 33, 44]).unwrap().into_array()
    )]
    #[case::sliced({
        let arr = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap();
        arr.into_array().slice(2..1024).unwrap()
    })]
    fn test_serde_roundtrip(#[case] array: crate::ArrayRef) {
        let dtype = array.dtype().clone();
        let len = array.len();

        let ctx = ArrayContext::empty();
        let serialized = array
            .serialize(&ctx, &LEGACY_SESSION, &SerializeOptions::default())
            .unwrap();

        // Concat into a single buffer.
        let mut concat = ByteBufferMut::empty();
        for buf in serialized {
            concat.extend_from_slice(buf.as_ref());
        }
        let concat = concat.freeze();

        let parts = SerializedArray::try_from(concat).unwrap();
        let decoded = parts
            .decode(
                &dtype,
                len,
                &ReadContext::new(ctx.to_ids()),
                &LEGACY_SESSION,
            )
            .unwrap();

        assert!(decoded.is::<Patched>());
        assert_eq!(
            array.display_values().to_string(),
            decoded.display_values().to_string()
        );
    }

    /// Rebuilding an array from its own slots yields an equivalent patched array.
    #[test]
    fn test_with_slots_basic() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?;

        // Get original children via accessor methods
        let slots = PatchedSlots::from_slots(array.as_array().slots().to_vec());
        let view = PatchedSlotsView::from_slots(array.as_array().slots());
        assert_eq!(view.inner.len(), array.inner().len());

        // Create new PatchedArray with same children using with_slots
        let array_ref = array.into_array();
        let new_array = array_ref.clone().with_slots(slots.into_slots())?;

        assert!(new_array.is::<Patched>());
        assert_eq!(array_ref.len(), new_array.len());
        assert_eq!(array_ref.dtype(), new_array.dtype());

        // Execute both and compare results
        let mut ctx = new_ctx();
        let original_executed = array_ref.execute::<Canonical>(&mut ctx)?.into_primitive();
        let new_executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        assert_arrays_eq!(original_executed, new_executed);

        Ok(())
    }

    /// Replacing the inner slot changes the unpatched values but leaves the patches in place.
    #[test]
    fn test_with_slots_modified_inner() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 10], &[1, 2, 3], &[10, 20, 30])?;

        // Create a different inner array (all 5s instead of 0s)
        let new_inner = PrimitiveArray::from_iter(vec![5u16; 10]).into_array();
        let slots = PatchedSlots {
            inner: new_inner,
            lane_offsets: array.lane_offsets().clone(),
            patch_indices: array.patch_indices().clone(),
            patch_values: array.patch_values().clone(),
        };

        let array_ref = array.into_array();
        let new_array = array_ref.with_slots(slots.into_slots())?;

        // Execute and verify the inner values changed (except at patch positions)
        let mut ctx = new_ctx();
        let executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        // Expected: all 5s except indices 1, 2, 3 which are patched to 10, 20, 30
        let expected = PrimitiveArray::from_iter([5u16, 10, 20, 30, 5, 5, 5, 5, 5, 5]);
        assert_arrays_eq!(expected, executed);

        Ok(())
    }
}