Skip to main content

vortex_array/arrays/patched/vtable/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use prost::Message;
5
6use crate::ArrayEq;
7use crate::ArrayHash;
8mod kernels;
9mod operations;
10mod slice;
11
12use std::hash::Hash;
13use std::hash::Hasher;
14
15use vortex_buffer::Buffer;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19use vortex_session::VortexSession;
20use vortex_session::registry::CachedId;
21
22use crate::ArrayRef;
23use crate::Canonical;
24use crate::EqMode;
25use crate::ExecutionCtx;
26use crate::ExecutionResult;
27use crate::IntoArray;
28use crate::array::Array;
29use crate::array::ArrayId;
30use crate::array::ArrayParts;
31use crate::array::ArrayView;
32use crate::array::VTable;
33use crate::array::ValidityChild;
34use crate::array::ValidityVTableFromChild;
35use crate::array::with_empty_buffers;
36use crate::arrays::Primitive;
37use crate::arrays::PrimitiveArray;
38use crate::arrays::patched::PatchedArrayExt;
39use crate::arrays::patched::PatchedArraySlotsExt;
40use crate::arrays::patched::PatchedData;
41use crate::arrays::patched::PatchedSlots;
42use crate::arrays::patched::PatchedSlotsView;
43use crate::arrays::patched::compute::rules::PARENT_RULES;
44use crate::arrays::primitive::PrimitiveDataParts;
45use crate::buffer::BufferHandle;
46use crate::builders::ArrayBuilder;
47use crate::builders::PrimitiveBuilder;
48use crate::dtype::DType;
49use crate::dtype::NativePType;
50use crate::dtype::PType;
51use crate::match_each_native_ptype;
52use crate::require_child;
53use crate::serde::ArrayChildren;
54
/// A [`Patched`]-encoded Vortex array.
///
/// It wraps an `inner` array together with sparse patch values. When the array is executed,
/// the patch values overwrite the corresponding elements of `inner`.
pub type PatchedArray = Array<Patched>;
57
/// Initializes the patched-array `kernels` module for `session`.
///
/// NOTE(review): presumably this registers the patched compute kernels with the session;
/// confirm in `kernels::initialize`.
pub(crate) fn initialize(session: &VortexSession) {
    kernels::initialize(session);
}
61
/// Encoding marker type for patched arrays.
///
/// Its `VTable` implementation defines how patched arrays are validated, serialized,
/// deserialized and executed.
#[derive(Clone, Debug)]
pub struct Patched;
64
/// The validity of a patched array is taken entirely from its `inner` child.
impl ValidityChild<Patched> for Patched {
    /// Returns the `inner` child, whose validity is the validity of the whole patched array.
    fn validity_child(array: ArrayView<'_, Patched>) -> ArrayRef {
        array.inner().clone()
    }
}
70
/// Serialized metadata for a patched array.
///
/// It is prost-encoded by `Patched::serialize` and decoded by `Patched::deserialize`. The
/// child arrays themselves are not stored in this message.
#[derive(Clone, prost::Message)]
pub struct PatchedMetadata {
    /// The total number of patches, and the length of the indices and values child arrays.
    #[prost(uint32, tag = "1")]
    pub(crate) n_patches: u32,

    /// The number of lanes used for patch indexing. Must be a power of two between 1 and 128.
    ///
    /// The `lane_offsets` child holds `n_lanes` entries per 1024-element chunk, plus one
    /// trailing end offset.
    #[prost(uint32, tag = "2")]
    pub(crate) n_lanes: u32,

