1use prost::Message;
5
6use crate::ArrayEq;
7use crate::ArrayHash;
8mod kernels;
9mod operations;
10mod slice;
11
12use std::hash::Hash;
13use std::hash::Hasher;
14
15use vortex_buffer::Buffer;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19use vortex_session::VortexSession;
20use vortex_session::registry::CachedId;
21
22use crate::ArrayRef;
23use crate::Canonical;
24use crate::ExecutionCtx;
25use crate::ExecutionResult;
26use crate::IntoArray;
27use crate::Precision;
28use crate::array::Array;
29use crate::array::ArrayId;
30use crate::array::ArrayParts;
31use crate::array::ArrayView;
32use crate::array::VTable;
33use crate::array::ValidityChild;
34use crate::array::ValidityVTableFromChild;
35use crate::arrays::Primitive;
36use crate::arrays::PrimitiveArray;
37use crate::arrays::patched::PatchedArrayExt;
38use crate::arrays::patched::PatchedArraySlotsExt;
39use crate::arrays::patched::PatchedData;
40use crate::arrays::patched::PatchedSlots;
41use crate::arrays::patched::PatchedSlotsView;
42use crate::arrays::patched::compute::rules::PARENT_RULES;
43use crate::arrays::patched::vtable::kernels::PARENT_KERNELS;
44use crate::arrays::primitive::PrimitiveDataParts;
45use crate::buffer::BufferHandle;
46use crate::builders::ArrayBuilder;
47use crate::builders::PrimitiveBuilder;
48use crate::dtype::DType;
49use crate::dtype::NativePType;
50use crate::dtype::PType;
51use crate::match_each_native_ptype;
52use crate::require_child;
53use crate::serde::ArrayChildren;
54
/// An [`Array`] specialised with the [`Patched`] vtable.
pub type PatchedArray = Array<Patched>;
57
/// Marker type for the patched array encoding.
///
/// Its [`VTable`] implementation in this file identifies the encoding as
/// `"vortex.patched"` and exposes four children: the inner array, the lane offsets,
/// the patch indices and the patch values.
#[derive(Clone, Debug)]
pub struct Patched;
60
61impl ValidityChild<Patched> for Patched {
62 fn validity_child(array: ArrayView<'_, Patched>) -> ArrayRef {
63 array.inner().clone()
64 }
65}
66
/// Protobuf metadata stored with a serialized patched array.
///
/// Each field is narrowed to `u32` by `serialize` and widened back to `usize` by
/// `deserialize`.
#[derive(Clone, prost::Message)]
pub struct PatchedMetadata {
    /// Number of patches. Written as the length of the patch indices child; on read it is
    /// the expected length of both the patch indices and the patch values children.
    #[prost(uint32, tag = "1")]
    pub(crate) n_patches: u32,

    /// Number of lanes. On read, `n_chunks * n_lanes + 1` is the expected length of the
    /// lane offsets child.
    #[prost(uint32, tag = "2")]
    pub(crate) n_lanes: u32,

