1use prost::Message;
5
6use crate::ArrayEq;
7use crate::ArrayHash;
8mod kernels;
9mod operations;
10mod slice;
11
12use std::hash::Hash;
13use std::hash::Hasher;
14
15use vortex_buffer::Buffer;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19use vortex_session::VortexSession;
20
21use crate::ArrayRef;
22use crate::Canonical;
23use crate::ExecutionCtx;
24use crate::ExecutionResult;
25use crate::IntoArray;
26use crate::Precision;
27use crate::array::Array;
28use crate::array::ArrayId;
29use crate::array::ArrayParts;
30use crate::array::ArrayView;
31use crate::array::VTable;
32use crate::array::ValidityChild;
33use crate::array::ValidityVTableFromChild;
34use crate::arrays::Primitive;
35use crate::arrays::PrimitiveArray;
36use crate::arrays::patched::PatchedArrayExt;
37use crate::arrays::patched::PatchedArraySlotsExt;
38use crate::arrays::patched::PatchedData;
39use crate::arrays::patched::PatchedSlots;
40use crate::arrays::patched::PatchedSlotsView;
41use crate::arrays::patched::compute::rules::PARENT_RULES;
42use crate::arrays::patched::vtable::kernels::PARENT_KERNELS;
43use crate::arrays::primitive::PrimitiveDataParts;
44use crate::buffer::BufferHandle;
45use crate::builders::ArrayBuilder;
46use crate::builders::PrimitiveBuilder;
47use crate::dtype::DType;
48use crate::dtype::NativePType;
49use crate::dtype::PType;
50use crate::match_each_native_ptype;
51use crate::require_child;
52use crate::serde::ArrayChildren;
53
/// An [`Array`] whose encoding is [`Patched`].
pub type PatchedArray = Array<Patched>;
56
/// Encoding marker for `vortex.patched` arrays.
///
/// A patched array wraps an inner array and overwrites the values at a sparse set of
/// positions with separately stored patch values.
#[derive(Debug, Clone)]
pub struct Patched;
59
60impl ValidityChild<Patched> for Patched {
61 fn validity_child(array: ArrayView<'_, Patched>) -> ArrayRef {
62 array.inner().clone()
63 }
64}
65
/// Serialized metadata for a [`PatchedArray`].
///
/// Child arrays are serialized separately. This message carries only the scalar values
/// that `deserialize` needs to recompute the expected length of each child.
#[derive(Clone, prost::Message)]
pub struct PatchedMetadata {
    /// Number of patches: the length of both the patch-indices and patch-values children.
    #[prost(uint32, tag = "1")]
    pub(crate) n_patches: u32,

    /// Number of lanes per 1024-element chunk; sizes the lane-offsets child
    /// (`n_chunks * n_lanes + 1` entries).
    #[prost(uint32, tag = "2")]
    pub(crate) n_lanes: u32,

