vortex_array/
patches.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::ops::Range;
8
9use itertools::Itertools;
10use num_traits::NumCast;
11use vortex_buffer::BitBuffer;
12use vortex_buffer::BufferMut;
13use vortex_dtype::DType;
14use vortex_dtype::IntegerPType;
15use vortex_dtype::NativePType;
16use vortex_dtype::Nullability::NonNullable;
17use vortex_dtype::PType;
18use vortex_dtype::UnsignedPType;
19use vortex_dtype::match_each_integer_ptype;
20use vortex_dtype::match_each_unsigned_integer_ptype;
21use vortex_error::VortexError;
22use vortex_error::VortexExpect;
23use vortex_error::VortexResult;
24use vortex_error::vortex_bail;
25use vortex_error::vortex_err;
26use vortex_error::vortex_panic;
27use vortex_mask::AllOr;
28use vortex_mask::Mask;
29use vortex_mask::MaskMut;
30use vortex_scalar::PValue;
31use vortex_scalar::Scalar;
32use vortex_utils::aliases::hash_map::HashMap;
33
34use crate::Array;
35use crate::ArrayRef;
36use crate::IntoArray;
37use crate::ToCanonical;
38use crate::arrays::PrimitiveArray;
39use crate::compute::cast;
40use crate::compute::filter;
41use crate::compute::is_sorted;
42use crate::compute::take;
43use crate::search_sorted::SearchResult;
44use crate::search_sorted::SearchSorted;
45use crate::search_sorted::SearchSortedSide;
46use crate::validity::Validity;
47use crate::vtable::ValidityHelper;
48
/// Number of array positions covered by a single chunk.
///
/// One patch index offset is stored for each chunk. A lookup can therefore jump
/// directly to the patches of the chunk containing a position instead of
/// searching through all patch indices.
const PATCH_CHUNK_SIZE: usize = 1024;
52
/// Serializable description of a [`Patches`] instance.
///
/// Derives `prost::Message`; see [`Patches::to_metadata`] for how the fields are populated.
#[derive(Copy, Clone, prost::Message)]
pub struct PatchesMetadata {
    /// Number of patches, i.e. the length of the indices array.
    #[prost(uint64, tag = "1")]
    len: u64,
    /// Offset of the patches (see [`Patches::offset`]).
    #[prost(uint64, tag = "2")]
    offset: u64,
    /// Primitive type of the patch indices, stored as the `i32` value of [`PType`].
    #[prost(enumeration = "PType", tag = "3")]
    indices_ptype: i32,
    /// Length of the chunk offsets array, if chunk offsets are present.
    #[prost(uint64, optional, tag = "4")]
    chunk_offsets_len: Option<u64>,
    /// Primitive type of the chunk offsets, if chunk offsets are present.
    #[prost(enumeration = "PType", optional, tag = "5")]
    chunk_offsets_ptype: Option<i32>,
    /// Value of [`Patches::offset_within_chunk`] at the time the metadata was created.
    #[prost(uint64, optional, tag = "6")]
    offset_within_chunk: Option<u64>,
}
68
69impl PatchesMetadata {
70    #[inline]
71    pub fn new(
72        len: usize,
73        offset: usize,
74        indices_ptype: PType,
75        chunk_offsets_len: Option<usize>,
76        chunk_offsets_ptype: Option<PType>,
77        offset_within_chunk: Option<usize>,
78    ) -> Self {
79        Self {
80            len: len as u64,
81            offset: offset as u64,
82            indices_ptype: indices_ptype as i32,
83            chunk_offsets_len: chunk_offsets_len.map(|len| len as u64),
84            chunk_offsets_ptype: chunk_offsets_ptype.map(|pt| pt as i32),
85            offset_within_chunk: offset_within_chunk.map(|len| len as u64),
86        }
87    }
88
89    #[inline]
90    pub fn len(&self) -> usize {
91        usize::try_from(self.len).vortex_expect("len is a valid usize")
92    }
93
94    #[inline]
95    pub fn is_empty(&self) -> bool {
96        self.len == 0
97    }
98
99    #[inline]
100    pub fn offset(&self) -> usize {
101        usize::try_from(self.offset).vortex_expect("offset is a valid usize")
102    }
103
104    #[inline]
105    pub fn chunk_offsets_dtype(&self) -> Option<DType> {
106        self.chunk_offsets_ptype
107            .map(|t| {
108                PType::try_from(t)
109                    .map_err(|e| vortex_err!("invalid i32 value {t} for PType: {}", e))
110                    .vortex_expect("invalid i32 value for PType")
111            })
112            .map(|ptype| DType::Primitive(ptype, NonNullable))
113    }
114
115    #[inline]
116    pub fn indices_dtype(&self) -> DType {
117        assert!(
118            self.indices_ptype().is_unsigned_int(),
119            "Patch indices must be unsigned integers"
120        );
121        DType::Primitive(self.indices_ptype(), NonNullable)
122    }
123}
124
/// A helper for working with patched arrays.
///
/// Patches pair a sorted array of `indices` with an equally long array of `values` that
/// replace the elements of the patched array at those positions.
#[derive(Debug, Clone)]
pub struct Patches {
    /// Length of the array the patches apply to.
    array_len: usize,
    /// Amount subtracted from every stored index to obtain the position within the
    /// patched array.
    offset: usize,
    /// Sorted, non-nullable unsigned integer positions of the patches (before `offset` is
    /// subtracted).
    indices: ArrayRef,
    /// Patch values; has the same length as `indices`.
    values: ArrayRef,
    /// Stores the patch index offset for each chunk.
    ///
    /// This allows us to lookup the patches for a given chunk in constant time via
    /// `patch_indices[chunk_offsets[i]..chunk_offsets[i+1]]`.
    ///
    /// This is optional for compatibility reasons.
    chunk_offsets: Option<ArrayRef>,
    /// Chunk offsets are only sliced off in case the slice is fully
    /// outside of the chunk range.
    ///
    /// Though the range for indices and values is sliced in terms of
    /// individual elements, not chunks. To account for that we do a
    /// saturating sub when adjusting the indices based on the chunk offset.
    ///
    /// `offset_within_chunk` is necessary in order to keep track of how many
    /// elements were sliced off within the chunk.
    offset_within_chunk: Option<usize>,
}
150
151impl Patches {
152    pub fn new(
153        array_len: usize,
154        offset: usize,
155        indices: ArrayRef,
156        values: ArrayRef,
157        chunk_offsets: Option<ArrayRef>,
158    ) -> Self {
159        assert_eq!(
160            indices.len(),
161            values.len(),
162            "Patch indices and values must have the same length"
163        );
164        assert!(
165            indices.dtype().is_unsigned_int() && !indices.dtype().is_nullable(),
166            "Patch indices must be non-nullable unsigned integers, got {:?}",
167            indices.dtype()
168        );
169        assert!(
170            indices.len() <= array_len,
171            "Patch indices must be shorter than the array length"
172        );
173        assert!(!indices.is_empty(), "Patch indices must not be empty");
174        let max = usize::try_from(&indices.scalar_at(indices.len() - 1))
175            .vortex_expect("indices must be a number");
176        assert!(
177            max - offset < array_len,
178            "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
179        );
180
181        debug_assert!(
182            is_sorted(indices.as_ref())
183                .unwrap_or(Some(false))
184                .unwrap_or(false),
185            "Patch indices must be sorted"
186        );
187
188        Self {
189            array_len,
190            offset,
191            indices,
192            values,
193            chunk_offsets: chunk_offsets.clone(),
194            // Initialize with `Some(0)` only if `chunk_offsets` are set.
195            offset_within_chunk: chunk_offsets.map(|_| 0),
196        }
197    }
198
    /// Construct new patches without validating any of the arguments
    ///
    /// All arguments are stored as given; in particular `offset_within_chunk` is not derived
    /// from `chunk_offsets` as it is in [`Self::new`].
    ///
    /// # Safety
    ///
    /// Users have to assert that
    /// * Indices and values have the same length
    /// * Indices is a non-nullable unsigned integer type
    /// * Indices is not empty
    /// * Indices must be sorted
    /// * Last value in indices, minus `offset`, is smaller than array_len
    ///
    /// These mirror the conditions asserted by [`Self::new`].
    pub unsafe fn new_unchecked(
        array_len: usize,
        offset: usize,
        indices: ArrayRef,
        values: ArrayRef,
        chunk_offsets: Option<ArrayRef>,
        offset_within_chunk: Option<usize>,
    ) -> Self {
        Self {
            array_len,
            offset,
            indices,
            values,
            chunk_offsets,
            offset_within_chunk,
        }
    }
225
    /// Returns the length of the array these patches apply to.
    #[inline]
    pub fn array_len(&self) -> usize {
        self.array_len
    }
230
    /// Returns the number of patches, i.e. the length of the indices array.
    #[inline]
    pub fn num_patches(&self) -> usize {
        self.indices.len()
    }
235
    /// Returns the dtype of the patch values.
    #[inline]
    pub fn dtype(&self) -> &DType {
        self.values.dtype()
    }
240
    /// Returns the stored patch indices.
    ///
    /// The stored indices are not adjusted by [`Self::offset`].
    #[inline]
    pub fn indices(&self) -> &ArrayRef {
        &self.indices
    }
245
    /// Consumes the patches and returns the stored patch indices.
    #[inline]
    pub fn into_indices(self) -> ArrayRef {
        self.indices
    }
250
    /// Returns a mutable reference to the stored patch indices.
    ///
    /// Nothing is re-validated here; callers replacing the array are responsible for keeping
    /// it consistent with the values.
    #[inline]
    pub fn indices_mut(&mut self) -> &mut ArrayRef {
        &mut self.indices
    }
255
    /// Returns the patch values.
    #[inline]
    pub fn values(&self) -> &ArrayRef {
        &self.values
    }
260
    /// Consumes the patches and returns the patch values.
    #[inline]
    pub fn into_values(self) -> ArrayRef {
        self.values
    }
265
    /// Returns a mutable reference to the patch values.
    ///
    /// Nothing is re-validated here; callers replacing the array are responsible for keeping
    /// it consistent with the indices.
    #[inline]
    pub fn values_mut(&mut self) -> &mut ArrayRef {
        &mut self.values
    }
270
    /// Returns the offset that is subtracted from the stored indices to obtain positions
    /// within the patched array.
    #[inline]
    pub fn offset(&self) -> usize {
        self.offset
    }
275
    /// Returns the per-chunk patch index offsets, if present.
    #[inline]
    pub fn chunk_offsets(&self) -> &Option<ArrayRef> {
        &self.chunk_offsets
    }
280
281    #[inline]
282    pub fn chunk_offset_at(&self, idx: usize) -> usize {
283        let Some(chunk_offsets) = &self.chunk_offsets else {
284            vortex_panic!("chunk_offsets must be set to retrieve offset at index")
285        };
286
287        chunk_offsets
288            .scalar_at(idx)
289            .as_primitive()
290            .as_::<usize>()
291            .vortex_expect("chunk offset must be usize")
292    }
293
    /// Returns the recorded offset within the first chunk, if any.
    #[inline]
    pub fn offset_within_chunk(&self) -> Option<usize> {
        self.offset_within_chunk
    }
298
299    #[inline]
300    pub fn indices_ptype(&self) -> PType {
301        PType::try_from(self.indices.dtype()).vortex_expect("primitive indices")
302    }
303
304    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
305        if self.indices.len() > len {
306            vortex_bail!(
307                "Patch indices {} are longer than the array length {}",
308                self.indices.len(),
309                len
310            );
311        }
312        if self.values.dtype() != dtype {
313            vortex_bail!(
314                "Patch values dtype {} does not match array dtype {}",
315                self.values.dtype(),
316                dtype
317            );
318        }
319        let chunk_offsets_len = self.chunk_offsets.as_ref().map(|co| co.len());
320        let chunk_offsets_ptype = self.chunk_offsets.as_ref().map(|co| co.dtype().as_ptype());
321
322        Ok(PatchesMetadata::new(
323            self.indices.len(),
324            self.offset,
325            self.indices.dtype().as_ptype(),
326            chunk_offsets_len,
327            chunk_offsets_ptype,
328            self.offset_within_chunk,
329        ))
330    }
331
332    pub fn cast_values(self, values_dtype: &DType) -> VortexResult<Self> {
333        // SAFETY: casting does not affect the relationship between the indices and values
334        unsafe {
335            Ok(Self::new_unchecked(
336                self.array_len,
337                self.offset,
338                self.indices,
339                cast(&self.values, values_dtype)?,
340                self.chunk_offsets,
341                self.offset_within_chunk,
342            ))
343        }
344    }
345
346    /// Get the patched value at a given index if it exists.
347    pub fn get_patched(&self, index: usize) -> Option<Scalar> {
348        self.search_index(index)
349            .to_found()
350            .map(|patch_idx| self.values().scalar_at(patch_idx))
351    }
352
353    /// Searches for `index` in the indices array.
354    ///
355    /// Chooses between chunked search when [`Self::chunk_offsets`] is
356    /// available, and binary search otherwise. The `index` parameter is
357    /// adjusted by [`Self::offset`] for both.
358    ///
359    /// # Arguments
360    /// * `index` - The index to search for
361    ///
362    /// # Returns
363    /// * [`SearchResult::Found(patch_idx)`] - If a patch exists at this index, returns the
364    ///   position in the patches array
365    /// * [`SearchResult::NotFound(insertion_point)`] - If no patch exists, returns where
366    ///   a patch at this index would be inserted to maintain sorted order
367    pub fn search_index(&self, index: usize) -> SearchResult {
368        if self.chunk_offsets.is_some() {
369            return self.search_index_chunked(index);
370        }
371
372        Self::search_index_binary_search(&self.indices, index + self.offset)
373    }
374
    /// Binary searches for `needle` in the indices array.
    ///
    /// `needle` is compared against the stored indices as-is, so callers must already have
    /// applied any offset.
    ///
    /// # Returns
    /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`]
    /// with the insertion point if not found.
    fn search_index_binary_search(indices: &dyn Array, needle: usize) -> SearchResult {
        // Canonical arrays are searched directly on their typed slice.
        if indices.is_canonical() {
            let primitive = indices.to_primitive();
            match_each_integer_ptype!(primitive.ptype(), |T| {
                let Ok(needle) = T::try_from(needle) else {
                    // If the needle is not of type T, then it cannot possibly be in this array.
                    //
                    // The needle is a non-negative integer (a usize); therefore, it must be larger
                    // than all values in this array.
                    return SearchResult::NotFound(primitive.len());
                };
                return primitive
                    .as_slice::<T>()
                    .search_sorted(&needle, SearchSortedSide::Left);
            });
        }
        // Non-canonical arrays are searched through the typed-primitive view of the array.
        indices
            .as_primitive_typed()
            .search_sorted(&PValue::U64(needle as u64), SearchSortedSide::Left)
    }
400
    /// Searches for `index` in the indices array using the per-chunk patch offsets.
    ///
    /// First determines which chunk the target index falls into via a direct lookup into
    /// `chunk_offsets`, then performs a binary search within that chunk's range of patches.
    ///
    /// Returns a [`SearchResult`] indicating either the exact patch index if found,
    /// or the insertion point if not found. Indices at or beyond [`Self::array_len`] are
    /// reported as not found, with the number of patches as insertion point.
    ///
    /// # Panics
    /// Panics if `chunk_offsets` or `offset_within_chunk` are not set.
    fn search_index_chunked(&self, index: usize) -> SearchResult {
        let Some(chunk_offsets) = &self.chunk_offsets else {
            vortex_panic!("chunk_offsets is required to be set")
        };

        let Some(offset_within_chunk) = self.offset_within_chunk else {
            vortex_panic!("offset_within_chunk is required to be set")
        };

        if index >= self.array_len() {
            return SearchResult::NotFound(self.indices().len());
        }

        // Chunk numbering is relative to the first retained chunk, so only the part of the
        // offset that lies within a chunk is added before dividing.
        let chunk_idx = (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE;

        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
        let base_offset = self.chunk_offset_at(0);

        let patches_start_idx = (self.chunk_offset_at(chunk_idx) - base_offset)
            // Chunk offsets are only sliced off in case the slice is fully
            // outside of the chunk range.
            //
            // Though the range for indices and values is sliced in terms of
            // individual elements, not chunks. To account for that we do a
            // saturating sub when adjusting the indices based on the chunk offset.
            .saturating_sub(offset_within_chunk);

        // The last chunk has no successor offset; its patches run to the end of the indices.
        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
            self.chunk_offset_at(chunk_idx + 1) - base_offset - offset_within_chunk
        } else {
            self.indices.len()
        };

        let chunk_indices = self.indices.slice(patches_start_idx..patches_end_idx);
        let result = Self::search_index_binary_search(&chunk_indices, index + self.offset);

        // Translate the chunk-relative result back into a position within all patches.
        match result {
            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
        }
    }
452
    /// Batch version of `search_index`.
    ///
    /// In contrast to `search_index`, this function requires `indices` as
    /// well as `chunk_offsets` to be passed as slices. This is to avoid
    /// redundant canonicalization and `scalar_at` lookups across calls.
    ///
    /// `T` is the element type of the patch indices and `O` the element type of the chunk
    /// offsets. `index` is a position in the patched array, i.e. before [`Self::offset`] is
    /// added.
    ///
    /// # Panics
    /// Panics if `offset_within_chunk` is not set, or if one of the intermediate offsets
    /// cannot be converted between `O` and `usize`.
    fn search_index_chunked_batch<T, O>(
        &self,
        indices: &[T],
        chunk_offsets: &[O],
        index: T,
    ) -> SearchResult
    where
        T: UnsignedPType,
        O: UnsignedPType,
        usize: TryFrom<T>,
        usize: TryFrom<O>,
    {
        let Some(offset_within_chunk) = self.offset_within_chunk else {
            vortex_panic!("offset_within_chunk is required to be set")
        };

        let chunk_idx = {
            let Ok(index) = usize::try_from(index) else {
                // If the needle cannot be converted to usize, it's larger than all values in this array.
                return SearchResult::NotFound(indices.len());
            };

            if index >= self.array_len() {
                return SearchResult::NotFound(self.indices().len());
            }

            // Same chunk computation as in `search_index_chunked`.
            (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE
        };

        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
        let Ok(chunk_offset) = usize::try_from(chunk_offsets[chunk_idx] - chunk_offsets[0]) else {
            vortex_panic!("chunk_offset failed to convert to usize")
        };

        let patches_start_idx = chunk_offset
            // Chunk offsets are only sliced off in case the slice is fully
            // outside of the chunk range.
            //
            // Though the range for indices and values is sliced in terms of
            // individual elements, not chunks. To account for that we do a
            // saturating sub when adjusting the indices based on the chunk offset.
            .saturating_sub(offset_within_chunk);

        // The last chunk has no successor offset; its patches run to the end of the indices.
        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
            let base_offset_end = chunk_offsets[chunk_idx + 1];

            let Some(offset_within_chunk) = O::from(offset_within_chunk) else {
                vortex_panic!("offset_within_chunk failed to convert to O");
            };

            let Ok(patches_end_idx) =
                usize::try_from(base_offset_end - chunk_offsets[0] - offset_within_chunk)
            else {
                vortex_panic!("patches_end_idx failed to convert to usize")
            };

            patches_end_idx
        } else {
            self.indices.len()
        };

        let Some(offset) = T::from(self.offset) else {
            // If the offset cannot be converted to T, it's larger than all values in this array.
            return SearchResult::NotFound(indices.len());
        };

        let chunk_indices = &indices[patches_start_idx..patches_end_idx];
        // NOTE(review): `index + offset` is computed in `T`; presumably callers guarantee the
        // sum fits into `T` — confirm for narrow index types.
        let result = chunk_indices.search_sorted(&(index + offset), SearchSortedSide::Left);

        // Translate the chunk-relative result back into a position within all patches.
        match result {
            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
        }
    }
532
533    /// Returns the minimum patch index
534    pub fn min_index(&self) -> usize {
535        let first = self
536            .indices
537            .scalar_at(0)
538            .as_primitive()
539            .as_::<usize>()
540            .vortex_expect("non-null");
541        first - self.offset
542    }
543
544    /// Returns the maximum patch index
545    pub fn max_index(&self) -> usize {
546        let last = self
547            .indices
548            .scalar_at(self.indices.len() - 1)
549            .as_primitive()
550            .as_::<usize>()
551            .vortex_expect("non-null");
552        last - self.offset
553    }
554
555    /// Filter the patches by a mask, resulting in new patches for the filtered array.
556    pub fn filter(&self, mask: &Mask) -> VortexResult<Option<Self>> {
557        if mask.len() != self.array_len {
558            vortex_bail!(
559                "Filter mask length {} does not match array length {}",
560                mask.len(),
561                self.array_len
562            );
563        }
564
565        match mask.indices() {
566            AllOr::All => Ok(Some(self.clone())),
567            AllOr::None => Ok(None),
568            AllOr::Some(mask_indices) => {
569                let flat_indices = self.indices().to_primitive();
570                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
571                    filter_patches_with_mask(
572                        flat_indices.as_slice::<I>(),
573                        self.offset(),
574                        self.values(),
575                        mask_indices,
576                    )
577                })
578            }
579        }
580    }
581
    /// Mask the patches, REMOVING the patches where the mask is true.
    /// Unlike filter, this preserves the patch indices.
    /// Unlike mask on a single array, this does not set masked values to null.
    ///
    /// Returns `Ok(None)` if no patch remains. The resulting patches keep `array_len` and
    /// `offset` but drop the chunk offsets.
    ///
    /// # Errors
    ///
    /// Fails if the mask length differs from [`Self::array_len`], or if filtering the indices
    /// or values fails.
    pub fn mask(&self, mask: &Mask) -> VortexResult<Option<Self>> {
        if mask.len() != self.array_len {
            vortex_bail!(
                "Filter mask length {} does not match array length {}",
                mask.len(),
                self.array_len
            );
        }

        // Build a mask over the patches that is true for every patch that is kept.
        let filter_mask = match mask.bit_buffer() {
            // Everything is masked out: no patch survives.
            AllOr::All => return Ok(None),
            // Nothing is masked out: the patches are unchanged.
            AllOr::None => return Ok(Some(self.clone())),
            AllOr::Some(masked) => {
                let patch_indices = self.indices().to_primitive();
                match_each_unsigned_integer_ptype!(patch_indices.ptype(), |P| {
                    let patch_indices = patch_indices.as_slice::<P>();
                    Mask::from_buffer(BitBuffer::collect_bool(patch_indices.len(), |i| {
                        // Convert the stored index into a position within the patched array.
                        #[allow(clippy::cast_possible_truncation)]
                        let idx = (patch_indices[i] as usize) - self.offset;
                        !masked.value(idx)
                    }))
                })
            }
        };

        if filter_mask.all_false() {
            return Ok(None);
        }

        // Filtering indices and values with the same mask maintains their 1:1 relationship.
        let filtered_indices = filter(&self.indices, &filter_mask)?;
        let filtered_values = filter(&self.values, &filter_mask)?;

        Ok(Some(Self {
            array_len: self.array_len,
            offset: self.offset,
            indices: filtered_indices,
            values: filtered_values,
            // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
            chunk_offsets: None,
            // NOTE(review): `offset_within_chunk` is carried over although `chunk_offsets` is
            // dropped — confirm this is intended.
            offset_within_chunk: self.offset_within_chunk,
        }))
    }
628
629    /// Slice the patches by a range of the patched array.
630    pub fn slice(&self, range: Range<usize>) -> Option<Self> {
631        let slice_start_idx = self.search_index(range.start).to_index();
632        let slice_end_idx = self.search_index(range.end).to_index();
633
634        if slice_start_idx == slice_end_idx {
635            return None;
636        }
637
638        let values = self.values().slice(slice_start_idx..slice_end_idx);
639        let indices = self.indices().slice(slice_start_idx..slice_end_idx);
640
641        let chunk_offsets = self.chunk_offsets.as_ref().map(|chunk_offsets| {
642            let chunk_relative_offset = self.offset % PATCH_CHUNK_SIZE;
643            let chunk_start_idx = (chunk_relative_offset + range.start) / PATCH_CHUNK_SIZE;
644            let chunk_end_idx = (chunk_relative_offset + range.end).div_ceil(PATCH_CHUNK_SIZE);
645            chunk_offsets.slice(chunk_start_idx..chunk_end_idx)
646        });
647
648        let offset_within_chunk = chunk_offsets.as_ref().map(|chunk_offsets| {
649            let base_offset = chunk_offsets
650                .scalar_at(0)
651                .as_primitive()
652                .as_::<usize>()
653                .vortex_expect("chunk offset must be usize");
654            slice_start_idx - base_offset
655        });
656
657        Some(Self {
658            array_len: range.len(),
659            offset: range.start + self.offset(),
660            indices,
661            values,
662            chunk_offsets,
663            offset_within_chunk,
664        })
665    }
666
    /// Threshold on the ratio of patches to take indices: below it the map-based take is
    /// chosen, otherwise the search-based take.
    ///
    /// NOTE(review): presumably derived from the measurements in the spreadsheet linked
    /// below — confirm.
    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
669
670    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
671        (self.num_patches() as f64 / take_indices.len() as f64)
672            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
673    }
674
675    /// Take the indices from the patches
676    ///
677    /// Any nulls in take_indices are added to the resulting patches.
678    pub fn take_with_nulls(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
679        if take_indices.is_empty() {
680            return Ok(None);
681        }
682
683        let take_indices = take_indices.to_primitive();
684        if self.is_map_faster_than_search(&take_indices) {
685            self.take_map(take_indices, true)
686        } else {
687            self.take_search(take_indices, true)
688        }
689    }
690
691    /// Take the indices from the patches.
692    ///
693    /// Any nulls in take_indices are ignored.
694    pub fn take(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
695        if take_indices.is_empty() {
696            return Ok(None);
697        }
698
699        let take_indices = take_indices.to_primitive();
700        if self.is_map_faster_than_search(&take_indices) {
701            self.take_map(take_indices, false)
702        } else {
703            self.take_search(take_indices, false)
704        }
705    }
706
707    #[expect(
708        clippy::cognitive_complexity,
709        reason = "complexity is from nested match_each_* macros"
710    )]
711    pub fn take_search(
712        &self,
713        take_indices: PrimitiveArray,
714        include_nulls: bool,
715    ) -> VortexResult<Option<Self>> {
716        let take_indices_validity = take_indices.validity();
717        let patch_indices = self.indices.to_primitive();
718        let chunk_offsets = self.chunk_offsets().as_ref().map(|co| co.to_primitive());
719
720        let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) =
721            match_each_unsigned_integer_ptype!(patch_indices.ptype(), |PatchT| {
722                let patch_indices_slice = patch_indices.as_slice::<PatchT>();
723                match_each_integer_ptype!(take_indices.ptype(), |TakeT| {
724                    let take_slice = take_indices.as_slice::<TakeT>();
725
726                    if let Some(chunk_offsets) = chunk_offsets {
727                        match_each_unsigned_integer_ptype!(chunk_offsets.ptype(), |OffsetT| {
728                            let chunk_offsets = chunk_offsets.as_slice::<OffsetT>();
729                            take_indices_with_search_fn(
730                                patch_indices_slice,
731                                take_slice,
732                                take_indices.validity_mask(),
733                                include_nulls,
734                                |take_idx| {
735                                    self.search_index_chunked_batch(
736                                        patch_indices_slice,
737                                        chunk_offsets,
738                                        take_idx,
739                                    )
740                                },
741                            )
742                        })
743                    } else {
744                        take_indices_with_search_fn(
745                            patch_indices_slice,
746                            take_slice,
747                            take_indices.validity_mask(),
748                            include_nulls,
749                            |take_idx| {
750                                let Some(offset) = <PatchT as NumCast>::from(self.offset) else {
751                                    // If the offset cannot be converted to T, it's larger than all values in this array.
752                                    return SearchResult::NotFound(patch_indices_slice.len());
753                                };
754
755                                patch_indices_slice
756                                    .search_sorted(&(take_idx + offset), SearchSortedSide::Left)
757                            },
758                        )
759                    }
760                })
761            });
762
763        if new_indices.is_empty() {
764            return Ok(None);
765        }
766
767        let new_indices = new_indices.into_array();
768        let new_array_len = take_indices.len();
769        let values_validity = take_indices_validity.take(&new_indices)?;
770
771        Ok(Some(Self {
772            array_len: new_array_len,
773            offset: 0,
774            indices: new_indices.clone(),
775            values: take(
776                self.values(),
777                &PrimitiveArray::new(values_indices, values_validity).into_array(),
778            )?,
779            chunk_offsets: None,
780            offset_within_chunk: Some(0), // Reset when creating new Patches.
781        }))
782    }
783
    /// Takes `take_indices` from the patches using the free `take_map` helper.
    ///
    /// The helper receives the patch indices, the take indices, the offset, the minimum and
    /// maximum patched position, and `include_nulls`. The resulting patches have an offset of
    /// `0`, an array length of `take_indices.len()` and no chunk offsets. Returns `Ok(None)`
    /// when the helper yields no patches.
    ///
    /// # Errors
    ///
    /// Fails if the helper fails or if taking the values fails.
    pub fn take_map(
        &self,
        take_indices: PrimitiveArray,
        include_nulls: bool,
    ) -> VortexResult<Option<Self>> {
        let indices = self.indices.to_primitive();
        let new_length = take_indices.len();

        // Dispatch on the concrete integer types of the patch indices and the take indices.
        let Some((new_sparse_indices, value_indices)) =
            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
                    take_map::<_, TakeIndices>(
                        indices.as_slice::<Indices>(),
                        take_indices,
                        self.offset(),
                        self.min_index(),
                        self.max_index(),
                        include_nulls,
                    )?
                })
            })
        else {
            return Ok(None);
        };

        let taken_values = take(self.values(), &value_indices)?;

        Ok(Some(Patches {
            array_len: new_length,
            offset: 0,
            indices: new_sparse_indices,
            values: taken_values,
            // TODO(0ax1): Chunk offsets are invalid after take is applied.
            chunk_offsets: None,
            // NOTE(review): `offset_within_chunk` is carried over although `chunk_offsets` is
            // dropped — confirm this is intended.
            offset_within_chunk: self.offset_within_chunk,
        }))
    }
821
    /// Apply patches to a mutable buffer and validity mask.
    ///
    /// This method applies the patch values to the given buffer at the positions specified by the
    /// patch indices. For non-null patch values, it updates the buffer and marks the position as
    /// valid. For null patch values, it marks the position as invalid.
    ///
    /// The indices and values are converted to primitive arrays first; the actual writes are
    /// performed by `apply_patches_to_buffer_inner`.
    ///
    /// # Safety
    ///
    /// - All patch indices after offset adjustment must be valid indices into the buffer.
    /// - The buffer and validity mask must have the same length.
    pub unsafe fn apply_to_buffer<P: NativePType>(&self, buffer: &mut [P], validity: &mut MaskMut) {
        let patch_indices = self.indices.to_primitive();
        let patch_values = self.values.to_primitive();
        let patches_validity = patch_values.validity();

        // NOTE(review): presumably `P` must match the ptype of the patch values — confirm.
        let patch_values_slice = patch_values.as_slice::<P>();
        match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| {
            let patch_indices_slice = patch_indices.as_slice::<I>();

            // SAFETY:
            // - `Patches` invariant guarantees indices are sorted and within array bounds.
            // - `patch_indices` and `patch_values` have equal length (from `Patches` invariant).
            // - `buffer` and `validity` have equal length (precondition).
            // - All patch indices are valid after offset adjustment (precondition).
            unsafe {
                apply_patches_to_buffer_inner(
                    buffer,
                    validity,
                    patch_indices_slice,
                    self.offset,
                    patch_values_slice,
                    patches_validity,
                );
            }
        });
    }
858
859    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
860    where
861        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
862    {
863        let values = f(self.values)?;
864        if self.indices.len() != values.len() {
865            vortex_bail!(
866                "map_values must preserve length: expected {} received {}",
867                self.indices.len(),
868                values.len()
869            )
870        }
871
872        Ok(Self {
873            array_len: self.array_len,
874            offset: self.offset,
875            indices: self.indices,
876            values,
877            chunk_offsets: self.chunk_offsets,
878            offset_within_chunk: self.offset_within_chunk,
879        })
880    }
881}
882
883/// Helper function to apply patches to a buffer.
884///
885/// # Safety
886///
887/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices
888///   into both `buffer` and `validity`.
889/// - `patch_indices` must be sorted in ascending order.
890/// - `patch_indices` and `patch_values` must have the same length.
891/// - `buffer` and `validity` must have the same length.
892unsafe fn apply_patches_to_buffer_inner<P, I>(
893    buffer: &mut [P],
894    validity: &mut MaskMut,
895    patch_indices: &[I],
896    patch_offset: usize,
897    patch_values: &[P],
898    patches_validity: &Validity,
899) where
900    P: NativePType,
901    I: UnsignedPType,
902{
903    debug_assert!(!patch_indices.is_empty());
904    debug_assert_eq!(patch_indices.len(), patch_values.len());
905    debug_assert_eq!(buffer.len(), validity.len());
906
907    match patches_validity {
908        Validity::NonNullable | Validity::AllValid => {
909            // All patch values are valid, apply them all.
910            for (&i, &value) in patch_indices.iter().zip_eq(patch_values) {
911                let index = i.as_() - patch_offset;
912
913                // SAFETY: `index` is valid because caller guarantees all patch indices are within
914                // bounds after offset adjustment.
915                unsafe {
916                    validity.set_unchecked(index);
917                }
918                buffer[index] = value;
919            }
920        }
921        Validity::AllInvalid => {
922            // All patch values are null, just mark positions as invalid.
923            for &i in patch_indices {
924                let index = i.as_() - patch_offset;
925
926                // SAFETY: `index` is valid because caller guarantees all patch indices are within
927                // bounds after offset adjustment.
928                unsafe {
929                    validity.unset_unchecked(index);
930                }
931            }
932        }
933        Validity::Array(array) => {
934            // Some patch values may be null, check each one.
935            let bool_array = array.to_bool();
936            let mask = bool_array.bit_buffer();
937            for (patch_idx, (&i, &value)) in patch_indices.iter().zip_eq(patch_values).enumerate() {
938                let index = i.as_() - patch_offset;
939
940                // SAFETY: `index` and `patch_idx` are valid because caller guarantees all patch
941                // indices are within bounds after offset adjustment.
942                unsafe {
943                    if mask.value_unchecked(patch_idx) {
944                        buffer[index] = value;
945                        validity.set_unchecked(index);
946                    } else {
947                        validity.unset_unchecked(index);
948                    }
949                }
950            }
951        }
952    }
953}
954
955fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
956    indices: &[I],
957    take_indices: PrimitiveArray,
958    indices_offset: usize,
959    min_index: usize,
960    max_index: usize,
961    include_nulls: bool,
962) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
963where
964    usize: TryFrom<T>,
965    VortexError: From<<I as TryFrom<usize>>::Error>,
966{
967    let take_indices_validity = take_indices.validity();
968    let take_indices = take_indices.as_slice::<T>();
969    let offset_i = I::try_from(indices_offset)?;
970
971    let sparse_index_to_value_index: HashMap<I, usize> = indices
972        .iter()
973        .copied()
974        .map(|idx| idx - offset_i)
975        .enumerate()
976        .map(|(value_index, sparse_index)| (sparse_index, value_index))
977        .collect();
978
979    let (new_sparse_indices, value_indices): (BufferMut<u64>, BufferMut<u64>) = take_indices
980        .iter()
981        .copied()
982        .map(usize::try_from)
983        .process_results(|iter| {
984            iter.enumerate()
985                .filter_map(|(idx_in_take, ti)| {
986                    // If we have to take nulls the take index doesn't matter, make it 0 for consistency
987                    if include_nulls && take_indices_validity.is_null(idx_in_take) {
988                        Some((idx_in_take as u64, 0))
989                    } else if ti < min_index || ti > max_index {
990                        None
991                    } else {
992                        sparse_index_to_value_index
993                            .get(
994                                &I::try_from(ti)
995                                    .vortex_expect("take index is between min and max index"),
996                            )
997                            .map(|value_index| (idx_in_take as u64, *value_index as u64))
998                    }
999                })
1000                .unzip()
1001        })
1002        .map_err(|_| vortex_err!("Failed to convert index to usize"))?;
1003
1004    if new_sparse_indices.is_empty() {
1005        return Ok(None);
1006    }
1007
1008    let new_sparse_indices = new_sparse_indices.into_array();
1009    let values_validity = take_indices_validity.take(&new_sparse_indices)?;
1010    Ok(Some((
1011        new_sparse_indices,
1012        PrimitiveArray::new(value_indices, values_validity).into_array(),
1013    )))
1014}
1015
1016/// Filter patches with the provided mask (in flattened space).
1017///
1018/// The filter mask may contain indices that are non-patched. The return value of this function
1019/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
1020/// patch values.
1021fn filter_patches_with_mask<T: IntegerPType>(
1022    patch_indices: &[T],
1023    offset: usize,
1024    patch_values: &dyn Array,
1025    mask_indices: &[usize],
1026) -> VortexResult<Option<Patches>> {
1027    let true_count = mask_indices.len();
1028    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
1029    let mut new_mask_indices = Vec::with_capacity(true_count);
1030
1031    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
1032    // the patches are relatively sparse compared to the overall mask, and so many indices in the
1033    // mask will end up being skipped.
1034    const STRIDE: usize = 4;
1035
1036    let mut mask_idx = 0usize;
1037    let mut true_idx = 0usize;
1038
1039    while mask_idx < patch_indices.len() && true_idx < true_count {
1040        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
1041        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
1042        //  the mask (which covers both patch and non-patch values of the parent array), and so to
1043        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
1044        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
1045        //  fallback to performing a two-way iterator merge.
1046        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
1047            // Load a vector of each into our registers.
1048            let left_min = patch_indices[mask_idx].to_usize().vortex_expect("left_min") - offset;
1049            let left_max = patch_indices[mask_idx + STRIDE]
1050                .to_usize()
1051                .vortex_expect("left_max")
1052                - offset;
1053            let right_min = mask_indices[true_idx];
1054            let right_max = mask_indices[true_idx + STRIDE];
1055
1056            if left_min > right_max {
1057                // Advance right side
1058                true_idx += STRIDE;
1059                continue;
1060            } else if right_min > left_max {
1061                mask_idx += STRIDE;
1062                continue;
1063            } else {
1064                // Fallthrough to direct comparison path.
1065            }
1066        }
1067
1068        // Two-way sorted iterator merge:
1069
1070        let left = patch_indices[mask_idx].to_usize().vortex_expect("left") - offset;
1071        let right = mask_indices[true_idx];
1072
1073        match left.cmp(&right) {
1074            Ordering::Less => {
1075                mask_idx += 1;
1076            }
1077            Ordering::Greater => {
1078                true_idx += 1;
1079            }
1080            Ordering::Equal => {
1081                // Save the mask index as well as the positional index.
1082                new_mask_indices.push(mask_idx);
1083                new_patch_indices.push(true_idx as u64);
1084
1085                mask_idx += 1;
1086                true_idx += 1;
1087            }
1088        }
1089    }
1090
1091    if new_mask_indices.is_empty() {
1092        return Ok(None);
1093    }
1094
1095    let new_patch_indices = new_patch_indices.into_array();
1096    let new_patch_values = filter(
1097        patch_values,
1098        &Mask::from_indices(patch_values.len(), new_mask_indices),
1099    )?;
1100
1101    Ok(Some(Patches::new(
1102        true_count,
1103        0,
1104        new_patch_indices,
1105        new_patch_values,
1106        // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
1107        None,
1108    )))
1109}
1110
1111fn take_indices_with_search_fn<I: UnsignedPType, T: IntegerPType, F: Fn(I) -> SearchResult>(
1112    indices: &[I],
1113    take_indices: &[T],
1114    take_validity: Mask,
1115    include_nulls: bool,
1116    search_fn: F,
1117) -> (BufferMut<u64>, BufferMut<u64>) {
1118    take_indices
1119        .iter()
1120        .enumerate()
1121        .filter_map(|(new_patch_idx, &take_idx)| {
1122            if include_nulls && !take_validity.value(new_patch_idx) {
1123                // For nulls, patch index doesn't matter - use 0 for consistency
1124                Some((0u64, new_patch_idx as u64))
1125            } else {
1126                let search_result = I::from(take_idx)
1127                    .map(&search_fn)
1128                    .unwrap_or(SearchResult::NotFound(indices.len()));
1129
1130                search_result
1131                    .to_found()
1132                    .map(|patch_idx| (patch_idx as u64, new_patch_idx as u64))
1133            }
1134        })
1135        .unzip()
1136}
1137
1138#[cfg(test)]
1139mod test {
1140    use vortex_buffer::BufferMut;
1141    use vortex_buffer::buffer;
1142    use vortex_mask::Mask;
1143
1144    use crate::IntoArray;
1145    use crate::ToCanonical;
1146    use crate::arrays::PrimitiveArray;
1147    use crate::patches::Patches;
1148    use crate::search_sorted::SearchResult;
1149    use crate::validity::Validity;
1150
1151    #[test]
1152    fn test_filter() {
1153        let patches = Patches::new(
1154            100,
1155            0,
1156            buffer![10u32, 11, 20].into_array(),
1157            buffer![100, 110, 200].into_array(),
1158            None,
1159        );
1160
1161        let filtered = patches
1162            .filter(&Mask::from_indices(100, vec![10, 20, 30]))
1163            .unwrap()
1164            .unwrap();
1165
1166        let indices = filtered.indices().to_primitive();
1167        let values = filtered.values().to_primitive();
1168        assert_eq!(indices.as_slice::<u64>(), &[0, 1]);
1169        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1170    }
1171
1172    #[test]
1173    fn take_with_nulls() {
1174        let patches = Patches::new(
1175            20,
1176            0,
1177            buffer![2u64, 9, 15].into_array(),
1178            PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array(),
1179            None,
1180        );
1181
1182        let taken = patches
1183            .take(
1184                &PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false]))
1185                    .into_array(),
1186            )
1187            .unwrap()
1188            .unwrap();
1189        let primitive_values = taken.values().to_primitive();
1190        let primitive_indices = taken.indices().to_primitive();
1191        assert_eq!(taken.array_len(), 2);
1192        assert_eq!(primitive_values.as_slice::<i32>(), [44]);
1193        assert_eq!(primitive_indices.as_slice::<u64>(), [0]);
1194        assert_eq!(
1195            primitive_values.validity_mask(),
1196            Mask::from_iter(vec![true])
1197        );
1198    }
1199
1200    #[test]
1201    fn take_search_with_nulls_chunked() {
1202        let patches = Patches::new(
1203            20,
1204            0,
1205            buffer![2u64, 9, 15].into_array(),
1206            buffer![33_i32, 44, 55].into_array(),
1207            Some(buffer![0u64].into_array()),
1208        );
1209
1210        let taken = patches
1211            .take_search(
1212                PrimitiveArray::new(buffer![9, 0], Validity::from_iter([true, false])),
1213                true,
1214            )
1215            .unwrap()
1216            .unwrap();
1217
1218        let primitive_values = taken.values().to_primitive();
1219        let primitive_indices = taken.indices().to_primitive();
1220        assert_eq!(taken.array_len(), 2);
1221        assert_eq!(primitive_values.as_slice::<i32>(), [44, 33]);
1222        assert_eq!(primitive_indices.as_slice::<u64>(), [0, 1]);
1223
1224        assert_eq!(
1225            primitive_values.validity_mask(),
1226            Mask::from_iter([true, false])
1227        );
1228    }
1229
1230    #[test]
1231    fn take_search_chunked_multiple_chunks() {
1232        let patches = Patches::new(
1233            2048,
1234            0,
1235            buffer![100u64, 500, 1200, 1800].into_array(),
1236            buffer![10_i32, 20, 30, 40].into_array(),
1237            Some(buffer![0u64, 2].into_array()),
1238        );
1239
1240        let taken = patches
1241            .take_search(
1242                PrimitiveArray::new(buffer![500, 1200, 999], Validity::AllValid),
1243                true,
1244            )
1245            .unwrap()
1246            .unwrap();
1247
1248        let primitive_values = taken.values().to_primitive();
1249        assert_eq!(taken.array_len(), 3);
1250        assert_eq!(primitive_values.as_slice::<i32>(), [20, 30]);
1251    }
1252
1253    #[test]
1254    fn take_search_chunked_indices_with_no_patches() {
1255        let patches = Patches::new(
1256            20,
1257            0,
1258            buffer![2u64, 9, 15].into_array(),
1259            buffer![33_i32, 44, 55].into_array(),
1260            Some(buffer![0u64].into_array()),
1261        );
1262
1263        let taken = patches
1264            .take_search(
1265                PrimitiveArray::new(buffer![3, 4, 5], Validity::AllValid),
1266                true,
1267            )
1268            .unwrap();
1269
1270        assert!(taken.is_none());
1271    }
1272
1273    #[test]
1274    fn take_search_chunked_interleaved() {
1275        let patches = Patches::new(
1276            30,
1277            0,
1278            buffer![5u64, 10, 20, 25].into_array(),
1279            buffer![100_i32, 200, 300, 400].into_array(),
1280            Some(buffer![0u64].into_array()),
1281        );
1282
1283        let taken = patches
1284            .take_search(
1285                PrimitiveArray::new(buffer![10, 15, 20, 99], Validity::AllValid),
1286                true,
1287            )
1288            .unwrap()
1289            .unwrap();
1290
1291        let primitive_values = taken.values().to_primitive();
1292        assert_eq!(taken.array_len(), 4);
1293        assert_eq!(primitive_values.as_slice::<i32>(), [200, 300]);
1294    }
1295
1296    #[test]
1297    fn test_take_search_multiple_chunk_offsets() {
1298        let patches = Patches::new(
1299            1500,
1300            0,
1301            BufferMut::from_iter(0..1500u64).into_array(),
1302            BufferMut::from_iter(0..1500i32).into_array(),
1303            Some(buffer![0u64, 1024u64].into_array()),
1304        );
1305
1306        let taken = patches
1307            .take_search(
1308                PrimitiveArray::new(BufferMut::from_iter(0..1500u64), Validity::AllValid),
1309                false,
1310            )
1311            .unwrap()
1312            .unwrap();
1313
1314        assert_eq!(taken.array_len(), 1500);
1315    }
1316
1317    #[test]
1318    fn test_slice() {
1319        let values = buffer![15_u32, 135, 13531, 42].into_array();
1320        let indices = buffer![10_u64, 11, 50, 100].into_array();
1321
1322        let patches = Patches::new(101, 0, indices, values, None);
1323
1324        let sliced = patches.slice(15..100).unwrap();
1325        assert_eq!(sliced.array_len(), 100 - 15);
1326        let primitive = sliced.values().to_primitive();
1327
1328        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1329    }
1330
1331    #[test]
1332    fn doubly_sliced() {
1333        let values = buffer![15_u32, 135, 13531, 42].into_array();
1334        let indices = buffer![10_u64, 11, 50, 100].into_array();
1335
1336        let patches = Patches::new(101, 0, indices, values, None);
1337
1338        let sliced = patches.slice(15..100).unwrap();
1339        assert_eq!(sliced.array_len(), 100 - 15);
1340        let primitive = sliced.values().to_primitive();
1341
1342        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1343
1344        let doubly_sliced = sliced.slice(35..36).unwrap();
1345        let primitive_doubly_sliced = doubly_sliced.values().to_primitive();
1346
1347        assert_eq!(primitive_doubly_sliced.as_slice::<u32>(), &[13531]);
1348    }
1349
1350    #[test]
1351    fn test_mask_all_true() {
1352        let patches = Patches::new(
1353            10,
1354            0,
1355            buffer![2u64, 5, 8].into_array(),
1356            buffer![100i32, 200, 300].into_array(),
1357            None,
1358        );
1359
1360        let mask = Mask::new_true(10);
1361        let masked = patches.mask(&mask).unwrap();
1362        assert!(masked.is_none());
1363    }
1364
1365    #[test]
1366    fn test_mask_all_false() {
1367        let patches = Patches::new(
1368            10,
1369            0,
1370            buffer![2u64, 5, 8].into_array(),
1371            buffer![100i32, 200, 300].into_array(),
1372            None,
1373        );
1374
1375        let mask = Mask::new_false(10);
1376        let masked = patches.mask(&mask).unwrap().unwrap();
1377
1378        // No patch values should be masked
1379        let masked_values = masked.values().to_primitive();
1380        assert_eq!(masked_values.as_slice::<i32>(), &[100, 200, 300]);
1381        assert!(masked_values.is_valid(0));
1382        assert!(masked_values.is_valid(1));
1383        assert!(masked_values.is_valid(2));
1384
1385        // Indices should remain unchanged
1386        let indices = masked.indices().to_primitive();
1387        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1388    }
1389
1390    #[test]
1391    fn test_mask_partial() {
1392        let patches = Patches::new(
1393            10,
1394            0,
1395            buffer![2u64, 5, 8].into_array(),
1396            buffer![100i32, 200, 300].into_array(),
1397            None,
1398        );
1399
1400        // Mask that removes patches at indices 2 and 8 (but not 5)
1401        let mask = Mask::from_iter([
1402            false, false, true, false, false, false, false, false, true, false,
1403        ]);
1404        let masked = patches.mask(&mask).unwrap().unwrap();
1405
1406        // Only the patch at index 5 should remain
1407        let masked_values = masked.values().to_primitive();
1408        assert_eq!(masked_values.len(), 1);
1409        assert_eq!(masked_values.as_slice::<i32>(), &[200]);
1410
1411        // Only index 5 should remain
1412        let indices = masked.indices().to_primitive();
1413        assert_eq!(indices.as_slice::<u64>(), &[5]);
1414    }
1415
1416    #[test]
1417    fn test_mask_with_offset() {
1418        let patches = Patches::new(
1419            10,
1420            5,                                  // offset
1421            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1422            buffer![100i32, 200, 300].into_array(),
1423            None,
1424        );
1425
1426        // Mask that sets actual index 2 to null
1427        let mask = Mask::from_iter([
1428            false, false, true, false, false, false, false, false, false, false,
1429        ]);
1430
1431        let masked = patches.mask(&mask).unwrap().unwrap();
1432        assert_eq!(masked.array_len(), 10);
1433        assert_eq!(masked.offset(), 5);
1434        let indices = masked.indices().to_primitive();
1435        assert_eq!(indices.as_slice::<u64>(), &[10, 13]);
1436        let masked_values = masked.values().to_primitive();
1437        assert_eq!(masked_values.as_slice::<i32>(), &[200, 300]);
1438    }
1439
1440    #[test]
1441    fn test_mask_nullable_values() {
1442        let patches = Patches::new(
1443            10,
1444            0,
1445            buffer![2u64, 5, 8].into_array(),
1446            PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array(),
1447            None,
1448        );
1449
1450        // Test masking removes patch at index 2
1451        let mask = Mask::from_iter([
1452            false, false, true, false, false, false, false, false, false, false,
1453        ]);
1454        let masked = patches.mask(&mask).unwrap().unwrap();
1455
1456        // Patches at indices 5 and 8 should remain
1457        let indices = masked.indices().to_primitive();
1458        assert_eq!(indices.as_slice::<u64>(), &[5, 8]);
1459
1460        // Values should be the null and 300
1461        let masked_values = masked.values().to_primitive();
1462        assert_eq!(masked_values.len(), 2);
1463        assert!(!masked_values.is_valid(0)); // the null value at index 5
1464        assert!(masked_values.is_valid(1)); // the 300 value at index 8
1465        assert_eq!(i32::try_from(&masked_values.scalar_at(1)).unwrap(), 300i32);
1466    }
1467
1468    #[test]
1469    fn test_filter_keep_all() {
1470        let patches = Patches::new(
1471            10,
1472            0,
1473            buffer![2u64, 5, 8].into_array(),
1474            buffer![100i32, 200, 300].into_array(),
1475            None,
1476        );
1477
1478        // Keep all indices (mask with indices 0-9)
1479        let mask = Mask::from_indices(10, (0..10).collect());
1480        let filtered = patches.filter(&mask).unwrap().unwrap();
1481
1482        let indices = filtered.indices().to_primitive();
1483        let values = filtered.values().to_primitive();
1484        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1485        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1486    }
1487
1488    #[test]
1489    fn test_filter_none() {
1490        let patches = Patches::new(
1491            10,
1492            0,
1493            buffer![2u64, 5, 8].into_array(),
1494            buffer![100i32, 200, 300].into_array(),
1495            None,
1496        );
1497
1498        // Filter out all (empty mask means keep nothing)
1499        let mask = Mask::from_indices(10, vec![]);
1500        let filtered = patches.filter(&mask).unwrap();
1501        assert!(filtered.is_none());
1502    }
1503
1504    #[test]
1505    fn test_filter_with_indices() {
1506        let patches = Patches::new(
1507            10,
1508            0,
1509            buffer![2u64, 5, 8].into_array(),
1510            buffer![100i32, 200, 300].into_array(),
1511            None,
1512        );
1513
1514        // Keep indices 2, 5, 9 (so patches at 2 and 5 remain)
1515        let mask = Mask::from_indices(10, vec![2, 5, 9]);
1516        let filtered = patches.filter(&mask).unwrap().unwrap();
1517
1518        let indices = filtered.indices().to_primitive();
1519        let values = filtered.values().to_primitive();
1520        assert_eq!(indices.as_slice::<u64>(), &[0, 1]); // Adjusted indices
1521        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1522    }
1523
1524    #[test]
1525    fn test_slice_full_range() {
1526        let patches = Patches::new(
1527            10,
1528            0,
1529            buffer![2u64, 5, 8].into_array(),
1530            buffer![100i32, 200, 300].into_array(),
1531            None,
1532        );
1533
1534        let sliced = patches.slice(0..10).unwrap();
1535
1536        let indices = sliced.indices().to_primitive();
1537        let values = sliced.values().to_primitive();
1538        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1539        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1540    }
1541
1542    #[test]
1543    fn test_slice_partial() {
1544        let patches = Patches::new(
1545            10,
1546            0,
1547            buffer![2u64, 5, 8].into_array(),
1548            buffer![100i32, 200, 300].into_array(),
1549            None,
1550        );
1551
1552        // Slice from 3 to 8 (includes patch at 5)
1553        let sliced = patches.slice(3..8).unwrap();
1554
1555        let indices = sliced.indices().to_primitive();
1556        let values = sliced.values().to_primitive();
1557        assert_eq!(indices.as_slice::<u64>(), &[5]); // Index stays the same
1558        assert_eq!(values.as_slice::<i32>(), &[200]);
1559        assert_eq!(sliced.array_len(), 5); // 8 - 3 = 5
1560        assert_eq!(sliced.offset(), 3); // New offset
1561    }
1562
1563    #[test]
1564    fn test_slice_no_patches() {
1565        let patches = Patches::new(
1566            10,
1567            0,
1568            buffer![2u64, 5, 8].into_array(),
1569            buffer![100i32, 200, 300].into_array(),
1570            None,
1571        );
1572
1573        // Slice from 6 to 7 (no patches in this range)
1574        let sliced = patches.slice(6..7);
1575        assert!(sliced.is_none());
1576    }
1577
1578    #[test]
1579    fn test_slice_with_offset() {
1580        let patches = Patches::new(
1581            10,
1582            5,                                  // offset
1583            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1584            buffer![100i32, 200, 300].into_array(),
1585            None,
1586        );
1587
1588        // Slice from 3 to 8 (includes patch at actual index 5)
1589        let sliced = patches.slice(3..8).unwrap();
1590
1591        let indices = sliced.indices().to_primitive();
1592        let values = sliced.values().to_primitive();
1593        assert_eq!(indices.as_slice::<u64>(), &[10]); // Index stays the same (offset + 5 = 10)
1594        assert_eq!(values.as_slice::<i32>(), &[200]);
1595        assert_eq!(sliced.offset(), 8); // New offset = 5 + 3
1596    }
1597
1598    #[test]
1599    fn test_patch_values() {
1600        let patches = Patches::new(
1601            10,
1602            0,
1603            buffer![2u64, 5, 8].into_array(),
1604            buffer![100i32, 200, 300].into_array(),
1605            None,
1606        );
1607
1608        let values = patches.values().to_primitive();
1609        assert_eq!(i32::try_from(&values.scalar_at(0)).unwrap(), 100i32);
1610        assert_eq!(i32::try_from(&values.scalar_at(1)).unwrap(), 200i32);
1611        assert_eq!(i32::try_from(&values.scalar_at(2)).unwrap(), 300i32);
1612    }
1613
1614    #[test]
1615    fn test_indices_range() {
1616        let patches = Patches::new(
1617            10,
1618            0,
1619            buffer![2u64, 5, 8].into_array(),
1620            buffer![100i32, 200, 300].into_array(),
1621            None,
1622        );
1623
1624        assert_eq!(patches.min_index(), 2);
1625        assert_eq!(patches.max_index(), 8);
1626    }
1627
1628    #[test]
1629    fn test_search_index() {
1630        let patches = Patches::new(
1631            10,
1632            0,
1633            buffer![2u64, 5, 8].into_array(),
1634            buffer![100i32, 200, 300].into_array(),
1635            None,
1636        );
1637
1638        // Search for exact indices
1639        assert_eq!(patches.search_index(2), SearchResult::Found(0));
1640        assert_eq!(patches.search_index(5), SearchResult::Found(1));
1641        assert_eq!(patches.search_index(8), SearchResult::Found(2));
1642
1643        // Search for non-patch indices
1644        assert_eq!(patches.search_index(0), SearchResult::NotFound(0));
1645        assert_eq!(patches.search_index(3), SearchResult::NotFound(1));
1646        assert_eq!(patches.search_index(6), SearchResult::NotFound(2));
1647        assert_eq!(patches.search_index(9), SearchResult::NotFound(3));
1648    }
1649
1650    #[test]
1651    fn test_mask_boundary_patches() {
1652        // Test masking patches at array boundaries
1653        let patches = Patches::new(
1654            10,
1655            0,
1656            buffer![0u64, 9].into_array(),
1657            buffer![100i32, 200].into_array(),
1658            None,
1659        );
1660
1661        let mask = Mask::from_iter([
1662            true, false, false, false, false, false, false, false, false, false,
1663        ]);
1664        let masked = patches.mask(&mask).unwrap();
1665        assert!(masked.is_some());
1666        let masked = masked.unwrap();
1667        let indices = masked.indices().to_primitive();
1668        assert_eq!(indices.as_slice::<u64>(), &[9]);
1669        let values = masked.values().to_primitive();
1670        assert_eq!(values.as_slice::<i32>(), &[200]);
1671    }
1672
1673    #[test]
1674    fn test_mask_all_patches_removed() {
1675        // Test when all patches are masked out
1676        let patches = Patches::new(
1677            10,
1678            0,
1679            buffer![2u64, 5, 8].into_array(),
1680            buffer![100i32, 200, 300].into_array(),
1681            None,
1682        );
1683
1684        // Mask that removes all patches
1685        let mask = Mask::from_iter([
1686            false, false, true, false, false, true, false, false, true, false,
1687        ]);
1688        let masked = patches.mask(&mask).unwrap();
1689        assert!(masked.is_none());
1690    }
1691
1692    #[test]
1693    fn test_mask_no_patches_removed() {
1694        // Test when no patches are masked
1695        let patches = Patches::new(
1696            10,
1697            0,
1698            buffer![2u64, 5, 8].into_array(),
1699            buffer![100i32, 200, 300].into_array(),
1700            None,
1701        );
1702
1703        // Mask that doesn't affect any patches
1704        let mask = Mask::from_iter([
1705            true, false, false, true, false, false, true, false, false, true,
1706        ]);
1707        let masked = patches.mask(&mask).unwrap().unwrap();
1708
1709        let indices = masked.indices().to_primitive();
1710        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1711        let values = masked.values().to_primitive();
1712        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1713    }
1714
1715    #[test]
1716    fn test_mask_single_patch() {
1717        // Test with a single patch
1718        let patches = Patches::new(
1719            5,
1720            0,
1721            buffer![2u64].into_array(),
1722            buffer![42i32].into_array(),
1723            None,
1724        );
1725
1726        // Mask that removes the single patch
1727        let mask = Mask::from_iter([false, false, true, false, false]);
1728        let masked = patches.mask(&mask).unwrap();
1729        assert!(masked.is_none());
1730
1731        // Mask that keeps the single patch
1732        let mask = Mask::from_iter([true, false, false, true, false]);
1733        let masked = patches.mask(&mask).unwrap().unwrap();
1734        let indices = masked.indices().to_primitive();
1735        assert_eq!(indices.as_slice::<u64>(), &[2]);
1736    }
1737
1738    #[test]
1739    fn test_mask_contiguous_patches() {
1740        // Test with contiguous patches
1741        let patches = Patches::new(
1742            10,
1743            0,
1744            buffer![3u64, 4, 5, 6].into_array(),
1745            buffer![100i32, 200, 300, 400].into_array(),
1746            None,
1747        );
1748
1749        // Mask that removes middle patches
1750        let mask = Mask::from_iter([
1751            false, false, false, false, true, true, false, false, false, false,
1752        ]);
1753        let masked = patches.mask(&mask).unwrap().unwrap();
1754
1755        let indices = masked.indices().to_primitive();
1756        assert_eq!(indices.as_slice::<u64>(), &[3, 6]);
1757        let values = masked.values().to_primitive();
1758        assert_eq!(values.as_slice::<i32>(), &[100, 400]);
1759    }
1760
1761    #[test]
1762    fn test_mask_with_large_offset() {
1763        // Test with a large offset that shifts all indices
1764        let patches = Patches::new(
1765            20,
1766            15,
1767            buffer![16u64, 17, 19].into_array(), // actual indices are 1, 2, 4
1768            buffer![100i32, 200, 300].into_array(),
1769            None,
1770        );
1771
1772        // Mask that removes the patch at actual index 2
1773        let mask = Mask::from_iter([
1774            false, false, true, false, false, false, false, false, false, false, false, false,
1775            false, false, false, false, false, false, false, false,
1776        ]);
1777        let masked = patches.mask(&mask).unwrap().unwrap();
1778
1779        let indices = masked.indices().to_primitive();
1780        assert_eq!(indices.as_slice::<u64>(), &[16, 19]);
1781        let values = masked.values().to_primitive();
1782        assert_eq!(values.as_slice::<i32>(), &[100, 300]);
1783    }
1784
1785    #[test]
1786    #[should_panic(expected = "Filter mask length 5 does not match array length 10")]
1787    fn test_mask_wrong_length() {
1788        let patches = Patches::new(
1789            10,
1790            0,
1791            buffer![2u64, 5, 8].into_array(),
1792            buffer![100i32, 200, 300].into_array(),
1793            None,
1794        );
1795
1796        // Mask with wrong length
1797        let mask = Mask::from_iter([false, false, true, false, false]);
1798        patches.mask(&mask).unwrap();
1799    }
1800
1801    #[test]
1802    fn test_chunk_offsets_search() {
1803        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1804        let values = buffer![10i32, 20, 30, 40].into_array();
1805        let chunk_offsets = buffer![0u64, 2, 2, 3].into_array();
1806        let patches = Patches::new(4096, 0, indices, values, Some(chunk_offsets));
1807
1808        assert!(patches.chunk_offsets.is_some());
1809
1810        // chunk 0: patches at 100, 200
1811        assert_eq!(patches.search_index(100), SearchResult::Found(0));
1812        assert_eq!(patches.search_index(200), SearchResult::Found(1));
1813
1814        // chunks 1, 2: no patches
1815        assert_eq!(patches.search_index(1500), SearchResult::NotFound(2));
1816        assert_eq!(patches.search_index(2000), SearchResult::NotFound(2));
1817
1818        // chunk 3: patches at 3000, 3100
1819        assert_eq!(patches.search_index(3000), SearchResult::Found(2));
1820        assert_eq!(patches.search_index(3100), SearchResult::Found(3));
1821
1822        assert_eq!(patches.search_index(1024), SearchResult::NotFound(2));
1823    }
1824
1825    #[test]
1826    fn test_chunk_offsets_with_slice() {
1827        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
1828        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
1829        let chunk_offsets = buffer![0u64, 2, 6].into_array();
1830        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1831
1832        let sliced = patches.slice(1000..2200).unwrap();
1833
1834        assert!(sliced.chunk_offsets.is_some());
1835        assert_eq!(sliced.offset(), 1000);
1836
1837        assert_eq!(sliced.search_index(200), SearchResult::Found(0));
1838        assert_eq!(sliced.search_index(500), SearchResult::Found(2));
1839        assert_eq!(sliced.search_index(1100), SearchResult::Found(4));
1840
1841        assert_eq!(sliced.search_index(250), SearchResult::NotFound(1));
1842    }
1843
1844    #[test]
1845    fn test_chunk_offsets_with_slice_after_first_chunk() {
1846        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
1847        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
1848        let chunk_offsets = buffer![0u64, 2, 6].into_array();
1849        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1850
1851        let sliced = patches.slice(1300..2200).unwrap();
1852
1853        assert!(sliced.chunk_offsets.is_some());
1854        assert_eq!(sliced.offset(), 1300);
1855
1856        assert_eq!(sliced.search_index(0), SearchResult::Found(0));
1857        assert_eq!(sliced.search_index(200), SearchResult::Found(1));
1858        assert_eq!(sliced.search_index(500), SearchResult::Found(2));
1859        assert_eq!(sliced.search_index(250), SearchResult::NotFound(2));
1860        assert_eq!(sliced.search_index(900), SearchResult::NotFound(4));
1861    }
1862
1863    #[test]
1864    fn test_chunk_offsets_slice_empty_result() {
1865        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1866        let values = buffer![10i32, 20, 30, 40].into_array();
1867        let chunk_offsets = buffer![0u64, 2].into_array();
1868        let patches = Patches::new(4000, 0, indices, values, Some(chunk_offsets));
1869
1870        let sliced = patches.slice(1000..2000);
1871        assert!(sliced.is_none());
1872    }
1873
1874    #[test]
1875    fn test_chunk_offsets_slice_single_patch() {
1876        let indices = buffer![100u64, 1200, 1300, 2500].into_array();
1877        let values = buffer![10i32, 20, 30, 40].into_array();
1878        let chunk_offsets = buffer![0u64, 1, 3].into_array();
1879        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1880
1881        let sliced = patches.slice(1100..1250).unwrap();
1882
1883        assert_eq!(sliced.num_patches(), 1);
1884        assert_eq!(sliced.offset(), 1100);
1885        assert_eq!(sliced.search_index(100), SearchResult::Found(0)); // 1200 - 1100 = 100
1886        assert_eq!(sliced.search_index(50), SearchResult::NotFound(0));
1887        assert_eq!(sliced.search_index(150), SearchResult::NotFound(1));
1888    }
1889
1890    #[test]
1891    fn test_chunk_offsets_slice_across_chunks() {
1892        let indices = buffer![100u64, 200, 1100, 1200, 2100, 2200].into_array();
1893        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1894        let chunk_offsets = buffer![0u64, 2, 4].into_array();
1895        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1896
1897        let sliced = patches.slice(150..2150).unwrap();
1898
1899        assert_eq!(sliced.num_patches(), 4);
1900        assert_eq!(sliced.offset(), 150);
1901
1902        assert_eq!(sliced.search_index(50), SearchResult::Found(0)); // 200 - 150 = 50
1903        assert_eq!(sliced.search_index(950), SearchResult::Found(1)); // 1100 - 150 = 950
1904        assert_eq!(sliced.search_index(1050), SearchResult::Found(2)); // 1200 - 150 = 1050
1905        assert_eq!(sliced.search_index(1950), SearchResult::Found(3)); // 2100 - 150 = 1950
1906    }
1907
1908    #[test]
1909    fn test_chunk_offsets_boundary_searches() {
1910        let indices = buffer![1023u64, 1024, 1025, 2047, 2048].into_array();
1911        let values = buffer![10i32, 20, 30, 40, 50].into_array();
1912        let chunk_offsets = buffer![0u64, 1, 4].into_array();
1913        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1914
1915        assert_eq!(patches.search_index(1023), SearchResult::Found(0));
1916        assert_eq!(patches.search_index(1024), SearchResult::Found(1));
1917        assert_eq!(patches.search_index(1025), SearchResult::Found(2));
1918        assert_eq!(patches.search_index(2047), SearchResult::Found(3));
1919        assert_eq!(patches.search_index(2048), SearchResult::Found(4));
1920
1921        assert_eq!(patches.search_index(1022), SearchResult::NotFound(0));
1922        assert_eq!(patches.search_index(2046), SearchResult::NotFound(3));
1923    }
1924
1925    #[test]
1926    fn test_chunk_offsets_slice_edge_cases() {
1927        let indices = buffer![0u64, 1, 1023, 1024, 2047, 2048].into_array();
1928        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1929        let chunk_offsets = buffer![0u64, 3, 5].into_array();
1930        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1931
1932        // Slice at the very beginning
1933        let sliced = patches.slice(0..10).unwrap();
1934        assert_eq!(sliced.num_patches(), 2);
1935        assert_eq!(sliced.search_index(0), SearchResult::Found(0));
1936        assert_eq!(sliced.search_index(1), SearchResult::Found(1));
1937
1938        // Slice at the very end
1939        let sliced = patches.slice(2040..3000).unwrap();
1940        assert_eq!(sliced.num_patches(), 2); // patches at 2047 and 2048
1941        assert_eq!(sliced.search_index(7), SearchResult::Found(0)); // 2047 - 2040
1942        assert_eq!(sliced.search_index(8), SearchResult::Found(1)); // 2048 - 2040
1943    }
1944
1945    #[test]
1946    fn test_chunk_offsets_slice_nested() {
1947        let indices = buffer![100u64, 200, 300, 400, 500, 600].into_array();
1948        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1949        let chunk_offsets = buffer![0u64].into_array();
1950        let patches = Patches::new(1000, 0, indices, values, Some(chunk_offsets));
1951
1952        let sliced1 = patches.slice(150..550).unwrap();
1953        assert_eq!(sliced1.num_patches(), 4); // 200, 300, 400, 500
1954
1955        let sliced2 = sliced1.slice(100..250).unwrap();
1956        assert_eq!(sliced2.num_patches(), 1); // 300
1957        assert_eq!(sliced2.offset(), 250);
1958
1959        assert_eq!(sliced2.search_index(50), SearchResult::Found(0)); // 300 - 250
1960        assert_eq!(sliced2.search_index(150), SearchResult::NotFound(1));
1961    }
1962
1963    #[test]
1964    fn test_index_larger_than_length() {
1965        let chunk_offsets = buffer![0u64].into_array();
1966        let indices = buffer![1023u64].into_array();
1967        let values = buffer![42i32].into_array();
1968        let patches = Patches::new(1024, 0, indices, values, Some(chunk_offsets));
1969        assert_eq!(patches.search_index(2048), SearchResult::NotFound(1));
1970    }
1971}