1use prost::Message;
5
6use crate::ArrayEq;
7use crate::ArrayHash;
8mod kernels;
9mod operations;
10mod slice;
11
12use std::hash::Hash;
13use std::hash::Hasher;
14
15use vortex_buffer::Buffer;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19use vortex_session::VortexSession;
20use vortex_session::registry::CachedId;
21
22use crate::ArrayRef;
23use crate::Canonical;
24use crate::EqMode;
25use crate::ExecutionCtx;
26use crate::ExecutionResult;
27use crate::IntoArray;
28use crate::array::Array;
29use crate::array::ArrayId;
30use crate::array::ArrayParts;
31use crate::array::ArrayView;
32use crate::array::VTable;
33use crate::array::ValidityChild;
34use crate::array::ValidityVTableFromChild;
35use crate::array::with_empty_buffers;
36use crate::arrays::Primitive;
37use crate::arrays::PrimitiveArray;
38use crate::arrays::patched::PatchedArrayExt;
39use crate::arrays::patched::PatchedArraySlotsExt;
40use crate::arrays::patched::PatchedData;
41use crate::arrays::patched::PatchedSlots;
42use crate::arrays::patched::PatchedSlotsView;
43use crate::arrays::patched::compute::rules::PARENT_RULES;
44use crate::arrays::primitive::PrimitiveDataParts;
45use crate::buffer::BufferHandle;
46use crate::builders::ArrayBuilder;
47use crate::builders::PrimitiveBuilder;
48use crate::dtype::DType;
49use crate::dtype::NativePType;
50use crate::dtype::PType;
51use crate::match_each_native_ptype;
52use crate::require_child;
53use crate::serde::ArrayChildren;
54
/// An [`Array`] using the [`Patched`] encoding: an inner array whose values are overridden at
/// selected positions by a set of patches (index/value pairs stored as child arrays).
pub type PatchedArray = Array<Patched>;
57
/// Initializes the patched encoding's kernels for `session` by delegating to
/// `kernels::initialize`.
pub(crate) fn initialize(session: &VortexSession) {
    kernels::initialize(session);
}
61
/// Marker type for the patched encoding; its [`VTable`] implementation defines how a
/// [`PatchedArray`] is validated, serialized, and executed.
#[derive(Clone, Debug)]
pub struct Patched;
64
65impl ValidityChild<Patched> for Patched {
66 fn validity_child(array: ArrayView<'_, Patched>) -> ArrayRef {
67 array.inner().clone()
68 }
69}
70
/// Protobuf-encoded metadata for a [`PatchedArray`].
///
/// Written by `Patched::serialize` and read back by `Patched::deserialize`, which uses these
/// values to compute the expected lengths of the child arrays.
#[derive(Clone, prost::Message)]
pub struct PatchedMetadata {
    /// Number of patches: the length of both the patch-indices and patch-values children.
    #[prost(uint32, tag = "1")]
    pub(crate) n_patches: u32,

    /// Number of lanes per 1024-element chunk; the lane-offsets child is expected to hold
    /// `n_chunks * n_lanes + 1` entries.
    #[prost(uint32, tag = "2")]
    pub(crate) n_lanes: u32,