    /// An offset, in elements, into the first 1024-element chunk.
    ///
    /// Logical positions before this offset lie outside the view, and any patches there are
    /// ignored. Always between 0 and 1023.
    #[prost(uint32, tag = "3")]
    pub(crate) offset: u32,
}
87
88impl ArrayHash for PatchedData {
89    fn array_hash<H: Hasher>(&self, state: &mut H, _accuracy: EqMode) {
90        self.offset.hash(state);
91        self.n_lanes.hash(state);
92    }
93}
94
95impl ArrayEq for PatchedData {
96    fn array_eq(&self, other: &Self, _accuracy: EqMode) -> bool {
97        self.offset == other.offset && self.n_lanes == other.n_lanes
98    }
99}
100
101impl VTable for Patched {
102    type TypedArrayData = PatchedData;
103    type OperationsVTable = Self;
104    type ValidityVTable = ValidityVTableFromChild;
105
106    fn id(&self) -> ArrayId {
107        static ID: CachedId = CachedId::new("vortex.patched");
108        *ID
109    }
110
111    fn validate(
112        &self,
113        data: &PatchedData,
114        dtype: &DType,
115        len: usize,
116        slots: &[Option<ArrayRef>],
117    ) -> VortexResult<()> {
118        data.validate(dtype, len, &PatchedSlotsView::from_slots(slots))
119    }
120
121    fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
122        0
123    }
124
125    fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
126        vortex_panic!("invalid buffer index for PatchedArray: {idx}");
127    }
128
129    fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
130        vortex_panic!("invalid buffer index for PatchedArray: {idx}");
131    }
132
133    fn with_buffers(
134        &self,
135        array: ArrayView<'_, Self>,
136        buffers: &[BufferHandle],
137    ) -> VortexResult<ArrayParts<Self>> {
138        with_empty_buffers(self, array, buffers)
139    }
140
141    fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef {
142        match idx {
143            PatchedSlots::INNER => array.inner().clone(),
144            PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(),
145            PatchedSlots::PATCH_INDICES => array.patch_indices().clone(),
146            PatchedSlots::PATCH_VALUES => array.patch_values().clone(),
147            _ => vortex_panic!("invalid child index for PatchedArray: {idx}"),
148        }
149    }
150
151    fn serialize(
152        array: ArrayView<'_, Self>,
153        _session: &VortexSession,
154    ) -> VortexResult<Option<Vec<u8>>> {
155        Ok(Some(
156            PatchedMetadata {
157                n_patches: u32::try_from(array.patch_indices().len())?,
158                n_lanes: u32::try_from(array.n_lanes())?,
159                offset: u32::try_from(array.offset())?,
160            }
161            .encode_to_vec(),
162        ))
163    }
164
165    fn deserialize(
166        &self,
167        dtype: &DType,
168        len: usize,
169        metadata: &[u8],
170        _buffers: &[BufferHandle],
171        children: &dyn ArrayChildren,
172        _session: &VortexSession,
173    ) -> VortexResult<ArrayParts<Self>> {
174        let metadata = PatchedMetadata::decode(metadata)?;
175        let n_patches = metadata.n_patches as usize;
176        let n_lanes = metadata.n_lanes as usize;
177        let offset = metadata.offset as usize;
178
179        // n_chunks should correspond to the chunk in the `inner`.
180        // After slicing when offset > 0, there may be additional chunks.
181        let n_chunks = (len + offset).div_ceil(1024);
182
183        let inner = children.get(0, dtype, len)?;
184        let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?;
185        let indices = children.get(2, PType::U16.into(), n_patches)?;
186        let values = children.get(3, dtype, n_patches)?;
187
188        let data = PatchedData { n_lanes, offset };
189        let slots = PatchedSlots {
190            inner,
191            lane_offsets,
192            patch_indices: indices,
193            patch_values: values,
194        }
195        .into_slots();
196        Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
197    }
198
199    fn append_to_builder(
200        array: ArrayView<'_, Self>,
201        builder: &mut dyn ArrayBuilder,
202        ctx: &mut ExecutionCtx,
203    ) -> VortexResult<()> {
204        let dtype = array.array().dtype();
205
206        if !dtype.is_primitive() {
207            // Default pathway: canonicalize and propagate.
208            let canonical = array
209                .array()
210                .clone()
211                .execute::<Canonical>(ctx)?
212                .into_array();
213            builder.extend_from_array(&canonical);
214            return Ok(());
215        }
216
217        let ptype = dtype.as_ptype();
218
219        let len = array.len();
220
221        array.inner().append_to_builder(builder, ctx)?;
222
223        let offset = array.offset();
224        let lane_offsets = array
225            .lane_offsets()
226            .clone()
227            .execute::<PrimitiveArray>(ctx)?;
228        let indices = array
229            .patch_indices()
230            .clone()
231            .execute::<PrimitiveArray>(ctx)?;
232        let values = array
233            .patch_values()
234            .clone()
235            .execute::<PrimitiveArray>(ctx)?;
236
237        match_each_native_ptype!(ptype, |V| {
238            let typed_builder = builder
239                .as_any_mut()
240                .downcast_mut::<PrimitiveBuilder<V>>()
241                .vortex_expect("correctly typed builder");
242
243            // Overwrite the last `len` elements of the builder. These would have been
244            // populated by the inner.append_to_builder() call above.
245            let output = typed_builder.values_mut();
246            let trailer = output.len() - len;
247
248            apply_patches_primitive::<V>(
249                &mut output[trailer..],
250                offset,
251                len,
252                array.n_lanes(),
253                lane_offsets.as_slice::<u32>(),
254                indices.as_slice::<u16>(),
255                values.as_slice::<V>(),
256            );
257        });
258
259        Ok(())
260    }
261
262    fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
263        PatchedSlots::NAMES[idx].to_string()
264    }
265
266    fn execute(array: Array<Self>, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
267        let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive);
268        let array =
269            require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive);
270        let array =
271            require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive);
272        let array =
273            require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive);
274
275        let len = array.len();
276
277        let n_lanes = array.n_lanes;
278        let offset = array.offset;
279        let slots = match array.try_into_parts() {
280            Ok(parts) => PatchedSlots::from_slots(parts.slots),
281            Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(),
282        };
283
284        // TODO(joe): use iterative execution
285        let PrimitiveDataParts {
286            buffer,
287            ptype,
288            validity,
289        } = slots.inner.downcast::<Primitive>().into_data_parts();
290
291        let values = slots.patch_values.downcast::<Primitive>();
292        let lane_offsets = slots.lane_offsets.downcast::<Primitive>();
293        let patch_indices = slots.patch_indices.downcast::<Primitive>();
294
295        let patched_values = match_each_native_ptype!(values.ptype(), |V| {
296            let mut output = Buffer::<V>::from_byte_buffer(buffer.unwrap_host()).into_mut();
297
298            apply_patches_primitive::<V>(
299                &mut output,
300                offset,
301                len,
302                n_lanes,
303                lane_offsets.as_slice::<u32>(),
304                patch_indices.as_slice::<u16>(),
305                values.as_slice::<V>(),
306            );
307
308            let output = output.freeze();
309
310            PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity)
311        });
312
313        Ok(ExecutionResult::done(patched_values.into_array()))
314    }
315
316    fn reduce_parent(
317        array: ArrayView<'_, Self>,
318        parent: &ArrayRef,
319        child_idx: usize,
320    ) -> VortexResult<Option<ArrayRef>> {
321        PARENT_RULES.evaluate(array, parent, child_idx)
322    }
323}
324
325/// Apply patches on top of the existing value types.
326fn apply_patches_primitive<V: NativePType>(
327    output: &mut [V],
328    offset: usize,
329    len: usize,
330    n_lanes: usize,
331    lane_offsets: &[u32],
332    indices: &[u16],
333    values: &[V],
334) {
335    let n_chunks = (offset + len).div_ceil(1024);
336    for chunk in 0..n_chunks {
337        let start = lane_offsets[chunk * n_lanes] as usize;
338        let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize;
339
340        for idx in start..stop {
341            // the indices slice is measured as an offset into the 1024-value chunk.
342            let index = chunk * 1024 + indices[idx] as usize;
343            if index < offset || index >= offset + len {
344                continue;
345            }
346
347            let value = values[idx];
348            output[index - offset] = value;
349        }
350    }
351}
352
#[cfg(test)]
mod tests {
    // Tests for the patched-array vtable: execution, builder appends, serde round-trips and
    // reconstruction from slots/parts.
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;
    use vortex_buffer::buffer;
    use vortex_buffer::buffer_mut;
    use vortex_error::VortexResult;
    use vortex_session::registry::ReadContext;

