1use prost::Message;
5
6use crate::ArrayEq;
7use crate::ArrayHash;
8mod kernels;
9mod operations;
10mod slice;
11
12use std::hash::Hash;
13use std::hash::Hasher;
14
15use vortex_buffer::Buffer;
16use vortex_error::VortexExpect;
17use vortex_error::VortexResult;
18use vortex_error::vortex_panic;
19use vortex_session::VortexSession;
20use vortex_session::registry::CachedId;
21
22use crate::ArrayRef;
23use crate::Canonical;
24use crate::ExecutionCtx;
25use crate::ExecutionResult;
26use crate::IntoArray;
27use crate::Precision;
28use crate::array::Array;
29use crate::array::ArrayId;
30use crate::array::ArrayParts;
31use crate::array::ArrayView;
32use crate::array::VTable;
33use crate::array::ValidityChild;
34use crate::array::ValidityVTableFromChild;
35use crate::arrays::Primitive;
36use crate::arrays::PrimitiveArray;
37use crate::arrays::patched::PatchedArrayExt;
38use crate::arrays::patched::PatchedArraySlotsExt;
39use crate::arrays::patched::PatchedData;
40use crate::arrays::patched::PatchedSlots;
41use crate::arrays::patched::PatchedSlotsView;
42use crate::arrays::patched::compute::rules::PARENT_RULES;
43use crate::arrays::patched::vtable::kernels::PARENT_KERNELS;
44use crate::arrays::primitive::PrimitiveDataParts;
45use crate::buffer::BufferHandle;
46use crate::builders::ArrayBuilder;
47use crate::builders::PrimitiveBuilder;
48use crate::dtype::DType;
49use crate::dtype::NativePType;
50use crate::dtype::PType;
51use crate::match_each_native_ptype;
52use crate::require_child;
53use crate::serde::ArrayChildren;
54
/// An [`Array`] whose encoding is [`Patched`].
pub type PatchedArray = Array<Patched>;
57
/// Zero-sized encoding marker for patched arrays.
///
/// All behaviour for the encoding is attached to this type through trait
/// implementations; the type itself carries no state.
#[derive(Debug, Clone)]
pub struct Patched;
60
61impl ValidityChild<Patched> for Patched {
62 fn validity_child(array: ArrayView<'_, Patched>) -> ArrayRef {
63 array.inner().clone()
64 }
65}
66
67#[derive(Clone, prost::Message)]
68pub struct PatchedMetadata {
69 #[prost(uint32, tag = "1")]
71 pub(crate) n_patches: u32,
72
73 #[prost(uint32, tag = "2")]
75 pub(crate) n_lanes: u32,
76
77 #[prost(uint32, tag = "3")]
81 pub(crate) offset: u32,
82}
83
84impl ArrayHash for PatchedData {
85 fn array_hash<H: Hasher>(&self, state: &mut H, _precision: Precision) {
86 self.offset.hash(state);
87 self.n_lanes.hash(state);
88 }
89}
90
91impl ArrayEq for PatchedData {
92 fn array_eq(&self, other: &Self, _precision: Precision) -> bool {
93 self.offset == other.offset && self.n_lanes == other.n_lanes
94 }
95}
96
97impl VTable for Patched {
98 type ArrayData = PatchedData;
99 type OperationsVTable = Self;
100 type ValidityVTable = ValidityVTableFromChild;
101
102 fn id(&self) -> ArrayId {
103 static ID: CachedId = CachedId::new("vortex.patched");
104 *ID
105 }
106
107 fn validate(
108 &self,
109 data: &PatchedData,
110 dtype: &DType,
111 len: usize,
112 slots: &[Option<ArrayRef>],
113 ) -> VortexResult<()> {
114 data.validate(dtype, len, &PatchedSlotsView::from_slots(slots))
115 }
116
117 fn nbuffers(_array: ArrayView<'_, Self>) -> usize {
118 0
119 }
120
121 fn buffer(_array: ArrayView<'_, Self>, idx: usize) -> BufferHandle {
122 vortex_panic!("invalid buffer index for PatchedArray: {idx}");
123 }
124
125 fn buffer_name(_array: ArrayView<'_, Self>, idx: usize) -> Option<String> {
126 vortex_panic!("invalid buffer index for PatchedArray: {idx}");
127 }
128
129 fn child(array: ArrayView<'_, Self>, idx: usize) -> ArrayRef {
130 match idx {
131 PatchedSlots::INNER => array.inner().clone(),
132 PatchedSlots::LANE_OFFSETS => array.lane_offsets().clone(),
133 PatchedSlots::PATCH_INDICES => array.patch_indices().clone(),
134 PatchedSlots::PATCH_VALUES => array.patch_values().clone(),
135 _ => vortex_panic!("invalid child index for PatchedArray: {idx}"),
136 }
137 }
138
139 fn serialize(
140 array: ArrayView<'_, Self>,
141 _session: &VortexSession,
142 ) -> VortexResult<Option<Vec<u8>>> {
143 Ok(Some(
144 PatchedMetadata {
145 n_patches: u32::try_from(array.patch_indices().len())?,
146 n_lanes: u32::try_from(array.n_lanes())?,
147 offset: u32::try_from(array.offset())?,
148 }
149 .encode_to_vec(),
150 ))
151 }
152
153 fn deserialize(
154 &self,
155 dtype: &DType,
156 len: usize,
157 metadata: &[u8],
158 _buffers: &[BufferHandle],
159 children: &dyn ArrayChildren,
160 _session: &VortexSession,
161 ) -> VortexResult<ArrayParts<Self>> {
162 let metadata = PatchedMetadata::decode(metadata)?;
163 let n_patches = metadata.n_patches as usize;
164 let n_lanes = metadata.n_lanes as usize;
165 let offset = metadata.offset as usize;
166
167 let n_chunks = (len + offset).div_ceil(1024);
170
171 let inner = children.get(0, dtype, len)?;
172 let lane_offsets = children.get(1, PType::U32.into(), n_chunks * n_lanes + 1)?;
173 let indices = children.get(2, PType::U16.into(), n_patches)?;
174 let values = children.get(3, dtype, n_patches)?;
175
176 let data = PatchedData { n_lanes, offset };
177 let slots = PatchedSlots {
178 inner,
179 lane_offsets,
180 patch_indices: indices,
181 patch_values: values,
182 }
183 .into_slots();
184 Ok(ArrayParts::new(self.clone(), dtype.clone(), len, data).with_slots(slots))
185 }
186
187 fn append_to_builder(
188 array: ArrayView<'_, Self>,
189 builder: &mut dyn ArrayBuilder,
190 ctx: &mut ExecutionCtx,
191 ) -> VortexResult<()> {
192 let dtype = array.array().dtype();
193
194 if !dtype.is_primitive() {
195 let canonical = array
197 .array()
198 .clone()
199 .execute::<Canonical>(ctx)?
200 .into_array();
201 builder.extend_from_array(&canonical);
202 return Ok(());
203 }
204
205 let ptype = dtype.as_ptype();
206
207 let len = array.len();
208
209 array.inner().append_to_builder(builder, ctx)?;
210
211 let offset = array.offset();
212 let lane_offsets = array
213 .lane_offsets()
214 .clone()
215 .execute::<PrimitiveArray>(ctx)?;
216 let indices = array
217 .patch_indices()
218 .clone()
219 .execute::<PrimitiveArray>(ctx)?;
220 let values = array
221 .patch_values()
222 .clone()
223 .execute::<PrimitiveArray>(ctx)?;
224
225 match_each_native_ptype!(ptype, |V| {
226 let typed_builder = builder
227 .as_any_mut()
228 .downcast_mut::<PrimitiveBuilder<V>>()
229 .vortex_expect("correctly typed builder");
230
231 let output = typed_builder.values_mut();
234 let trailer = output.len() - len;
235
236 apply_patches_primitive::<V>(
237 &mut output[trailer..],
238 offset,
239 len,
240 array.n_lanes(),
241 lane_offsets.as_slice::<u32>(),
242 indices.as_slice::<u16>(),
243 values.as_slice::<V>(),
244 );
245 });
246
247 Ok(())
248 }
249
250 fn slot_name(_array: ArrayView<'_, Self>, idx: usize) -> String {
251 PatchedSlots::NAMES[idx].to_string()
252 }
253
254 fn execute(array: Array<Self>, _ctx: &mut ExecutionCtx) -> VortexResult<ExecutionResult> {
255 let array = require_child!(array, array.inner(), PatchedSlots::INNER => Primitive);
256 let array =
257 require_child!(array, array.lane_offsets(), PatchedSlots::LANE_OFFSETS => Primitive);
258 let array =
259 require_child!(array, array.patch_indices(), PatchedSlots::PATCH_INDICES => Primitive);
260 let array =
261 require_child!(array, array.patch_values(), PatchedSlots::PATCH_VALUES => Primitive);
262
263 let len = array.len();
264
265 let n_lanes = array.n_lanes;
266 let offset = array.offset;
267 let slots = match array.try_into_parts() {
268 Ok(parts) => PatchedSlots::from_slots(parts.slots),
269 Err(array) => PatchedSlotsView::from_slots(array.slots()).to_owned(),
270 };
271
272 let PrimitiveDataParts {
274 buffer,
275 ptype,
276 validity,
277 } = slots.inner.downcast::<Primitive>().into_data_parts();
278
279 let values = slots.patch_values.downcast::<Primitive>();
280 let lane_offsets = slots.lane_offsets.downcast::<Primitive>();
281 let patch_indices = slots.patch_indices.downcast::<Primitive>();
282
283 let patched_values = match_each_native_ptype!(values.ptype(), |V| {
284 let mut output = Buffer::<V>::from_byte_buffer(buffer.unwrap_host()).into_mut();
285
286 apply_patches_primitive::<V>(
287 &mut output,
288 offset,
289 len,
290 n_lanes,
291 lane_offsets.as_slice::<u32>(),
292 patch_indices.as_slice::<u16>(),
293 values.as_slice::<V>(),
294 );
295
296 let output = output.freeze();
297
298 PrimitiveArray::from_byte_buffer(output.into_byte_buffer(), ptype, validity)
299 });
300
301 Ok(ExecutionResult::done(patched_values.into_array()))
302 }
303
304 fn execute_parent(
305 array: ArrayView<'_, Self>,
306 parent: &ArrayRef,
307 child_idx: usize,
308 ctx: &mut ExecutionCtx,
309 ) -> VortexResult<Option<ArrayRef>> {
310 PARENT_KERNELS.execute(array, parent, child_idx, ctx)
311 }
312
313 fn reduce_parent(
314 array: ArrayView<'_, Self>,
315 parent: &ArrayRef,
316 child_idx: usize,
317 ) -> VortexResult<Option<ArrayRef>> {
318 PARENT_RULES.evaluate(array, parent, child_idx)
319 }
320}
321
322fn apply_patches_primitive<V: NativePType>(
324 output: &mut [V],
325 offset: usize,
326 len: usize,
327 n_lanes: usize,
328 lane_offsets: &[u32],
329 indices: &[u16],
330 values: &[V],
331) {
332 let n_chunks = (offset + len).div_ceil(1024);
333 for chunk in 0..n_chunks {
334 let start = lane_offsets[chunk * n_lanes] as usize;
335 let stop = lane_offsets[chunk * n_lanes + n_lanes] as usize;
336
337 for idx in start..stop {
338 let index = chunk * 1024 + indices[idx] as usize;
340 if index < offset || index >= offset + len {
341 continue;
342 }
343
344 let value = values[idx];
345 output[index - offset] = value;
346 }
347 }
348}
349
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use vortex_buffer::ByteBufferMut;
    use vortex_buffer::buffer;
    use vortex_buffer::buffer_mut;
    use vortex_error::VortexResult;
    use vortex_session::VortexSession;
    use vortex_session::registry::ReadContext;

    use crate::ArrayContext;
    use crate::Canonical;
    use crate::ExecutionCtx;
    use crate::IntoArray;
    use crate::LEGACY_SESSION;
    use crate::arrays::Patched;
    use crate::arrays::PatchedArray;
    use crate::arrays::PrimitiveArray;
    use crate::arrays::patched::PatchedArraySlotsExt;
    use crate::arrays::patched::PatchedSlots;
    use crate::arrays::patched::PatchedSlotsView;
    use crate::assert_arrays_eq;
    use crate::builders::builder_with_capacity;
    use crate::patches::Patches;
    use crate::serde::SerializeOptions;
    use crate::serde::SerializedArray;
    use crate::session::ArraySessionExt;
    use crate::validity::Validity;

    /// Creates an execution context backed by an empty session.
    fn empty_ctx() -> ExecutionCtx {
        ExecutionCtx::new(VortexSession::empty())
    }

    /// Builds patches over an array of `array_len` elements that replace
    /// positions 1, 2 and 3 with the three entries of `patch_values`.
    fn patches_at_1_2_3(array_len: usize, patch_values: crate::ArrayRef) -> Patches {
        Patches::new(
            array_len,
            0,
            buffer![1u32, 2, 3].into_array(),
            patch_values,
            None,
        )
        .unwrap()
    }

    /// Executing a patched array yields the inner values with patches applied.
    #[test]
    fn test_execute() {
        let mut ctx = empty_ctx();
        let patches = patches_at_1_2_3(1024, buffer![1u16; 3].into_array());

        let patched =
            Patched::from_array_and_patches(buffer![0u16; 1024].into_array(), &patches, &mut ctx)
                .unwrap()
                .into_array();

        let actual = patched
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1024];
        for position in 1..=3 {
            expected[position] = 1;
        }

        assert_eq!(actual, expected.freeze());
    }

    /// After slicing off the first three elements only the patch at original
    /// position 3 remains visible, now at position 0.
    #[test]
    fn test_execute_sliced() {
        let mut ctx = empty_ctx();
        let patches = patches_at_1_2_3(1024, buffer![1u16; 3].into_array());

        let sliced =
            Patched::from_array_and_patches(buffer![0u16; 1024].into_array(), &patches, &mut ctx)
                .unwrap()
                .into_array()
                .slice(3..1024)
                .unwrap();

        let actual = sliced
            .execute::<Canonical>(&mut ctx)
            .unwrap()
            .into_primitive()
            .into_buffer::<u16>();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 1;

        assert_eq!(actual, expected.freeze());
    }

    /// Appending a non-nullable patched array to a builder applies the patches.
    #[test]
    fn test_append_to_builder_non_nullable() {
        let mut ctx = empty_ctx();
        let inner = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = patches_at_1_2_3(1024, buffer![10u16, 20, 30].into_array());

        let patched = Patched::from_array_and_patches(inner, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(patched.dtype(), patched.len());
        patched
            .append_to_builder(builder.as_mut(), &mut ctx)
            .unwrap();
        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1024];
        for (position, value) in [(1, 10u16), (2, 20), (3, 30)] {
            expected[position] = value;
        }
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result);
    }

    /// Appending a sliced patched array only applies the patches that fall
    /// inside the slice.
    #[test]
    fn test_append_to_builder_sliced() {
        let mut ctx = empty_ctx();
        let inner = PrimitiveArray::new(buffer![0u16; 1024], Validity::NonNullable).into_array();
        let patches = patches_at_1_2_3(1024, buffer![10u16, 20, 30].into_array());

        let sliced = Patched::from_array_and_patches(inner, &patches, &mut ctx)
            .unwrap()
            .into_array()
            .slice(3..1024)
            .unwrap();

        let mut builder = builder_with_capacity(sliced.dtype(), sliced.len());
        sliced
            .append_to_builder(builder.as_mut(), &mut ctx)
            .unwrap();
        let result = builder.finish();

        let mut expected = buffer_mut![0u16; 1021];
        expected[0] = 30;
        let expected = expected.into_array();

        assert_arrays_eq!(expected, result);
    }

    /// Nulls of the inner array (positions 0 and 5) survive the append, and
    /// the patches at positions 1, 2 and 3 are applied.
    #[test]
    fn test_append_to_builder_with_validity() {
        let mut ctx = empty_ctx();

        let validity = Validity::from_iter((0..10).map(|i| i != 0 && i != 5));
        let inner = PrimitiveArray::new(buffer![0u16; 10], validity).into_array();
        let patches = patches_at_1_2_3(10, buffer![10u16, 20, 30].into_array());

        let patched = Patched::from_array_and_patches(inner, &patches, &mut ctx)
            .unwrap()
            .into_array();

        let mut builder = builder_with_capacity(patched.dtype(), patched.len());
        patched
            .append_to_builder(builder.as_mut(), &mut ctx)
            .unwrap();
        let result = builder.finish();

        let expected = PrimitiveArray::from_option_iter([
            None,
            Some(10u16),
            Some(20),
            Some(30),
            Some(0),
            None,
            Some(0),
            Some(0),
            Some(0),
            Some(0),
        ])
        .into_array();

        assert_arrays_eq!(expected, result);
    }

    /// Builds a `PatchedArray` from `u16` inner values and parallel slices of
    /// patch positions and patch values.
    fn make_patched_array(
        inner: impl IntoIterator<Item = u16>,
        patch_indices: &[u32],
        patch_values: &[u16],
    ) -> VortexResult<PatchedArray> {
        let inner_values: Vec<u16> = inner.into_iter().collect();
        let array_len = inner_values.len();
        let inner_array = PrimitiveArray::from_iter(inner_values).into_array();

        let positions = PrimitiveArray::from_iter(patch_indices.iter().copied()).into_array();
        let replacements = PrimitiveArray::from_iter(patch_values.iter().copied()).into_array();
        let patches = Patches::new(array_len, 0, positions, replacements, None)?;

        let mut ctx = empty_ctx();
        Patched::from_array_and_patches(inner_array, &patches, &mut ctx)
    }

    /// Serializing and then decoding a patched array yields a `Patched` array
    /// that displays the same values.
    #[rstest]
    #[case::basic(
        make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap().into_array()
    )]
    #[case::multi_chunk(
        make_patched_array(vec![0u16; 4096], &[100, 1500, 2500, 3500], &[11, 22, 33, 44]).unwrap().into_array()
    )]
    #[case::sliced({
        let arr = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30]).unwrap();
        arr.into_array().slice(2..1024).unwrap()
    })]
    fn test_serde_roundtrip(#[case] array: crate::ArrayRef) {
        let dtype = array.dtype().clone();
        let len = array.len();

        LEGACY_SESSION.arrays().register(Patched);
        let ctx = ArrayContext::empty().with_registry(LEGACY_SESSION.arrays().registry().clone());

        // Concatenate every serialized buffer into one contiguous byte buffer.
        let bytes = array
            .serialize(&ctx, &LEGACY_SESSION, &SerializeOptions::default())
            .unwrap()
            .into_iter()
            .fold(ByteBufferMut::empty(), |mut joined, chunk| {
                joined.extend_from_slice(chunk.as_ref());
                joined
            })
            .freeze();

        let parts = SerializedArray::try_from(bytes).unwrap();
        let read_ctx = ReadContext::new(ctx.to_ids());
        let decoded = parts
            .decode(&dtype, len, &read_ctx, &LEGACY_SESSION)
            .unwrap();

        assert!(decoded.is::<Patched>());
        assert_eq!(
            array.display_values().to_string(),
            decoded.display_values().to_string()
        );
    }

    /// Rebuilding an array from its own slots yields an equivalent array.
    #[test]
    fn test_with_slots_basic() -> VortexResult<()> {
        let patched = make_patched_array(vec![0u16; 1024], &[1, 2, 3], &[10, 20, 30])?;

        // Owned slots are used to rebuild the array below; the borrowed view
        // is only checked against the accessor.
        let owned_slots = PatchedSlots::from_slots(patched.as_array().slots().to_vec());
        let borrowed = PatchedSlotsView::from_slots(patched.as_array().slots());
        assert_eq!(borrowed.inner.len(), patched.inner().len());

        let original = patched.into_array();
        let rebuilt = original.clone().with_slots(owned_slots.into_slots())?;

        assert!(rebuilt.is::<Patched>());
        assert_eq!(original.len(), rebuilt.len());
        assert_eq!(original.dtype(), rebuilt.dtype());

        // Both arrays must execute to the same primitive values.
        let mut ctx = empty_ctx();
        let original_executed = original.execute::<Canonical>(&mut ctx)?.into_primitive();
        let rebuilt_executed = rebuilt.execute::<Canonical>(&mut ctx)?.into_primitive();

        assert_arrays_eq!(original_executed, rebuilt_executed);

        Ok(())
    }

    /// Swapping in a different inner array keeps the patches: positions 1, 2
    /// and 3 take the patch values, everything else comes from the new inner.
    #[test]
    fn test_with_slots_modified_inner() -> VortexResult<()> {
        let patched = make_patched_array(vec![0u16; 10], &[1, 2, 3], &[10, 20, 30])?;

        let replacement_slots = PatchedSlots {
            inner: PrimitiveArray::from_iter(vec![5u16; 10]).into_array(),
            lane_offsets: patched.lane_offsets().clone(),
            patch_indices: patched.patch_indices().clone(),
            patch_values: patched.patch_values().clone(),
        };

        let rebuilt = patched
            .into_array()
            .with_slots(replacement_slots.into_slots())?;

        let mut ctx = empty_ctx();
        let executed = rebuilt.execute::<Canonical>(&mut ctx)?.into_primitive();

        let expected = PrimitiveArray::from_iter([5u16, 10, 20, 30, 5, 5, 5, 5, 5, 5]);
        assert_arrays_eq!(expected, executed);

        Ok(())
    }
}
676}