    /// Logical offset of the array's first element within the chunked patch layout.
    /// Patch positions below `offset` are ignored when patches are applied.
    #[prost(uint32, tag = "3")]
    pub(crate) offset: u32,
}
82
83impl ArrayHash for PatchedData {
84 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
85 self.offset.hash(state);
86 self.n_lanes.hash(state);
87 }
88}
89
90impl ArrayEq for PatchedData {
91 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
92 self.offset == other.offset && self.n_lanes == other.n_lanes
93 }
94}
95
96impl VTable for Patched {
97 type ArrayData = PatchedData;
98 type OperationsVTable = Self;
99 type ValidityVTable = ValidityVTableFromChild;
100
101 fn id(&self) -> ArrayId {
102 ArrayId::new_ref("vortex.patched")
103 }
104
105 fn validate(
106 &self,
107 data: &PatchedData,
108 dtype: &DType,
109 len: usize,
110 slots: &[Option<ArrayRef>],
111 ) -> VortexResult<()> {
112 data.validate(dtype, len, &PatchedSlotsView::from_slots(slots))
113 }
114
115 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
116 0
117 }
118
119 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
120 vortex_panic!("invalid buffer index for PatchedArray: {idx}");
121 }
122
123 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
124 vortex_panic!("invalid buffer index for PatchedArray: {idx}");
125 }
126
127 fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef {
128 match idx {
129 PatchedSlots::INNER => array.inner().clone(),
130 PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(),
131 PatchedSlots::PATCH_INDICES => array.patch_indices().clone(),
132 PatchedSlots::PATCH_VALUES => array.patch_values().clone(),
133 _ => vortex_panic!("invalid child index for PatchedArray: {idx}"),
134 }
135 }
136
137 fn serialize(
138 array: ArrayView<'_, Self>,
139 _session: &VortexSession,
140 ) -> VortexResult<Option<Vec<u8>>> {
141 Ok(Some(
142 PatchedMetadata {
143 n_patches: u32::try_from(array.patch_indices().len())?,
144 n_lanes: u32::try_from(array.n_lanes())?,
145 offset: u32::try_from(array.offset())?,
146 }
147 .encode_to_vec(),
148 ))
149 }
150
151 fn deserialize(
152 &self,
153 dtype: &DType,
154 len: usize,
155 metadata: &[u8],
156 _buffers: &[BufferHandle],
157 children: &dyn ArrayChildren,
158 _session: &VortexSession,
159 ) -> VortexResult<ArrayParts<Self>> {
160 let metadata = PatchedMetadata::decode(metadata)?;
161 let n_patches = metadata.n_patches as usize;
162 let n_lanes = metadata.n_lanes as usize;
163 let offset = metadata.offset as usize;
164
165 let n_chunks = (len + offset).div_ceil(1024);
168
169 let inner = children.get(0, dtype, len)?;
170 let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?;
171 let indices = children.get(2, PType::U16.into(), n_patches)?;
172 let values = children.get(3, dtype, n_patches)?;
173
174 let data = PatchedData { n_lanes, offset };
175 let slots = PatchedSlots {
176 inner,
177 lane_offsets,
178 patch_indices: indices,
179 patch_values: values,
180 }
181 .into_slots();
182 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
183 }
184
185 fn append_to_builder(
186 array: ArrayView<'_, Self>,
187 builder: &mut dyn ArrayBuilder,
188 ctx: &mut ExecutionCtx,
189 ) -> VortexResult<()> {
190 let dtype = array.array().dtype();
191
192 if !dtype.is_primitive() {
193 let canonical = array
195 .array()
196 .clone()
197 .execute::<Canonical>(ctx)?
198 .into_array();
199 builder.extend_from_array(&canonical);
200 return Ok(());
201 }
202
203 let ptype = dtype.as_ptype();
204
205 let len = array.len();
206
207 array.inner().append_to_builder(builder, ctx)?;
208
209 let offset = array.offset();
210 let lane_offsets = array
211 .lane_offsets()
212 .clone()
213 .execute::<PrimitiveArray>(ctx)?;
214 let indices = array
215 .patch_indices()
216 .clone()
217 .execute::<PrimitiveArray>(ctx)?;
218 let values = array
219 .patch_values()
220 .clone()
221 .execute::<PrimitiveArray>(ctx)?;
222
223 match_each_native_ptype!(ptype, |V| {
224 let typed_builder = builder
225 .as_any_mut()
226 .downcast_mut::<PrimitiveBuilder<V>>()
227 .vortex_expect("correctly typed builder");
228
229 let output = typed_builder.values_mut();
232 let trailer = output.len() - len;
233
234 apply_patches_primitive::<V>(
235 &mut output[trailer..],
236 offset,
237 len,
238 array.n_lanes(),
239 lane_offsets.as_slice::<u32>(),
240 indices.as_slice::<u16>(),
241 values.as_slice::<V>(),
242 );
243 });
244
245 Ok(())
246 }
247
248 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
249 PatchedSlots::NAMES[idx].to_string()
250 }
251
252 fn execute(array: Array<Self>, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
253 let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive);
254 let array =
255 require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive);
256 let array =
257 require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive);
258 let array =
259 require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive);
260
261 let len = array.len();
262
263 let n_lanes = array.n_lanes;
264 let offset = array.offset;
265 let slots = match array.try_into_parts() {
266 Ok(parts) => PatchedSlots::from_slots(parts.slots),
267 Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(),
268 };
269
270 let PrimitiveDataParts {
272 buffer,
273 ptype,
274 validity,
275 } = slots.inner.downcast::<Primitive>().into_data_parts();
276
277 let values = slots.patch_values.downcast::<Primitive>();
278 let lane_offsets = slots.lane_offsets.downcast::<Primitive>();
279 let patch_indices = slots.patch_indices.downcast::<Primitive>();
280
281 let patched_values = match_each_native_ptype!(values.ptype(), |V| {
282 let mut output = Buffer::<V>::from_byte_buffer(buffer.unwrap_host()).into_mut();
283
284 apply_patches_primitive::<V>(
285 &mut output,
286 offset,
287 len,
288 n_lanes,
289 lane_offsets.as_slice::<u32>(),
290 patch_indices.as_slice::<u16>(),
291 values.as_slice::<V>(),
292 );
293
294 let output = output.freeze();
295
296 PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity)
297 });
298
299 Ok(ExecutionResult::done(patched_values.into_array()))
300 }
301
302 fn execute_parent(
303 array: ArrayView<'_, Self>,
304 parent: &ArrayRef,
305 child_idx: usize,
306 ctx: &mut ExecutionCtx,
307 ) -> VortexResult<Option<ArrayRef>> {
308 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
309 }
310
311 fn reduce_parent(
312 array: ArrayView<'_, Self>,
313 parent: &ArrayRef,
314 child_idx: usize,
315 ) -> VortexResult<Option<ArrayRef>> {
316 PARENT_RULES.evaluate(array, parent, child_idx)
317 }
318}
319
320#[allow(clippy::too_many_arguments)]
322fn apply_patches_primitive<V: NativePType>(
323 output: &mut [V],
324 offset: usize,
325 len: usize,
326 n_lanes: usize,
327 lane_offsets: &[u32],
328 indices: &[u16],
329 values: &[V],
330) {
331 let n_chunks = (offset + len).div_ceil(1024);
332 for chunk in 0..n_chunks {
333 let start = lane_offsets[chunk * n_lanes] as usize;
334 let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize;
335
336 for idx in start..stop {
337 let index = chunk * 1024 + indices[idx] as usize;
339 if index < offset || index >= offset + len {
340 continue;
341 }
342
343 let value = values[idx];
344 output[index - offset] = value;
345 }
346 }
347}
348
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;
    use vortex_buffer::buffer;
    use vortex_buffer::buffer_mut;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;
    use vortex_session::registry::ReadContext;

    use crate::ArrayContext;
    use crate::Canonical;
    use crate::ExecutionCtx;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::arrays::Patched;
    use crate::arrays::PatchedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::patched::PatchedArraySlotsExt;
    use crate::arrays::patched::PatchedSlots;
    use crate::arrays::patched::PatchedSlotsView;
    use crate::assert_arrays_eq;
    use crate::builders::builder_with_capacity;
    use crate::patches::Patches;
    use crate::serde::SerializeOptions;
    use crate::serde::SerializedArray;
    use crate::validity::Validity;

    /// Executing a patched array overwrites the inner values at each patch index.
    #[test]
    fn test_execute() {
        let values = buffer![0u16; 1024].into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![1u16; 3].into_array(),
            None,
        )
        .unwrap();

        let session = VortexSession::empty();
        let mut ctx = ExecutionCtx::new(session);

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let executed = array
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 1;
        expected[2] = 1;
        expected[3] = 1;

        assert_eq!(executed, expected.freeze());
    }

    /// Patches before the slice start are dropped; later ones shift by the slice offset.
    #[test]
    fn test_execute_sliced() {
        let values = buffer![0u16; 1024].into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![1u16; 3].into_array(),
            None,
        )
        .unwrap();

        let session = VortexSession::empty();
        let mut ctx = ExecutionCtx::new(session);

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array()
            .slice(3..1024)
            .unwrap();

        let executed = array
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 1;

        assert_eq!(executed, expected.freeze());
    }

    /// The builder fast path produces the same values as execution.
    #[test]
    fn test_append_to_builder_non_nullable() {
        let values = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = VortexSession::empty();
        let mut ctx = ExecutionCtx::new(session);

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1024];
        expected[1] = 10;
        expected[2] = 20;
        expected[3] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result);
    }

    /// The builder fast path respects the slice offset.
    #[test]
    fn test_append_to_builder_sliced() {
        let values = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = Patches::new(
            1024,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = VortexSession::empty();
        let mut ctx = ExecutionCtx::new(session);

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array()
            .slice(3..1024)
            .unwrap();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result);
    }

    /// Validity comes from the inner array, not from the patches.
    #[test]
    fn test_append_to_builder_with_validity() {
        let validity = Validity::from_iter((0..10).map(|i| i != 0 && i != 5));
        let values = PrimitiveArray::new(buffer![0u16; 10], validity).into_array();

        let patches = Patches::new(
            10,
            0,
            buffer![1u32, 2, 3].into_array(),
            buffer![10u16, 20, 30].into_array(),
            None,
        )
        .unwrap();

        let session = VortexSession::empty();
        let mut ctx = ExecutionCtx::new(session);

        let array = Patched::from_array_and_patches(values, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx).unwrap();

        let result = builder.finish();

        let expected = PrimitiveArray::from_option_iter([
            None,
            Some(10u16),
            Some(20),
            Some(30),
            Some(0),
            None,
            Some(0),
            Some(0),
            Some(0),
            Some(0),
        ])
        .into_array();

        assert_arrays_eq!(expected, result);
    }

    /// Builds a non-nullable `u16` patched array from inner values and (index, value) patches.
    fn make_patched_array(
        inner: impl IntoIterator<Item = u16>,
        patch_indices: &[u32],
        patch_values: &[u16],
    ) -> VortexResult<PatchedArray> {
        let values: Vec<u16> = inner.into_iter().collect();
        let len = values.len();
        let array = PrimitiveArray::from_iter(values).into_array();

        let indices = PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array();
        let patch_vals = PrimitiveArray::from_iter(patch_values.iter().copied()).into_array();

        let patches = Patches::new(len, 0, indices, patch_vals, None)?;

        let session = VortexSession::empty();
        let mut ctx = ExecutionCtx::new(session);

        Patched::from_array_and_patches(array, &patches, &mut ctx)
    }

    /// Serializing and decoding preserves both the encoding and the logical values.
    #[rstest]
    #[case::basic(
        make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap().into_array()
    )]
    #[case::multi_chunk(
        make_patched_array(vec![0u16; 4096], &[100, 1500, 2500, 3500], &[11, 22, 33, 44]).unwrap().into_array()
    )]
    #[case::sliced({
        let arr = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap();
        arr.into_array().slice(2..1024).unwrap()
    })]
    fn test_serde_roundtrip(#[case] array: crate::ArrayRef) {
        let dtype = array.dtype().clone();
        let len = array.len();

        let ctx = ArrayContext::empty();
        let serialized = array
            .serialize(&ctx, &LEGACY_SESSION, &SerializeOptions::default())
            .unwrap();

        let mut concat = ByteBufferMut::empty();
        for buf in serialized {
            concat.extend_from_slice(buf.as_ref());
        }
        let concat = concat.freeze();

        let parts = SerializedArray::try_from(concat).unwrap();
        let decoded = parts
            .decode(
                &dtype,
                len,
                &ReadContext::new(ctx.to_ids()),
                &LEGACY_SESSION,
            )
            .unwrap();

        assert!(decoded.is::<Patched>());
        assert_eq!(
            array.display_values().to_string(),
            decoded.display_values().to_string()
        );
    }

    /// Re-installing the array's own slots yields an equivalent patched array.
    #[test]
    fn test_with_slots_basic() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?;

        let slots = PatchedSlots::from_slots(array.as_array().slots().to_vec());
        let view = PatchedSlotsView::from_slots(array.as_array().slots());
        assert_eq!(view.inner.len(), array.inner().len());

        let array_ref = array.into_array();
        let new_array = array_ref.clone().with_slots(slots.into_slots())?;

        assert!(new_array.is::<Patched>());
        assert_eq!(array_ref.len(), new_array.len());
        assert_eq!(array_ref.dtype(), new_array.dtype());

        let mut ctx = ExecutionCtx::new(VortexSession::empty());
        let original_executed = array_ref.execute::<Canonical>(&mut ctx)?.into_primitive();
        let new_executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        assert_arrays_eq!(original_executed, new_executed);

        Ok(())
    }

    /// Replacing the inner slot keeps the patches applied on top of the new values.
    #[test]
    fn test_with_slots_modified_inner() -> VortexResult<()> {
        let array = make_patched_array(vec![0u16; 10], &[1, 2, 3], &[10, 20, 30])?;

        let new_inner = PrimitiveArray::from_iter(vec![5u16; 10]).into_array();
        let slots = PatchedSlots {
            inner: new_inner,
            lane_offsets: array.lane_offsets().clone(),
            patch_indices: array.patch_indices().clone(),
            patch_values: array.patch_values().clone(),
        };

        let array_ref = array.into_array();
        let new_array = array_ref.with_slots(slots.into_slots())?;

        let mut ctx = ExecutionCtx::new(VortexSession::empty());
        let executed = new_array.execute::<Canonical>(&mut ctx)?.into_primitive();

        let expected = PrimitiveArray::from_iter([5u16, 10, 20, 30, 5, 5, 5, 5, 5, 5]);
        assert_arrays_eq!(expected, executed);

        Ok(())
    }

    /// A slice that starts past the first 1024-element chunk and spans a chunk boundary
    /// must keep exactly the patches inside the window, both when executed and when
    /// appended to a builder.
    #[test]
    fn test_sliced_across_chunks() -> VortexResult<()> {
        let array = make_patched_array(
            vec![0u16; 4096],
            &[100, 1500, 2500, 3500],
            &[11, 22, 33, 44],
        )?
        .into_array()
        .slice(1400..3000)?;

        let mut expected = buffer_mut![0u16; 1600];
        expected[1500 - 1400] = 22;
        expected[2500 - 1400] = 33;
        let expected = expected.freeze();

        let mut ctx = ExecutionCtx::new(VortexSession::empty());
        let executed = array
            .clone()
            .execute::<Canonical>(&mut ctx)?
            .into_primitive()
            .into_buffer::<u16>();
        assert_eq!(executed, expected);

        let mut builder = builder_with_capacity(array.dtype(), array.len());
        array.append_to_builder(builder.as_mut(), &mut ctx)?;
        let appended = builder.finish();

        let expected = expected.into_array();
        assert_arrays_eq!(expected, appended);

        Ok(())
    }
}