    use crate::Array;
    use crate::ArrayContext;
    use crate::ArrayParts;
    use crate::ArraySlots;
    use crate::Canonical;
    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::array_session;
    use crate::arrays::Patched;
    use crate::arrays::PatchedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::patched::PatchedArrayExt;
    use crate::arrays::patched::PatchedArraySlotsExt;
    use crate::arrays::patched::PatchedData;
    use crate::arrays::patched::PatchedSlots;
    use crate::arrays::patched::PatchedSlotsView;
    use crate::assert_arrays_eq;
    use crate::builders::builder_with_capacity;
    use crate::patches::Patches;
    use crate::serde::SerializeOptions;
    use crate::serde::SerializedArray;
    use crate::session::ArraySessionExt;
    use crate::validity::Validity;

    // Patches positions 1, 2 and 3 of a 1024-element zero array with the value 1 and checks
    // the canonical output.
    #[test]
    fn test_execute() {
        let values = buffer![0u16; 1024].into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![1u16; 3].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let executed = array
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 1;
        expected[2] = 1;
        expected[3] = 1;

        assert_eq!(executed, expected.freeze());
    }

    // Slicing to 3..1024 keeps only the patch at original position 3, which lands at index 0
    // of the sliced output.
    #[test]
    fn test_execute_sliced() {
        let values = buffer![0u16; 1024].into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![1u16; 3].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array()
            .slice(3..1024)
            .unwrap();

        let executed = array
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 1;

        assert_eq!(executed, expected.freeze());
    }