    /// Position of the array's first element within the patch chunk grid. Patch positions are
    /// shifted down by this amount when patches are applied.
    #[prost(uint32, tag = "3")]
    pub(crate) offset: u32,
}
87
88impl ArrayHash for PatchedData {
89 fn array_hash<H: Hasher>(&self, state: &mut H, _accuracy: EqMode) {
90 self.offset.hash(state);
91 self.n_lanes.hash(state);
92 }
93}
94
95impl ArrayEq for PatchedData {
96 fn array_eq(&self, other: &Self, _accuracy: EqMode) -> bool {
97 self.offset == other.offset && self.n_lanes == other.n_lanes
98 }
99}
100
101impl VTable for Patched {
102 type TypedArrayData = PatchedData;
103 type OperationsVTable = Self;
104 type ValidityVTable = ValidityVTableFromChild;
105
106 fn id(&self) -> ArrayId {
107 static ID: CachedId = CachedId::new("vortex.patched");
108 *ID
109 }
110
111 fn validate(
112 &self,
113 data: &PatchedData,
114 dtype: &DType,
115 len: usize,
116 slots: &[Option<ArrayRef>],
117 ) -> VortexResult<()> {
118 data.validate(dtype, len, &PatchedSlotsView::from_slots(slots))
119 }
120
121 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
122 0
123 }
124
125 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
126 vortex_panic!("invalid buffer index for PatchedArray: {idx}");
127 }
128
129 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
130 vortex_panic!("invalid buffer index for PatchedArray: {idx}");
131 }
132
133 fn with_buffers(
134 &self,
135 array: ArrayView<'_, Self>,
136 buffers: &[BufferHandle],
137 ) -> VortexResult<ArrayParts<Self>> {
138 with_empty_buffers(self, array, buffers)
139 }
140
141 fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef {
142 match idx {
143 PatchedSlots::INNER => array.inner().clone(),
144 PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(),
145 PatchedSlots::PATCH_INDICES => array.patch_indices().clone(),
146 PatchedSlots::PATCH_VALUES => array.patch_values().clone(),
147 _ => vortex_panic!("invalid child index for PatchedArray: {idx}"),
148 }
149 }
150
151 fn serialize(
152 array: ArrayView<'_, Self>,
153 _session: &VortexSession,
154 ) -> VortexResult<Option<Vec<u8>>> {
155 Ok(Some(
156 PatchedMetadata {
157 n_patches: u32::try_from(array.patch_indices().len())?,
158 n_lanes: u32::try_from(array.n_lanes())?,
159 offset: u32::try_from(array.offset())?,
160 }
161 .encode_to_vec(),
162 ))
163 }
164
165 fn deserialize(
166 &self,
167 dtype: &DType,
168 len: usize,
169 metadata: &[u8],
170 _buffers: &[BufferHandle],
171 children: &dyn ArrayChildren,
172 _session: &VortexSession,
173 ) -> VortexResult<ArrayParts<Self>> {
174 let metadata = PatchedMetadata::decode(metadata)?;
175 let n_patches = metadata.n_patches as usize;
176 let n_lanes = metadata.n_lanes as usize;
177 let offset = metadata.offset as usize;
178
179 let n_chunks = (len + offset).div_ceil(1024);
182
183 let inner = children.get(0, dtype, len)?;
184 let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?;
185 let indices = children.get(2, PType::U16.into(), n_patches)?;
186 let values = children.get(3, dtype, n_patches)?;
187
188 let data = PatchedData { n_lanes, offset };
189 let slots = PatchedSlots {
190 inner,
191 lane_offsets,
192 patch_indices: indices,
193 patch_values: values,
194 }
195 .into_slots();
196 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
197 }
198
199 fn append_to_builder(
200 array: ArrayView<'_, Self>,
201 builder: &mut dyn ArrayBuilder,
202 ctx: &mut ExecutionCtx,
203 ) -> VortexResult<()> {
204 let dtype = array.array().dtype();
205
206 if !dtype.is_primitive() {
207 let canonical = array
209 .array()
210 .clone()
211 .execute::<Canonical>(ctx)?
212 .into_array();
213 builder.extend_from_array(&canonical);
214 return Ok(());
215 }
216
217 let ptype = dtype.as_ptype();
218
219 let len = array.len();
220
221 array.inner().append_to_builder(builder, ctx)?;
222
223 let offset = array.offset();
224 let lane_offsets = array
225 .lane_offsets()
226 .clone()
227 .execute::<PrimitiveArray>(ctx)?;
228 let indices = array
229 .patch_indices()
230 .clone()
231 .execute::<PrimitiveArray>(ctx)?;
232 let values = array
233 .patch_values()
234 .clone()
235 .execute::<PrimitiveArray>(ctx)?;
236
237 match_each_native_ptype!(ptype, |V| {
238 let typed_builder = builder
239 .as_any_mut()
240 .downcast_mut::<PrimitiveBuilder<V>>()
241 .vortex_expect("correctly typed builder");
242
243 let output = typed_builder.values_mut();
246 let trailer = output.len() - len;
247
248 apply_patches_primitive::<V>(
249 &mut output[trailer..],
250 offset,
251 len,
252 array.n_lanes(),
253 lane_offsets.as_slice::<u32>(),
254 indices.as_slice::<u16>(),
255 values.as_slice::<V>(),
256 );
257 });
258
259 Ok(())
260 }
261
262 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
263 PatchedSlots::NAMES[idx].to_string()
264 }
265
266 fn execute(array: Array<Self>, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
267 let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive);
268 let array =
269 require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive);
270 let array =
271 require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive);
272 let array =
273 require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive);
274
275 let len = array.len();
276
277 let n_lanes = array.n_lanes;
278 let offset = array.offset;
279 let slots = match array.try_into_parts() {
280 Ok(parts) => PatchedSlots::from_slots(parts.slots),
281 Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(),
282 };
283
284 let PrimitiveDataParts {
286 buffer,
287 ptype,
288 validity,
289 } = slots.inner.downcast::<Primitive>().into_data_parts();
290
291 let values = slots.patch_values.downcast::<Primitive>();
292 let lane_offsets = slots.lane_offsets.downcast::<Primitive>();
293 let patch_indices = slots.patch_indices.downcast::<Primitive>();
294
295 let patched_values = match_each_native_ptype!(values.ptype(), |V| {
296 let mut output = Buffer::<V>::from_byte_buffer(buffer.unwrap_host()).into_mut();
297
298 apply_patches_primitive::<V>(
299 &mut output,
300 offset,
301 len,
302 n_lanes,
303 lane_offsets.as_slice::<u32>(),
304 patch_indices.as_slice::<u16>(),
305 values.as_slice::<V>(),
306 );
307
308 let output = output.freeze();
309
310 PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity)
311 });
312
313 Ok(ExecutionResult::done(patched_values.into_array()))
314 }
315
316 fn reduce_parent(
317 array: ArrayView<'_, Self>,
318 parent: &ArrayRef,
319 child_idx: usize,
320 ) -> VortexResult<Option<ArrayRef>> {
321 PARENT_RULES.evaluate(array, parent, child_idx)
322 }
323}
324
325fn apply_patches_primitive<V: NativePType>(
327 output: &mut [V],
328 offset: usize,
329 len: usize,
330 n_lanes: usize,
331 lane_offsets: &[u32],
332 indices: &[u16],
333 values: &[V],
334) {
335 let n_chunks = (offset + len).div_ceil(1024);
336 for chunk in 0..n_chunks {
337 let start = lane_offsets[chunk * n_lanes] as usize;
338 let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize;
339
340 for idx in start..stop {
341 let index = chunk * 1024 + indices[idx] as usize;
343 if index < offset || index >= offset + len {
344 continue;
345 }
346
347 let value = values[idx];
348 output[index - offset] = value;
349 }
350 }
351}
352
// Unit tests for the patched encoding: canonical execution, `append_to_builder`, serde
// round-trips, and rebuilding an array from its slots/parts.
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;
    use vortex_buffer::buffer;
    use vortex_buffer::buffer_mut;
    use vortex_error::VortexResult;
    use vortex_session::registry::ReadContext;

    use crate::Array;
    use crate::ArrayContext;
    use crate::ArrayParts;
    use crate::ArraySlots;
    use crate::Canonical;
    use crate::IntoArray;
    use crate::VortexSessionExecute;
    use crate::array_session;
    use crate::arrays::Patched;
    use crate::arrays::PatchedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::patched::PatchedArrayExt;
    use crate::arrays::patched::PatchedArraySlotsExt;
    use crate::arrays::patched::PatchedData;
    use crate::arrays::patched::PatchedSlots;
    use crate::arrays::patched::PatchedSlotsView;
    use crate::assert_arrays_eq;
    use crate::builders::builder_with_capacity;
    use crate::patches::Patches;
    use crate::serde::SerializeOptions;
    use crate::serde::SerializedArray;
    use crate::session::ArraySessionExt;
    use crate::validity::Validity;

    // Patches at positions 1, 2, 3 (all value 1) over 1024 zeros: executing to canonical form
    // must yield exactly those three ones.
    #[test]
    fn test_execute() {
        let values = buffer![0u16; 1024].into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![1u16; 3].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let executed = array
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 1;
        expected[2] = 1;
        expected[3] = 1;

        assert_eq!(executed, expected.freeze());
    }

    // Same patches, but sliced to 3..1024: only the patch at original position 3 survives and
    // it moves to position 0.
    #[test]
    fn test_execute_sliced() {
        let values = buffer![0u16; 1024].into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![1u16; 3].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array()
            .slice(3..1024)
            .unwrap();

        let executed = array
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 1;

        assert_eq!(executed, expected.freeze());
    }

    // `append_to_builder` on a non-nullable primitive array applies each distinct patch value.
    #[test]
    fn test_append_to_builder_non_nullable() {
        let values = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 10;
        expected[2] = 20;
        expected[3] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result, &mut ctx);
    }

    // `append_to_builder` on a slice starting at 3: the patch at original position 3 (value 30)
    // lands at position 0 and the others fall outside the window.
    #[test]
    fn test_append_to_builder_sliced() {
        let values = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array()
            .slice(3..1024)
            .unwrap();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result, &mut ctx);
    }

    // Nullable inner array with nulls at positions 0 and 5: the patched positions 1..=3 take
    // the patch values while the inner validity (the nulls) is preserved.
    #[test]
    fn test_append_to_builder_with_validity() {
        let validity = Validity::from_iter((0..10).map(|i| i != 0 && i != 5));
        let values = PrimitiveArray::new(buffer![0u16; 10], validity).into_array();

        let patches = Patches::new(
            10,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let expected = PrimitiveArray::from_option_iter([
            None,
            Some(10u16),
            Some(20),
            Some(30),
            Some(0),
            None,
            Some(0),
            Some(0),
            Some(0),
            Some(0),
        ])
        .into_array();

        assert_arrays_eq!(expected, result, &mut ctx);
    }

    // Test helper: builds a `PatchedArray` over `inner`, with `patch_values[i]` patched in at
    // `patch_indices[i]`, using a fresh session/execution context.
    fn make_patched_array(
        inner: impl IntoIterator<Item = u16>,
        patch_indices: &[u32],
        patch_values: &[u16],
    ) -> VortexResult<PatchedArray> {
        let values: Vec<u16> = inner.into_iter().collect();
        let len = values.len();
        let array = PrimitiveArray::from_iter(values).into_array();

        let indices = PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array();
        let patch_vals = PrimitiveArray::from_iter(patch_values.iter().copied()).into_array();

        let patches = Patches::new(len, 0, indices, patch_vals, None)?;

        let session = array_session();
        let mut ctx = session.create_execution_ctx();

        Patched::from_array_and_patches(array, &patches, &mut ctx)
    }

    // Serializes each case, concatenates the output buffers, decodes the result, and checks
    // that it is still a `Patched` array with identical displayed values. Cases cover a single
    // chunk, multiple 1024-element chunks (4096 elements), and a sliced array.
    #[rstest]
    #[case::basic(
        make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap().into_array()
    )]
    #[case::multi_chunk(
        make_patched_array(vec![0u16; 4096], &[100, 1500, 2500, 3500], &[11, 22, 33, 44]).unwrap().into_array()
    )]
    #[case::sliced({
        let arr = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap();
        arr.into_array().slice(2..1024).unwrap()
    })]
    fn test_serde_roundtrip(#[case] array: crate::ArrayRef) {
        let dtype = array.dtype().clone();
        let len = array.len();

        let session = array_session();
        session.arrays().register(Patched);

        let ctx = ArrayContext::empty().with_registry(session.arrays().registry().clone());
        let serialized = array
            .serialize(&ctx, &session, &SerializeOptions::default())
            .unwrap();

        let mut concat = ByteBufferMut::empty();
        // `serialize` returns a list of buffers; join them into one contiguous byte buffer
        // so it can be parsed as a `SerializedArray`.
        for buf in serialized {
            concat.extend_from_slice(buf.as_ref());
        }
        let concat = concat.freeze();

        let parts = SerializedArray::try_from(concat).unwrap();
        let decoded = parts
            .decode(&dtype, len, &ReadContext::new(ctx.to_ids()), &session)
            .unwrap();

        assert!(decoded.is::<Patched>());
        assert_eq!(
            array.display_values().to_string(),
            decoded.display_values().to_string()
        );
    }

    // Round-trips the array's own slots through `PatchedSlots`/`PatchedSlotsView` and
    // `with_slots`; the rebuilt array must execute to the same values as the original.
    #[test]
    fn test_with_slots_basic() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?;

        let slots = PatchedSlots::from_slots(
            array
                .as_array()
                .slots()
                .iter()
                .cloned()
                .collect::<ArraySlots>(),
        );
        let view = PatchedSlotsView::from_slots(array.as_array().slots());
        assert_eq!(view.inner.len(), array.inner().len());

        let array_ref = array.into_array();
        // SAFETY: `slots` are clones of this same array's slots, unchanged, so they are
        // consistent with its encoding, dtype, and length (presumably the contract
        // `with_slots` requires — confirm against its `# Safety` docs).
        let new_array = unsafe { array_ref.clone().with_slots(slots.into_slots()) }?;

        assert!(new_array.is::<Patched>());
        assert_eq!(array_ref.len(), new_array.len());
        assert_eq!(array_ref.dtype(), new_array.dtype());

        let mut ctx = array_session().create_execution_ctx();
        let original_executed = array_ref.execute::<Canonical>(&mut ctx)?.into_primitive();
        let new_executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        assert_arrays_eq!(original_executed, new_executed, &mut ctx);

        Ok(())
    }

    // Swaps in a different inner array (all 5s) while keeping the patch children, rebuilds via
    // `Array::try_from_parts`, and checks that the patches are applied on top of the new inner.
    #[test]
    fn test_rebuild_modified_inner_from_parts() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 10], &[1, 2, 3], &[10, 20, 30])?;

        let new_inner = PrimitiveArray::from_iter(vec![5u16; 10]).into_array();
        let slots = PatchedSlots {
            inner: new_inner,
            lane_offsets: array.lane_offsets().clone(),
            patch_indices: array.patch_indices().clone(),
            patch_values: array.patch_values().clone(),
        };

        let data = PatchedData {
            n_lanes: array.n_lanes(),
            offset: array.offset(),
        };
        let new_array = Array::try_from_parts(
            ArrayParts::new(Patched, array.dtype().clone(), array.len(), data)
                .with_slots(slots.into_slots()),
        )?
        .into_array();

        let mut ctx = array_session().create_execution_ctx();
        let executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        let expected = PrimitiveArray::from_iter([5u16, 10, 20, 30, 5, 5, 5, 5, 5, 5]);
        assert_arrays_eq!(expected, executed, &mut ctx);

        Ok(())
    }
}