    /// Offset of the array. On read it is added to the array length before the number of
    /// 1024-element chunks is computed.
    #[prost(uint32, tag = "3")]
    pub(crate) offset: u32,
}
83
84impl ArrayHash for PatchedData {
85 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
86 self.offset.hash(state);
87 self.n_lanes.hash(state);
88 }
89}
90
91impl ArrayEq for PatchedData {
92 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
93 self.offset == other.offset && self.n_lanes == other.n_lanes
94 }
95}
96
97impl VTable for Patched {
98 type TypedArrayData = PatchedData;
99 type OperationsVTable = Self;
100 type ValidityVTable = ValidityVTableFromChild;
101
102 fn id(&self) -> ArrayId {
103 static ID: CachedId = CachedId::new("vortex.patched");
104 *ID
105 }
106
107 fn validate(
108 &self,
109 data: &PatchedData,
110 dtype: &DType,
111 len: usize,
112 slots: &[Option<ArrayRef>],
113 ) -> VortexResult<()> {
114 data.validate(dtype, len, &PatchedSlotsView::from_slots(slots))
115 }
116
/// Number of buffers exposed by a patched array: always zero.
fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
    0
}
120
/// Buffer accessor.
///
/// # Panics
///
/// Always panics: `nbuffers` reports zero buffers, so no `idx` is valid.
fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
    vortex_panic!("invalid buffer index for PatchedArray: {idx}");
}
124
/// Buffer name accessor.
///
/// # Panics
///
/// Always panics: `nbuffers` reports zero buffers, so no `idx` is valid.
fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
    vortex_panic!("invalid buffer index for PatchedArray: {idx}");
}
128
129 fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef {
130 match idx {
131 PatchedSlots::INNER => array.inner().clone(),
132 PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(),
133 PatchedSlots::PATCH_INDICES => array.patch_indices().clone(),
134 PatchedSlots::PATCH_VALUES => array.patch_values().clone(),
135 _ => vortex_panic!("invalid child index for PatchedArray: {idx}"),
136 }
137 }
138
139 fn serialize(
140 array: ArrayView<'_, Self>,
141 _session: &VortexSession,
142 ) -> VortexResult<Option<Vec<u8>>> {
143 Ok(Some(
144 PatchedMetadata {
145 n_patches: u32::try_from(array.patch_indices().len())?,
146 n_lanes: u32::try_from(array.n_lanes())?,
147 offset: u32::try_from(array.offset())?,
148 }
149 .encode_to_vec(),
150 ))
151 }
152
153 fn deserialize(
154 &self,
155 dtype: &DType,
156 len: usize,
157 metadata: &[u8],
158 _buffers: &[BufferHandle],
159 children: &dyn ArrayChildren,
160 _session: &VortexSession,
161 ) -> VortexResult<ArrayParts<Self>> {
162 let metadata = PatchedMetadata::decode(metadata)?;
163 let n_patches = metadata.n_patches as usize;
164 let n_lanes = metadata.n_lanes as usize;
165 let offset = metadata.offset as usize;
166
167 let n_chunks = (len + offset).div_ceil(1024);
170
171 let inner = children.get(0, dtype, len)?;
172 let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?;
173 let indices = children.get(2, PType::U16.into(), n_patches)?;
174 let values = children.get(3, dtype, n_patches)?;
175
176 let data = PatchedData { n_lanes, offset };
177 let slots = PatchedSlots {
178 inner,
179 lane_offsets,
180 patch_indices: indices,
181 patch_values: values,
182 }
183 .into_slots();
184 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
185 }
186
187 fn append_to_builder(
188 array: ArrayView<'_, Self>,
189 builder: &mut dyn ArrayBuilder,
190 ctx: &mut ExecutionCtx,
191 ) -> VortexResult<()> {
192 let dtype = array.array().dtype();
193
194 if !dtype.is_primitive() {
195 let canonical = array
197 .array()
198 .clone()
199 .execute::<Canonical>(ctx)?
200 .into_array();
201 builder.extend_from_array(&canonical);
202 return Ok(());
203 }
204
205 let ptype = dtype.as_ptype();
206
207 let len = array.len();
208
209 array.inner().append_to_builder(builder, ctx)?;
210
211 let offset = array.offset();
212 let lane_offsets = array
213 .lane_offsets()
214 .clone()
215 .execute::<PrimitiveArray>(ctx)?;
216 let indices = array
217 .patch_indices()
218 .clone()
219 .execute::<PrimitiveArray>(ctx)?;
220 let values = array
221 .patch_values()
222 .clone()
223 .execute::<PrimitiveArray>(ctx)?;
224
225 match_each_native_ptype!(ptype, |V| {
226 let typed_builder = builder
227 .as_any_mut()
228 .downcast_mut::<PrimitiveBuilder<V>>()
229 .vortex_expect("correctly typed builder");
230
231 let output = typed_builder.values_mut();
234 let trailer = output.len() - len;
235
236 apply_patches_primitive::<V>(
237 &mut output[trailer..],
238 offset,
239 len,
240 array.n_lanes(),
241 lane_offsets.as_slice::<u32>(),
242 indices.as_slice::<u16>(),
243 values.as_slice::<V>(),
244 );
245 });
246
247 Ok(())
248 }
249
250 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
251 PatchedSlots::NAMES[idx].to_string()
252 }
253
254 fn execute(array: Array<Self>, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
255 let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive);
256 let array =
257 require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive);
258 let array =
259 require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive);
260 let array =
261 require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive);
262
263 let len = array.len();
264
265 let n_lanes = array.n_lanes;
266 let offset = array.offset;
267 let slots = match array.try_into_parts() {
268 Ok(parts) => PatchedSlots::from_slots(parts.slots),
269 Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(),
270 };
271
272 let PrimitiveDataParts {
274 buffer,
275 ptype,
276 validity,
277 } = slots.inner.downcast::<Primitive>().into_data_parts();
278
279 let values = slots.patch_values.downcast::<Primitive>();
280 let lane_offsets = slots.lane_offsets.downcast::<Primitive>();
281 let patch_indices = slots.patch_indices.downcast::<Primitive>();
282
283 let patched_values = match_each_native_ptype!(values.ptype(), |V| {
284 let mut output = Buffer::<V>::from_byte_buffer(buffer.unwrap_host()).into_mut();
285
286 apply_patches_primitive::<V>(
287 &mut output,
288 offset,
289 len,
290 n_lanes,
291 lane_offsets.as_slice::<u32>(),
292 patch_indices.as_slice::<u16>(),
293 values.as_slice::<V>(),
294 );
295
296 let output = output.freeze();
297
298 PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity)
299 });
300
301 Ok(ExecutionResult::done(patched_values.into_array()))
302 }
303
/// Forwards the request to execute `parent` (of which `array` is child `child_idx`) to
/// the `PARENT_KERNELS` set and returns its result unchanged.
fn execute_parent(
    array: ArrayView<'_, Self>,
    parent: &ArrayRef,
    child_idx: usize,
    ctx: &mut ExecutionCtx,
) -> VortexResult<Option<ArrayRef>> {
    PARENT_KERNELS.execute(array, parent, child_idx, ctx)
}
312
/// Forwards the request to reduce `parent` (of which `array` is child `child_idx`) to
/// the `PARENT_RULES` set and returns its result unchanged.
fn reduce_parent(
    array: ArrayView<'_, Self>,
    parent: &ArrayRef,
    child_idx: usize,
) -> VortexResult<Option<ArrayRef>> {
    PARENT_RULES.evaluate(array, parent, child_idx)
}
320}
321
322fn apply_patches_primitive<V: NativePType>(
324 output: &mut [V],
325 offset: usize,
326 len: usize,
327 n_lanes: usize,
328 lane_offsets: &[u32],
329 indices: &[u16],
330 values: &[V],
331) {
332 let n_chunks = (offset + len).div_ceil(1024);
333 for chunk in 0..n_chunks {
334 let start = lane_offsets[chunk * n_lanes] as usize;
335 let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize;
336
337 for idx in start..stop {
338 let index = chunk * 1024 + indices[idx] as usize;
340 if index < offset || index >= offset + len {
341 continue;
342 }
343
344 let value = values[idx];
345 output[index - offset] = value;
346 }
347 }
348}
349
350#[cfg(test)]
351mod tests {
352 use rstest::rstest;
353 use vortex_buffer::ByteBufferMut;
354 use vortex_buffer::buffer;
355 use vortex_buffer::buffer_mut;
356 use vortex_error::VortexResult;
357 use vortex_session::VortexSession;
358 use vortex_session::registry::ReadContext;
359
360 use crate::ArrayContext;
361 use crate::ArraySlots;
362 use crate::Canonical;
363 use crate::ExecutionCtx;
364 use crate::IntoArray;
365 use crate::LEGACY_SESSION;
366 use crate::arrays::Patched;
367 use crate::arrays::PatchedArray;
368 use crate::arrays::PrimitiveArray;
369 use crate::arrays::patched::PatchedArraySlotsExt;
370 use crate::arrays::patched::PatchedSlots;
371 use crate::arrays::patched::PatchedSlotsView;
372 use crate::assert_arrays_eq;
373 use crate::builders::builder_with_capacity;
374 use crate::patches::Patches;
375 use crate::serde::SerializeOptions;
376 use crate::serde::SerializedArray;
377 use crate::session::ArraySessionExt;
378 use crate::validity::Validity;
379
/// Executing a patched array yields the inner values with the patches written at
/// positions 1, 2 and 3.
#[test]
fn test_execute() {
    let mut ctx = ExecutionCtx::new(VortexSession::empty());

    let base = buffer![0u16; 1024].into_array();
    let patches = Patches::new(
        1024,
        0,
        buffer![1u32, 2, 3].into_array(),
        buffer![1u16; 3].into_array(),
        None,
    )
    .unwrap();

    let patched = Patched::from_array_and_patches(base, &patches, &mut ctx)
        .unwrap()
        .into_array();

    let actual = patched
        .execute::<Canonical>(&mut ctx)
        .unwrap()
        .into_primitive()
        .into_buffer::<u16>();

    let mut expected = buffer_mut![0u16; 1024];
    for position in [1usize, 2, 3] {
        expected[position] = 1;
    }

    assert_eq!(actual, expected.freeze());
}
412
/// After slicing off the first three elements, only the patch at original position 3
/// remains visible, at position 0 of the result.
#[test]
fn test_execute_sliced() {
    let mut ctx = ExecutionCtx::new(VortexSession::empty());

    let base = buffer![0u16; 1024].into_array();
    let patches = Patches::new(
        1024,
        0,
        buffer![1u32, 2, 3].into_array(),
        buffer![1u16; 3].into_array(),
        None,
    )
    .unwrap();

    let patched = Patched::from_array_and_patches(base, &patches, &mut ctx)
        .unwrap()
        .into_array();
    let sliced = patched.slice(3..1024).unwrap();

    let actual = sliced
        .execute::<Canonical>(&mut ctx)
        .unwrap()
        .into_primitive()
        .into_buffer::<u16>();

    let mut expected = buffer_mut![0u16; 1021];
    expected[0] = 1;

    assert_eq!(actual, expected.freeze());
}
445
/// Appending a non-nullable patched array to a builder produces the patched values.
#[test]
fn test_append_to_builder_non_nullable() {
    let mut ctx = ExecutionCtx::new(VortexSession::empty());

    let base = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
    let patches = Patches::new(
        1024,
        0,
        buffer![1u32, 2, 3].into_array(),
        buffer![10u16, 20, 30].into_array(),
        None,
    )
    .unwrap();

    let patched = Patched::from_array_and_patches(base, &patches, &mut ctx)
        .unwrap()
        .into_array();

    let mut builder = builder_with_capacity(patched.dtype(), patched.len());
    patched
        .append_to_builder(builder.as_mut(), &mut ctx)
        .unwrap();
    let actual = builder.finish();

    let mut expected = buffer_mut![0u16; 1024];
    for (position, value) in [(1usize, 10u16), (2, 20), (3, 30)] {
        expected[position] = value;
    }
    let expected = expected.into_array();

    assert_arrays_eq!(expected, actual);
}
478
/// Appending a sliced patched array only applies the patch that survives the slice.
#[test]
fn test_append_to_builder_sliced() {
    let mut ctx = ExecutionCtx::new(VortexSession::empty());

    let base = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
    let patches = Patches::new(
        1024,
        0,
        buffer![1u32, 2, 3].into_array(),
        buffer![10u16, 20, 30].into_array(),
        None,
    )
    .unwrap();

    let patched = Patched::from_array_and_patches(base, &patches, &mut ctx)
        .unwrap()
        .into_array();
    let sliced = patched.slice(3..1024).unwrap();

    let mut builder = builder_with_capacity(sliced.dtype(), sliced.len());
    sliced
        .append_to_builder(builder.as_mut(), &mut ctx)
        .unwrap();
    let actual = builder.finish();

    // Original position 3 (value 30) becomes position 0 after the slice.
    let mut expected = buffer_mut![0u16; 1021];
    expected[0] = 30;
    let expected = expected.into_array();

    assert_arrays_eq!(expected, actual);
}
511
/// Appending a nullable patched array keeps the inner array's nulls (positions 0 and 5)
/// and applies the patches at positions 1, 2 and 3.
#[test]
fn test_append_to_builder_with_validity() {
    let mut ctx = ExecutionCtx::new(VortexSession::empty());

    let validity = Validity::from_iter((0..10).map(|i| !matches!(i, 0 | 5)));
    let base = PrimitiveArray::new(buffer![0u16; 10], validity).into_array();

    let patches = Patches::new(
        10,
        0,
        buffer![1u32, 2, 3].into_array(),
        buffer![10u16, 20, 30].into_array(),
        None,
    )
    .unwrap();

    let patched = Patched::from_array_and_patches(base, &patches, &mut ctx)
        .unwrap()
        .into_array();

    let mut builder = builder_with_capacity(patched.dtype(), patched.len());
    patched
        .append_to_builder(builder.as_mut(), &mut ctx)
        .unwrap();
    let actual = builder.finish();

    let expected = PrimitiveArray::from_option_iter([
        None,
        Some(10u16),
        Some(20),
        Some(30),
        Some(0),
        None,
        Some(0),
        Some(0),
        Some(0),
        Some(0),
    ])
    .into_array();

    assert_arrays_eq!(expected, actual);
}
557
558 fn make_patched_array(
559 inner: impl IntoIterator<Item = u16>,
560 patch_indices: &[u32],
561 patch_values: &[u16],
562 ) -> VortexResult<PatchedArray> {
563 let values: Vec<u16> = inner.into_iter().collect();
564 let len = values.len();
565 let array = PrimitiveArray::from_iter(values).into_array();
566
567 let indices = PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array();
568 let patch_vals = PrimitiveArray::from_iter(patch_values.iter().copied()).into_array();
569
570 let patches = Patches::new(len, 0, indices, patch_vals, None)?;
571
572 let session = VortexSession::empty();
573 let mut ctx = ExecutionCtx::new(session);
574
575 Patched::from_array_and_patches(array, &patches, &mut ctx)
576 }
577
578 #[rstest]
579 #[case::basic(
580 make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap().into_array()
581 )]
582 #[case::multi_chunk(
583 make_patched_array(vec![0u16; 4096], &[100, 1500, 2500, 3500], &[11, 22, 33, 44]).unwrap().into_array()
584 )]
585 #[case::sliced({
586 let arr = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap();
587 arr.into_array().slice(2..1024).unwrap()
588 })]
589 fn test_serde_roundtrip(#[case] array: crate::ArrayRef) {
590 let dtype = array.dtype().clone();
591 let len = array.len();
592
593 LEGACY_SESSION.arrays().register(Patched);
594
595 let ctx = ArrayContext::empty().with_registry(LEGACY_SESSION.arrays().registry().clone());
596 let serialized = array
597 .serialize(&ctx, &LEGACY_SESSION, &SerializeOptions::default())
598 .unwrap();
599
600 let mut concat = ByteBufferMut::empty();
602 for buf in serialized {
603 concat.extend_from_slice(buf.as_ref());
604 }
605 let concat = concat.freeze();
606
607 let parts = SerializedArray::try_from(concat).unwrap();
608 let decoded = parts
609 .decode(
610 &dtype,
611 len,
612 &ReadContext::new(ctx.to_ids()),
613 &LEGACY_SESSION,
614 )
615 .unwrap();
616
617 assert!(decoded.is::<Patched>());
618 assert_eq!(
619 array.display_values().to_string(),
620 decoded.display_values().to_string()
621 );
622 }
623
/// Rebuilding a patched array from a copy of its own slots gives an equivalent array.
#[test]
fn test_with_slots_basic() -> VortexResult<()> {
    let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?;

    let copied: ArraySlots = array.as_array().slots().iter().cloned().collect();
    let slots = PatchedSlots::from_slots(copied);

    let view = PatchedSlotsView::from_slots(array.as_array().slots());
    assert_eq!(view.inner.len(), array.inner().len());

    let original = array.into_array();
    let rebuilt = original.clone().with_slots(slots.into_slots())?;

    assert!(rebuilt.is::<Patched>());
    assert_eq!(original.len(), rebuilt.len());
    assert_eq!(original.dtype(), rebuilt.dtype());

    let mut ctx = ExecutionCtx::new(VortexSession::empty());
    let original_executed = original.execute::<Canonical>(&mut ctx)?.into_primitive();
    let rebuilt_executed = rebuilt.execute::<Canonical>(&mut ctx)?.into_primitive();

    assert_arrays_eq!(original_executed, rebuilt_executed);

    Ok(())
}
657
/// Replacing only the inner slot keeps the existing patches: the result is the new inner
/// values with the patches at positions 1, 2 and 3 applied on top.
#[test]
fn test_with_slots_modified_inner() -> VortexResult<()> {
    let array = make_patched_array(vec![0u16; 10], &[1, 2, 3], &[10, 20, 30])?;

    let replacement = PrimitiveArray::from_iter(vec![5u16; 10]).into_array();
    let slots = PatchedSlots {
        inner: replacement,
        lane_offsets: array.lane_offsets().clone(),
        patch_indices: array.patch_indices().clone(),
        patch_values: array.patch_values().clone(),
    };

    let rebuilt = array.into_array().with_slots(slots.into_slots())?;

    let mut ctx = ExecutionCtx::new(VortexSession::empty());
    let actual = rebuilt.execute::<Canonical>(&mut ctx)?.into_primitive();

    let expected = PrimitiveArray::from_iter([5u16, 10, 20, 30, 5, 5, 5, 5, 5, 5]);
    assert_arrays_eq!(expected, actual);

    Ok(())
}
684}