    // `append_to_builder` on a non-nullable primitive array must write the patch values over
    // the inner values.
    #[test]
    fn test_append_to_builder_non_nullable() {
        let values = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 10;
        expected[2] = 20;
        expected[3] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result, &mut ctx);
    }

    // `append_to_builder` on a sliced array must honour the slice offset: only the patch at
    // original position 3 is in view, at output index 0.
    #[test]
    fn test_append_to_builder_sliced() {
        let values = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array()
            .slice(3..1024)
            .unwrap();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result, &mut ctx);
    }

    // The inner array's nulls must survive `append_to_builder`, while non-null positions
    // receive the patch values.
    #[test]
    fn test_append_to_builder_with_validity() {
        // Create inner array with nulls at indices 0 and 5.
        let validity = Validity::from_iter((0..10).map(|i| i != 0 && i != 5));
        let values = PrimitiveArray::new(buffer![0u16; 10], validity).into_array();

        // Apply patches at indices 1, 2, 3.
        let patches = Patches::new(
            10,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        // Expected: null at 0, patched 10/20/30 at 1/2/3, zero at 4, null at 5, zeros at 6-9.
        let expected = PrimitiveArray::from_option_iter([
            None,
            Some(10u16),
            Some(20),
            Some(30),
            Some(0),
            None,
            Some(0),
            Some(0),
            Some(0),
            Some(0),
        ])
        .into_array();

        assert_arrays_eq!(expected, result, &mut ctx);
    }

    /// Builds a `PatchedArray` over a `u16` primitive array.
    ///
    /// `patch_indices` are absolute positions into `inner`, and `patch_values` are the values
    /// placed there. A fresh session is used for construction.
    fn make_patched_array(
        inner: impl IntoIterator<Item = u16>,
        patch_indices: &[u32],
        patch_values: &[u16],
    ) -> VortexResult<PatchedArray> {
        let values: Vec<u16> = inner.into_iter().collect();
        let len = values.len();
        let array = PrimitiveArray::from_iter(values).into_array();

        let indices = PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array();
        let patch_vals = PrimitiveArray::from_iter(patch_values.iter().copied()).into_array();

        let patches = Patches::new(len, 0, indices, patch_vals, None)?;

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        Patched::from_array_and_patches(array, &patches, &mut ctx)
    }

    // Each case is serialized into one concatenated buffer and decoded again. The test checks
    // that the result is still a `Patched` array with identical displayed values.
    #[rstest]
    #[case::basic(
        make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap().into_array()
    )]
    // Patches spread across four 1024-element chunks.
    #[case::multi_chunk(
        make_patched_array(vec![0u16; 4096], &[100, 1500, 2500, 3500], &[11, 22, 33, 44]).unwrap().into_array()
    )]
    // A sliced array, so a non-zero offset is serialized.
    #[case::sliced({
        let arr = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap();
        arr.into_array().slice(2..1024).unwrap()
    })]
    fn test_serde_roundtrip(#[case] array: crate::ArrayRef) {
        let dtype = array.dtype().clone();
        let len = array.len();

        let session = array_session();
        session.arrays().register(Patched);

        let ctx = ArrayContext::empty().with_registry(session.arrays().registry().clone());
        let serialized = array
            .serialize(&ctx, &session, &SerializeOptions::default())
            .unwrap();

        // Concat into a single buffer.
        let mut concat = ByteBufferMut::empty();
        for buf in serialized {
            concat.extend_from_slice(buf.as_ref());
        }
        let concat = concat.freeze();

        let parts = SerializedArray::try_from(concat).unwrap();
        let decoded = parts
            .decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session)
            .unwrap();

        assert!(decoded.is::<Patched>());
        assert_eq!(
            array.display_values().to_string(),
            decoded.display_values().to_string()
        );
    }

    // Rebuilding a patched array from its own slots via `with_slots` must preserve its
    // encoding, length, dtype and executed values.
    #[test]
    fn test_with_slots_basic() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?;

        // Get original children via accessor methods
        let slots = PatchedSlots::from_slots(
            array
                .as_array()
                .slots()
                .iter()
                .cloned()
                .collect::<ArraySlots>(),
        );
        let view = PatchedSlotsView::from_slots(array.as_array().slots());
        assert_eq!(view.inner.len(), array.inner().len());

        // Create new PatchedArray with same children using with_slots
        let array_ref = array.into_array();
        // SAFETY: the replacement slots are the original children, preserving logical values and
        // parent statistics.
        let new_array = unsafe { array_ref.clone().with_slots(slots.into_slots()) }?;

        assert!(new_array.is::<Patched>());
        assert_eq!(array_ref.len(), new_array.len());
        assert_eq!(array_ref.dtype(), new_array.dtype());

        // Execute both and compare results
        let mut ctx = array_session().create_execution_ctx();
        let original_executed = array_ref.execute::<Canonical>(&mut ctx)?.into_primitive();
        let new_executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        assert_arrays_eq!(original_executed, new_executed, &mut ctx);

        Ok(())
    }

    // Swapping in a different `inner` while keeping the patch children must yield the new inner
    // values everywhere except the patched positions.
    #[test]
    fn test_rebuild_modified_inner_from_parts() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 10], &[1, 2, 3], &[10, 20, 30])?;

        // Create a different inner array (all 5s instead of 0s)
        let new_inner = PrimitiveArray::from_iter(vec![5u16; 10]).into_array();
        let slots = PatchedSlots {
            inner: new_inner,
            lane_offsets: array.lane_offsets().clone(),
            patch_indices: array.patch_indices().clone(),
            patch_values: array.patch_values().clone(),
        };

        let data = PatchedData {
            n_lanes: array.n_lanes(),
            offset: array.offset(),
        };
        let new_array = Array::try_from_parts(
            ArrayParts::new(Patched, array.dtype().clone(), array.len(), data)
                .with_slots(slots.into_slots()),
        )?
        .into_array();

        // Execute and verify the inner values changed (except at patch positions)
        let mut ctx = array_session().create_execution_ctx();
        let executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        // Expected: all 5s except indices 1, 2, 3 which are patched to 10, 20, 30
        let expected = PrimitiveArray::from_iter([5u16, 10, 20, 30, 5, 5, 5, 5, 5, 5]);
        assert_arrays_eq!(expected, executed, &mut ctx);

        Ok(())
    }
}