vortex_array/
patches.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::ops::Range;
8
9use itertools::Itertools;
10use num_traits::NumCast;
11use vortex_buffer::BitBuffer;
12use vortex_buffer::BufferMut;
13use vortex_dtype::DType;
14use vortex_dtype::IntegerPType;
15use vortex_dtype::NativePType;
16use vortex_dtype::Nullability::NonNullable;
17use vortex_dtype::PType;
18use vortex_dtype::PTypeDowncastExt;
19use vortex_dtype::UnsignedPType;
20use vortex_dtype::match_each_integer_ptype;
21use vortex_dtype::match_each_native_ptype;
22use vortex_dtype::match_each_unsigned_integer_ptype;
23use vortex_error::VortexError;
24use vortex_error::VortexExpect;
25use vortex_error::VortexResult;
26use vortex_error::vortex_bail;
27use vortex_error::vortex_err;
28use vortex_error::vortex_panic;
29use vortex_mask::AllOr;
30use vortex_mask::Mask;
31use vortex_mask::MaskMut;
32use vortex_scalar::PValue;
33use vortex_scalar::Scalar;
34use vortex_utils::aliases::hash_map::HashMap;
35use vortex_vector::primitive::PVectorMut;
36use vortex_vector::primitive::PrimitiveVectorMut;
37
38use crate::Array;
39use crate::ArrayRef;
40use crate::IntoArray;
41use crate::ToCanonical;
42use crate::arrays::PrimitiveArray;
43use crate::compute::cast;
44use crate::compute::filter;
45use crate::compute::is_sorted;
46use crate::compute::take;
47use crate::search_sorted::SearchResult;
48use crate::search_sorted::SearchSorted;
49use crate::search_sorted::SearchSortedSide;
50use crate::validity::Validity;
51use crate::vtable::ValidityHelper;
52
53/// One patch index offset is stored for each chunk.
54/// This allows for constant time patch index lookups.
55const PATCH_CHUNK_SIZE: usize = 1024;
56
57#[derive(Copy, Clone, prost::Message)]
58pub struct PatchesMetadata {
59    #[prost(uint64, tag = "1")]
60    len: u64,
61    #[prost(uint64, tag = "2")]
62    offset: u64,
63    #[prost(enumeration = "PType", tag = "3")]
64    indices_ptype: i32,
65    #[prost(uint64, optional, tag = "4")]
66    chunk_offsets_len: Option<u64>,
67    #[prost(enumeration = "PType", optional, tag = "5")]
68    chunk_offsets_ptype: Option<i32>,
69    #[prost(uint64, optional, tag = "6")]
70    offset_within_chunk: Option<u64>,
71}
72
73impl PatchesMetadata {
74    #[inline]
75    pub fn new(
76        len: usize,
77        offset: usize,
78        indices_ptype: PType,
79        chunk_offsets_len: Option<usize>,
80        chunk_offsets_ptype: Option<PType>,
81        offset_within_chunk: Option<usize>,
82    ) -> Self {
83        Self {
84            len: len as u64,
85            offset: offset as u64,
86            indices_ptype: indices_ptype as i32,
87            chunk_offsets_len: chunk_offsets_len.map(|len| len as u64),
88            chunk_offsets_ptype: chunk_offsets_ptype.map(|pt| pt as i32),
89            offset_within_chunk: offset_within_chunk.map(|len| len as u64),
90        }
91    }
92
93    #[inline]
94    pub fn len(&self) -> usize {
95        usize::try_from(self.len).vortex_expect("len is a valid usize")
96    }
97
98    #[inline]
99    pub fn is_empty(&self) -> bool {
100        self.len == 0
101    }
102
103    #[inline]
104    pub fn offset(&self) -> usize {
105        usize::try_from(self.offset).vortex_expect("offset is a valid usize")
106    }
107
108    #[inline]
109    pub fn chunk_offsets_dtype(&self) -> Option<DType> {
110        self.chunk_offsets_ptype
111            .map(|t| {
112                PType::try_from(t)
113                    .map_err(|e| vortex_err!("invalid i32 value {t} for PType: {}", e))
114                    .vortex_expect("invalid i32 value for PType")
115            })
116            .map(|ptype| DType::Primitive(ptype, NonNullable))
117    }
118
119    #[inline]
120    pub fn indices_dtype(&self) -> DType {
121        assert!(
122            self.indices_ptype().is_unsigned_int(),
123            "Patch indices must be unsigned integers"
124        );
125        DType::Primitive(self.indices_ptype(), NonNullable)
126    }
127}
128
129/// A helper for working with patched arrays.
130#[derive(Debug, Clone)]
131pub struct Patches {
132    array_len: usize,
133    offset: usize,
134    indices: ArrayRef,
135    values: ArrayRef,
136    /// Stores the patch index offset for each chunk.
137    ///
138    /// This allows us to lookup the patches for a given chunk in constant time via
139    /// `patch_indices[chunk_offsets[i]..chunk_offsets[i+1]]`.
140    ///
141    /// This is optional for compatibility reasons.
142    chunk_offsets: Option<ArrayRef>,
143    /// Chunk offsets are only sliced off in case the slice is fully
144    /// outside of the chunk range.
145    ///
146    /// Though the range for indices and values is sliced in terms of
147    /// individual elements, not chunks. To account for that we do a
148    /// saturating sub when adjusting the indices based on the chunk offset.
149    ///
150    /// `offset_within_chunk` is necessary in order to keep track of how many
151    /// elements were sliced off within the chunk.
152    offset_within_chunk: Option<usize>,
153}
154
155impl Patches {
156    pub fn new(
157        array_len: usize,
158        offset: usize,
159        indices: ArrayRef,
160        values: ArrayRef,
161        chunk_offsets: Option<ArrayRef>,
162    ) -> Self {
163        assert_eq!(
164            indices.len(),
165            values.len(),
166            "Patch indices and values must have the same length"
167        );
168        assert!(
169            indices.dtype().is_unsigned_int() && !indices.dtype().is_nullable(),
170            "Patch indices must be non-nullable unsigned integers, got {:?}",
171            indices.dtype()
172        );
173        assert!(
174            indices.len() <= array_len,
175            "Patch indices must be shorter than the array length"
176        );
177        assert!(!indices.is_empty(), "Patch indices must not be empty");
178        let max = usize::try_from(&indices.scalar_at(indices.len() - 1))
179            .vortex_expect("indices must be a number");
180        assert!(
181            max - offset < array_len,
182            "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
183        );
184
185        debug_assert!(
186            is_sorted(indices.as_ref())
187                .unwrap_or(Some(false))
188                .unwrap_or(false),
189            "Patch indices must be sorted"
190        );
191
192        Self {
193            array_len,
194            offset,
195            indices,
196            values,
197            chunk_offsets: chunk_offsets.clone(),
198            // Initialize with `Some(0)` only if `chunk_offsets` are set.
199            offset_within_chunk: chunk_offsets.map(|_| 0),
200        }
201    }
202
203    /// Construct new patches without validating any of the arguments
204    ///
205    /// # Safety
206    ///
207    /// Users have to assert that
208    /// * Indices and values have the same length
209    /// * Indices is an unsigned integer type
210    /// * Indices must be sorted
211    /// * Last value in indices is smaller than array_len
212    pub unsafe fn new_unchecked(
213        array_len: usize,
214        offset: usize,
215        indices: ArrayRef,
216        values: ArrayRef,
217        chunk_offsets: Option<ArrayRef>,
218        offset_within_chunk: Option<usize>,
219    ) -> Self {
220        Self {
221            array_len,
222            offset,
223            indices,
224            values,
225            chunk_offsets,
226            offset_within_chunk,
227        }
228    }
229
230    #[inline]
231    pub fn array_len(&self) -> usize {
232        self.array_len
233    }
234
235    #[inline]
236    pub fn num_patches(&self) -> usize {
237        self.indices.len()
238    }
239
240    #[inline]
241    pub fn dtype(&self) -> &DType {
242        self.values.dtype()
243    }
244
245    #[inline]
246    pub fn indices(&self) -> &ArrayRef {
247        &self.indices
248    }
249
250    #[inline]
251    pub fn into_indices(self) -> ArrayRef {
252        self.indices
253    }
254
255    #[inline]
256    pub fn indices_mut(&mut self) -> &mut ArrayRef {
257        &mut self.indices
258    }
259
260    #[inline]
261    pub fn values(&self) -> &ArrayRef {
262        &self.values
263    }
264
265    #[inline]
266    pub fn into_values(self) -> ArrayRef {
267        self.values
268    }
269
270    #[inline]
271    pub fn values_mut(&mut self) -> &mut ArrayRef {
272        &mut self.values
273    }
274
275    #[inline]
276    pub fn offset(&self) -> usize {
277        self.offset
278    }
279
280    #[inline]
281    pub fn chunk_offsets(&self) -> &Option<ArrayRef> {
282        &self.chunk_offsets
283    }
284
285    #[inline]
286    pub fn chunk_offset_at(&self, idx: usize) -> usize {
287        let Some(chunk_offsets) = &self.chunk_offsets else {
288            vortex_panic!("chunk_offsets must be set to retrieve offset at index")
289        };
290
291        chunk_offsets
292            .scalar_at(idx)
293            .as_primitive()
294            .as_::<usize>()
295            .vortex_expect("chunk offset must be usize")
296    }
297
298    #[inline]
299    pub fn offset_within_chunk(&self) -> Option<usize> {
300        self.offset_within_chunk
301    }
302
303    #[inline]
304    pub fn indices_ptype(&self) -> PType {
305        PType::try_from(self.indices.dtype()).vortex_expect("primitive indices")
306    }
307
308    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
309        if self.indices.len() > len {
310            vortex_bail!(
311                "Patch indices {} are longer than the array length {}",
312                self.indices.len(),
313                len
314            );
315        }
316        if self.values.dtype() != dtype {
317            vortex_bail!(
318                "Patch values dtype {} does not match array dtype {}",
319                self.values.dtype(),
320                dtype
321            );
322        }
323        let chunk_offsets_len = self.chunk_offsets.as_ref().map(|co| co.len());
324        let chunk_offsets_ptype = self.chunk_offsets.as_ref().map(|co| co.dtype().as_ptype());
325
326        Ok(PatchesMetadata::new(
327            self.indices.len(),
328            self.offset,
329            self.indices.dtype().as_ptype(),
330            chunk_offsets_len,
331            chunk_offsets_ptype,
332            self.offset_within_chunk,
333        ))
334    }
335
336    pub fn cast_values(self, values_dtype: &DType) -> VortexResult<Self> {
337        // SAFETY: casting does not affect the relationship between the indices and values
338        unsafe {
339            Ok(Self::new_unchecked(
340                self.array_len,
341                self.offset,
342                self.indices,
343                cast(&self.values, values_dtype)?,
344                self.chunk_offsets,
345                self.offset_within_chunk,
346            ))
347        }
348    }
349
350    /// Get the patched value at a given index if it exists.
351    pub fn get_patched(&self, index: usize) -> Option<Scalar> {
352        self.search_index(index)
353            .to_found()
354            .map(|patch_idx| self.values().scalar_at(patch_idx))
355    }
356
357    /// Searches for `index` in the indices array.
358    ///
359    /// Chooses between chunked search when [`Self::chunk_offsets`] is
360    /// available, and binary search otherwise. The `index` parameter is
361    /// adjusted by [`Self::offset`] for both.
362    ///
363    /// # Arguments
364    /// * `index` - The index to search for
365    ///
366    /// # Returns
367    /// * [`SearchResult::Found(patch_idx)`] - If a patch exists at this index, returns the
368    ///   position in the patches array
369    /// * [`SearchResult::NotFound(insertion_point)`] - If no patch exists, returns where
370    ///   a patch at this index would be inserted to maintain sorted order
371    ///
372    /// [`SearchResult::Found(patch_idx)`]: SearchResult::Found
373    /// [`SearchResult::NotFound(insertion_point)`]: SearchResult::NotFound
374    pub fn search_index(&self, index: usize) -> SearchResult {
375        if self.chunk_offsets.is_some() {
376            return self.search_index_chunked(index);
377        }
378
379        Self::search_index_binary_search(&self.indices, index + self.offset)
380    }
381
382    /// Binary searches for `needle` in the indices array.
383    ///
384    /// # Returns
385    /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`]
386    /// with the insertion point if not found.
387    fn search_index_binary_search(indices: &dyn Array, needle: usize) -> SearchResult {
388        if indices.is_canonical() {
389            let primitive = indices.to_primitive();
390            match_each_integer_ptype!(primitive.ptype(), |T| {
391                let Ok(needle) = T::try_from(needle) else {
392                    // If the needle is not of type T, then it cannot possibly be in this array.
393                    //
394                    // The needle is a non-negative integer (a usize); therefore, it must be larger
395                    // than all values in this array.
396                    return SearchResult::NotFound(primitive.len());
397                };
398                return primitive
399                    .as_slice::<T>()
400                    .search_sorted(&needle, SearchSortedSide::Left);
401            });
402        }
403        indices
404            .as_primitive_typed()
405            .search_sorted(&PValue::U64(needle as u64), SearchSortedSide::Left)
406    }
407
408    /// Constant time searches for `index` in the indices array.
409    ///
410    /// First determines which chunk the target index falls into, then performs
411    /// a binary search within that chunk's range.
412    ///
413    /// Returns a [`SearchResult`] indicating either the exact patch index if found,
414    /// or the insertion point if not found.
415    ///
416    /// # Panics
417    /// Panics if `chunk_offsets` or `offset_within_chunk` are not set.
418    fn search_index_chunked(&self, index: usize) -> SearchResult {
419        let Some(chunk_offsets) = &self.chunk_offsets else {
420            vortex_panic!("chunk_offsets is required to be set")
421        };
422
423        let Some(offset_within_chunk) = self.offset_within_chunk else {
424            vortex_panic!("offset_within_chunk is required to be set")
425        };
426
427        if index >= self.array_len() {
428            return SearchResult::NotFound(self.indices().len());
429        }
430
431        let chunk_idx = (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE;
432
433        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
434        let base_offset = self.chunk_offset_at(0);
435
436        let patches_start_idx = (self.chunk_offset_at(chunk_idx) - base_offset)
437            // Chunk offsets are only sliced off in case the slice is fully
438            // outside of the chunk range.
439            //
440            // Though the range for indices and values is sliced in terms of
441            // individual elements, not chunks. To account for that we do a
442            // saturating sub when adjusting the indices based on the chunk offset.
443            .saturating_sub(offset_within_chunk);
444
445        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
446            self.chunk_offset_at(chunk_idx + 1) - base_offset - offset_within_chunk
447        } else {
448            self.indices.len()
449        };
450
451        let chunk_indices = self.indices.slice(patches_start_idx..patches_end_idx);
452        let result = Self::search_index_binary_search(&chunk_indices, index + self.offset);
453
454        match result {
455            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
456            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
457        }
458    }
459
460    /// Batch version of `search_index`.
461    ///
462    /// In contrast to `search_index`, this function requires `indices` as
463    /// well as `chunk_offsets` to be passed as slices. This is to avoid
464    /// redundant canonicalization and `scalar_at` lookups across calls.
465    fn search_index_chunked_batch<T, O>(
466        &self,
467        indices: &[T],
468        chunk_offsets: &[O],
469        index: T,
470    ) -> SearchResult
471    where
472        T: UnsignedPType,
473        O: UnsignedPType,
474        usize: TryFrom<T>,
475        usize: TryFrom<O>,
476    {
477        let Some(offset_within_chunk) = self.offset_within_chunk else {
478            vortex_panic!("offset_within_chunk is required to be set")
479        };
480
481        let chunk_idx = {
482            let Ok(index) = usize::try_from(index) else {
483                // If the needle cannot be converted to usize, it's larger than all values in this array.
484                return SearchResult::NotFound(indices.len());
485            };
486
487            if index >= self.array_len() {
488                return SearchResult::NotFound(self.indices().len());
489            }
490
491            (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE
492        };
493
494        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
495        let Ok(chunk_offset) = usize::try_from(chunk_offsets[chunk_idx] - chunk_offsets[0]) else {
496            vortex_panic!("chunk_offset failed to convert to usize")
497        };
498
499        let patches_start_idx = chunk_offset
500            // Chunk offsets are only sliced off in case the slice is fully
501            // outside of the chunk range.
502            //
503            // Though the range for indices and values is sliced in terms of
504            // individual elements, not chunks. To account for that we do a
505            // saturating sub when adjusting the indices based on the chunk offset.
506            .saturating_sub(offset_within_chunk);
507
508        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
509            let base_offset_end = chunk_offsets[chunk_idx + 1];
510
511            let Some(offset_within_chunk) = O::from(offset_within_chunk) else {
512                vortex_panic!("offset_within_chunk failed to convert to O");
513            };
514
515            let Ok(patches_end_idx) =
516                usize::try_from(base_offset_end - chunk_offsets[0] - offset_within_chunk)
517            else {
518                vortex_panic!("patches_end_idx failed to convert to usize")
519            };
520
521            patches_end_idx
522        } else {
523            self.indices.len()
524        };
525
526        let Some(offset) = T::from(self.offset) else {
527            // If the offset cannot be converted to T, it's larger than all values in this array.
528            return SearchResult::NotFound(indices.len());
529        };
530
531        let chunk_indices = &indices[patches_start_idx..patches_end_idx];
532        let result = chunk_indices.search_sorted(&(index + offset), SearchSortedSide::Left);
533
534        match result {
535            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
536            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
537        }
538    }
539
540    /// Returns the minimum patch index
541    pub fn min_index(&self) -> usize {
542        let first = self
543            .indices
544            .scalar_at(0)
545            .as_primitive()
546            .as_::<usize>()
547            .vortex_expect("non-null");
548        first - self.offset
549    }
550
551    /// Returns the maximum patch index
552    pub fn max_index(&self) -> usize {
553        let last = self
554            .indices
555            .scalar_at(self.indices.len() - 1)
556            .as_primitive()
557            .as_::<usize>()
558            .vortex_expect("non-null");
559        last - self.offset
560    }
561
562    /// Filter the patches by a mask, resulting in new patches for the filtered array.
563    pub fn filter(&self, mask: &Mask) -> VortexResult<Option<Self>> {
564        if mask.len() != self.array_len {
565            vortex_bail!(
566                "Filter mask length {} does not match array length {}",
567                mask.len(),
568                self.array_len
569            );
570        }
571
572        match mask.indices() {
573            AllOr::All => Ok(Some(self.clone())),
574            AllOr::None => Ok(None),
575            AllOr::Some(mask_indices) => {
576                let flat_indices = self.indices().to_primitive();
577                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
578                    filter_patches_with_mask(
579                        flat_indices.as_slice::<I>(),
580                        self.offset(),
581                        self.values(),
582                        mask_indices,
583                    )
584                })
585            }
586        }
587    }
588
589    /// Mask the patches, REMOVING the patches where the mask is true.
590    /// Unlike filter, this preserves the patch indices.
591    /// Unlike mask on a single array, this does not set masked values to null.
592    pub fn mask(&self, mask: &Mask) -> VortexResult<Option<Self>> {
593        if mask.len() != self.array_len {
594            vortex_bail!(
595                "Filter mask length {} does not match array length {}",
596                mask.len(),
597                self.array_len
598            );
599        }
600
601        let filter_mask = match mask.bit_buffer() {
602            AllOr::All => return Ok(None),
603            AllOr::None => return Ok(Some(self.clone())),
604            AllOr::Some(masked) => {
605                let patch_indices = self.indices().to_primitive();
606                match_each_unsigned_integer_ptype!(patch_indices.ptype(), |P| {
607                    let patch_indices = patch_indices.as_slice::<P>();
608                    Mask::from_buffer(BitBuffer::collect_bool(patch_indices.len(), |i| {
609                        #[allow(clippy::cast_possible_truncation)]
610                        let idx = (patch_indices[i] as usize) - self.offset;
611                        !masked.value(idx)
612                    }))
613                })
614            }
615        };
616
617        if filter_mask.all_false() {
618            return Ok(None);
619        }
620
621        // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship
622        let filtered_indices = filter(&self.indices, &filter_mask)?;
623        let filtered_values = filter(&self.values, &filter_mask)?;
624
625        Ok(Some(Self {
626            array_len: self.array_len,
627            offset: self.offset,
628            indices: filtered_indices,
629            values: filtered_values,
630            // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
631            chunk_offsets: None,
632            offset_within_chunk: self.offset_within_chunk,
633        }))
634    }
635
636    /// Slice the patches by a range of the patched array.
637    pub fn slice(&self, range: Range<usize>) -> Option<Self> {
638        let slice_start_idx = self.search_index(range.start).to_index();
639        let slice_end_idx = self.search_index(range.end).to_index();
640
641        if slice_start_idx == slice_end_idx {
642            return None;
643        }
644
645        let values = self.values().slice(slice_start_idx..slice_end_idx);
646        let indices = self.indices().slice(slice_start_idx..slice_end_idx);
647
648        let chunk_offsets = self.chunk_offsets.as_ref().map(|chunk_offsets| {
649            let chunk_relative_offset = self.offset % PATCH_CHUNK_SIZE;
650            let chunk_start_idx = (chunk_relative_offset + range.start) / PATCH_CHUNK_SIZE;
651            let chunk_end_idx = (chunk_relative_offset + range.end).div_ceil(PATCH_CHUNK_SIZE);
652            chunk_offsets.slice(chunk_start_idx..chunk_end_idx)
653        });
654
655        let offset_within_chunk = chunk_offsets.as_ref().map(|chunk_offsets| {
656            let base_offset = chunk_offsets
657                .scalar_at(0)
658                .as_primitive()
659                .as_::<usize>()
660                .vortex_expect("chunk offset must be usize");
661            slice_start_idx - base_offset
662        });
663
664        Some(Self {
665            array_len: range.len(),
666            offset: range.start + self.offset(),
667            indices,
668            values,
669            chunk_offsets,
670            offset_within_chunk,
671        })
672    }
673
674    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
675    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
676
677    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
678        (self.num_patches() as f64 / take_indices.len() as f64)
679            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
680    }
681
682    /// Take the indices from the patches
683    ///
684    /// Any nulls in take_indices are added to the resulting patches.
685    pub fn take_with_nulls(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
686        if take_indices.is_empty() {
687            return Ok(None);
688        }
689
690        let take_indices = take_indices.to_primitive();
691        if self.is_map_faster_than_search(&take_indices) {
692            self.take_map(take_indices, true)
693        } else {
694            self.take_search(take_indices, true)
695        }
696    }
697
698    /// Take the indices from the patches.
699    ///
700    /// Any nulls in take_indices are ignored.
701    pub fn take(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
702        if take_indices.is_empty() {
703            return Ok(None);
704        }
705
706        let take_indices = take_indices.to_primitive();
707        if self.is_map_faster_than_search(&take_indices) {
708            self.take_map(take_indices, false)
709        } else {
710            self.take_search(take_indices, false)
711        }
712    }
713
714    #[expect(
715        clippy::cognitive_complexity,
716        reason = "complexity is from nested match_each_* macros"
717    )]
718    pub fn take_search(
719        &self,
720        take_indices: PrimitiveArray,
721        include_nulls: bool,
722    ) -> VortexResult<Option<Self>> {
723        let take_indices_validity = take_indices.validity();
724        let patch_indices = self.indices.to_primitive();
725        let chunk_offsets = self.chunk_offsets().as_ref().map(|co| co.to_primitive());
726
727        let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) =
728            match_each_unsigned_integer_ptype!(patch_indices.ptype(), |PatchT| {
729                let patch_indices_slice = patch_indices.as_slice::<PatchT>();
730                match_each_integer_ptype!(take_indices.ptype(), |TakeT| {
731                    let take_slice = take_indices.as_slice::<TakeT>();
732
733                    if let Some(chunk_offsets) = chunk_offsets {
734                        match_each_unsigned_integer_ptype!(chunk_offsets.ptype(), |OffsetT| {
735                            let chunk_offsets = chunk_offsets.as_slice::<OffsetT>();
736                            take_indices_with_search_fn(
737                                patch_indices_slice,
738                                take_slice,
739                                take_indices.validity_mask(),
740                                include_nulls,
741                                |take_idx| {
742                                    self.search_index_chunked_batch(
743                                        patch_indices_slice,
744                                        chunk_offsets,
745                                        take_idx,
746                                    )
747                                },
748                            )
749                        })
750                    } else {
751                        take_indices_with_search_fn(
752                            patch_indices_slice,
753                            take_slice,
754                            take_indices.validity_mask(),
755                            include_nulls,
756                            |take_idx| {
757                                let Some(offset) = <PatchT as NumCast>::from(self.offset) else {
758                                    // If the offset cannot be converted to T, it's larger than all values in this array.
759                                    return SearchResult::NotFound(patch_indices_slice.len());
760                                };
761
762                                patch_indices_slice
763                                    .search_sorted(&(take_idx + offset), SearchSortedSide::Left)
764                            },
765                        )
766                    }
767                })
768            });
769
770        if new_indices.is_empty() {
771            return Ok(None);
772        }
773
774        let new_indices = new_indices.into_array();
775        let new_array_len = take_indices.len();
776        let values_validity = take_indices_validity.take(&new_indices)?;
777
778        Ok(Some(Self {
779            array_len: new_array_len,
780            offset: 0,
781            indices: new_indices.clone(),
782            values: take(
783                self.values(),
784                &PrimitiveArray::new(values_indices, values_validity).into_array(),
785            )?,
786            chunk_offsets: None,
787            offset_within_chunk: Some(0), // Reset when creating new Patches.
788        }))
789    }
790
791    pub fn take_map(
792        &self,
793        take_indices: PrimitiveArray,
794        include_nulls: bool,
795    ) -> VortexResult<Option<Self>> {
796        let indices = self.indices.to_primitive();
797        let new_length = take_indices.len();
798
799        let Some((new_sparse_indices, value_indices)) =
800            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
801                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
802                    take_map::<_, TakeIndices>(
803                        indices.as_slice::<Indices>(),
804                        take_indices,
805                        self.offset(),
806                        self.min_index(),
807                        self.max_index(),
808                        include_nulls,
809                    )?
810                })
811            })
812        else {
813            return Ok(None);
814        };
815
816        let taken_values = take(self.values(), &value_indices)?;
817
818        Ok(Some(Patches {
819            array_len: new_length,
820            offset: 0,
821            indices: new_sparse_indices,
822            values: taken_values,
823            // TODO(0ax1): Chunk offsets are invalid after take is applied.
824            chunk_offsets: None,
825            offset_within_chunk: self.offset_within_chunk,
826        }))
827    }
828
829    /// Applies patches to a primitive vector, returning the patched vector.
830    pub fn apply_to_primitive_vector(&self, vector: PrimitiveVectorMut) -> PrimitiveVectorMut {
831        match_each_native_ptype!(vector.ptype(), |T| {
832            self.apply_to_pvector(vector.downcast::<T>()).into()
833        })
834    }
835
836    /// Applies patches to a [`PVectorMut<T>`], returning the patched vector.
837    ///
838    /// This function modifies the elements buffer in-place at the positions specified by the patch
839    /// indices. It also updates the validity mask to reflect the nullability of patch values.
840    pub fn apply_to_pvector<T: NativePType>(&self, pvector: PVectorMut<T>) -> PVectorMut<T> {
841        let (mut elements, mut validity) = pvector.into_parts();
842
843        // SAFETY: We maintain the invariant that elements and validity have the same length, and all
844        // patch indices are valid after offset adjustment (guaranteed by `Patches`).
845        unsafe { self.apply_to_buffer(elements.as_mut_slice(), &mut validity) };
846
847        // SAFETY: We have not modified the length of elements or validity.
848        unsafe { PVectorMut::new_unchecked(elements, validity) }
849    }
850
851    /// Apply patches to a mutable buffer and validity mask.
852    ///
853    /// This method applies the patch values to the given buffer at the positions specified by the
854    /// patch indices. For non-null patch values, it updates the buffer and marks the position as
855    /// valid. For null patch values, it marks the position as invalid.
856    ///
857    /// # Safety
858    ///
859    /// - All patch indices after offset adjustment must be valid indices into the buffer.
860    /// - The buffer and validity mask must have the same length.
861    pub unsafe fn apply_to_buffer<P: NativePType>(&self, buffer: &mut [P], validity: &mut MaskMut) {
862        let patch_indices = self.indices.to_primitive();
863        let patch_values = self.values.to_primitive();
864        let patches_validity = patch_values.validity();
865
866        let patch_values_slice = patch_values.as_slice::<P>();
867        match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| {
868            let patch_indices_slice = patch_indices.as_slice::<I>();
869
870            // SAFETY:
871            // - `Patches` invariant guarantees indices are sorted and within array bounds.
872            // - `patch_indices` and `patch_values` have equal length (from `Patches` invariant).
873            // - `buffer` and `validity` have equal length (precondition).
874            // - All patch indices are valid after offset adjustment (precondition).
875            unsafe {
876                apply_patches_to_buffer_inner(
877                    buffer,
878                    validity,
879                    patch_indices_slice,
880                    self.offset,
881                    patch_values_slice,
882                    patches_validity,
883                );
884            }
885        });
886    }
887
888    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
889    where
890        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
891    {
892        let values = f(self.values)?;
893        if self.indices.len() != values.len() {
894            vortex_bail!(
895                "map_values must preserve length: expected {} received {}",
896                self.indices.len(),
897                values.len()
898            )
899        }
900
901        Ok(Self {
902            array_len: self.array_len,
903            offset: self.offset,
904            indices: self.indices,
905            values,
906            chunk_offsets: self.chunk_offsets,
907            offset_within_chunk: self.offset_within_chunk,
908        })
909    }
910}
911
912/// Helper function to apply patches to a buffer.
913///
914/// # Safety
915///
916/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices
917///   into both `buffer` and `validity`.
918/// - `patch_indices` must be sorted in ascending order.
919/// - `patch_indices` and `patch_values` must have the same length.
920/// - `buffer` and `validity` must have the same length.
921unsafe fn apply_patches_to_buffer_inner<P, I>(
922    buffer: &mut [P],
923    validity: &mut MaskMut,
924    patch_indices: &[I],
925    patch_offset: usize,
926    patch_values: &[P],
927    patches_validity: &Validity,
928) where
929    P: NativePType,
930    I: UnsignedPType,
931{
932    debug_assert!(!patch_indices.is_empty());
933    debug_assert_eq!(patch_indices.len(), patch_values.len());
934    debug_assert_eq!(buffer.len(), validity.len());
935
936    match patches_validity {
937        Validity::NonNullable | Validity::AllValid => {
938            // All patch values are valid, apply them all.
939            for (&i, &value) in patch_indices.iter().zip_eq(patch_values) {
940                let index = i.as_() - patch_offset;
941
942                // SAFETY: `index` is valid because caller guarantees all patch indices are within
943                // bounds after offset adjustment.
944                unsafe {
945                    validity.set_unchecked(index);
946                }
947                buffer[index] = value;
948            }
949        }
950        Validity::AllInvalid => {
951            // All patch values are null, just mark positions as invalid.
952            for &i in patch_indices {
953                let index = i.as_() - patch_offset;
954
955                // SAFETY: `index` is valid because caller guarantees all patch indices are within
956                // bounds after offset adjustment.
957                unsafe {
958                    validity.unset_unchecked(index);
959                }
960            }
961        }
962        Validity::Array(array) => {
963            // Some patch values may be null, check each one.
964            let bool_array = array.to_bool();
965            let mask = bool_array.bit_buffer();
966            for (patch_idx, (&i, &value)) in patch_indices.iter().zip_eq(patch_values).enumerate() {
967                let index = i.as_() - patch_offset;
968
969                // SAFETY: `index` and `patch_idx` are valid because caller guarantees all patch
970                // indices are within bounds after offset adjustment.
971                unsafe {
972                    if mask.value_unchecked(patch_idx) {
973                        buffer[index] = value;
974                        validity.set_unchecked(index);
975                    } else {
976                        validity.unset_unchecked(index);
977                    }
978                }
979            }
980        }
981    }
982}
983
984fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
985    indices: &[I],
986    take_indices: PrimitiveArray,
987    indices_offset: usize,
988    min_index: usize,
989    max_index: usize,
990    include_nulls: bool,
991) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
992where
993    usize: TryFrom<T>,
994    VortexError: From<<I as TryFrom<usize>>::Error>,
995{
996    let take_indices_validity = take_indices.validity();
997    let take_indices = take_indices.as_slice::<T>();
998    let offset_i = I::try_from(indices_offset)?;
999
1000    let sparse_index_to_value_index: HashMap<I, usize> = indices
1001        .iter()
1002        .copied()
1003        .map(|idx| idx - offset_i)
1004        .enumerate()
1005        .map(|(value_index, sparse_index)| (sparse_index, value_index))
1006        .collect();
1007
1008    let (new_sparse_indices, value_indices): (BufferMut<u64>, BufferMut<u64>) = take_indices
1009        .iter()
1010        .copied()
1011        .map(usize::try_from)
1012        .process_results(|iter| {
1013            iter.enumerate()
1014                .filter_map(|(idx_in_take, ti)| {
1015                    // If we have to take nulls the take index doesn't matter, make it 0 for consistency
1016                    if include_nulls && take_indices_validity.is_null(idx_in_take) {
1017                        Some((idx_in_take as u64, 0))
1018                    } else if ti < min_index || ti > max_index {
1019                        None
1020                    } else {
1021                        sparse_index_to_value_index
1022                            .get(
1023                                &I::try_from(ti)
1024                                    .vortex_expect("take index is between min and max index"),
1025                            )
1026                            .map(|value_index| (idx_in_take as u64, *value_index as u64))
1027                    }
1028                })
1029                .unzip()
1030        })
1031        .map_err(|_| vortex_err!("Failed to convert index to usize"))?;
1032
1033    if new_sparse_indices.is_empty() {
1034        return Ok(None);
1035    }
1036
1037    let new_sparse_indices = new_sparse_indices.into_array();
1038    let values_validity = take_indices_validity.take(&new_sparse_indices)?;
1039    Ok(Some((
1040        new_sparse_indices,
1041        PrimitiveArray::new(value_indices, values_validity).into_array(),
1042    )))
1043}
1044
1045/// Filter patches with the provided mask (in flattened space).
1046///
1047/// The filter mask may contain indices that are non-patched. The return value of this function
1048/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
1049/// patch values.
1050fn filter_patches_with_mask<T: IntegerPType>(
1051    patch_indices: &[T],
1052    offset: usize,
1053    patch_values: &dyn Array,
1054    mask_indices: &[usize],
1055) -> VortexResult<Option<Patches>> {
1056    let true_count = mask_indices.len();
1057    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
1058    let mut new_mask_indices = Vec::with_capacity(true_count);
1059
1060    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
1061    // the patches are relatively sparse compared to the overall mask, and so many indices in the
1062    // mask will end up being skipped.
1063    const STRIDE: usize = 4;
1064
1065    let mut mask_idx = 0usize;
1066    let mut true_idx = 0usize;
1067
1068    while mask_idx < patch_indices.len() && true_idx < true_count {
1069        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
1070        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
1071        //  the mask (which covers both patch and non-patch values of the parent array), and so to
1072        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
1073        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
1074        //  fallback to performing a two-way iterator merge.
1075        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
1076            // Load a vector of each into our registers.
1077            let left_min = patch_indices[mask_idx].to_usize().vortex_expect("left_min") - offset;
1078            let left_max = patch_indices[mask_idx + STRIDE]
1079                .to_usize()
1080                .vortex_expect("left_max")
1081                - offset;
1082            let right_min = mask_indices[true_idx];
1083            let right_max = mask_indices[true_idx + STRIDE];
1084
1085            if left_min > right_max {
1086                // Advance right side
1087                true_idx += STRIDE;
1088                continue;
1089            } else if right_min > left_max {
1090                mask_idx += STRIDE;
1091                continue;
1092            } else {
1093                // Fallthrough to direct comparison path.
1094            }
1095        }
1096
1097        // Two-way sorted iterator merge:
1098
1099        let left = patch_indices[mask_idx].to_usize().vortex_expect("left") - offset;
1100        let right = mask_indices[true_idx];
1101
1102        match left.cmp(&right) {
1103            Ordering::Less => {
1104                mask_idx += 1;
1105            }
1106            Ordering::Greater => {
1107                true_idx += 1;
1108            }
1109            Ordering::Equal => {
1110                // Save the mask index as well as the positional index.
1111                new_mask_indices.push(mask_idx);
1112                new_patch_indices.push(true_idx as u64);
1113
1114                mask_idx += 1;
1115                true_idx += 1;
1116            }
1117        }
1118    }
1119
1120    if new_mask_indices.is_empty() {
1121        return Ok(None);
1122    }
1123
1124    let new_patch_indices = new_patch_indices.into_array();
1125    let new_patch_values = filter(
1126        patch_values,
1127        &Mask::from_indices(patch_values.len(), new_mask_indices),
1128    )?;
1129
1130    Ok(Some(Patches::new(
1131        true_count,
1132        0,
1133        new_patch_indices,
1134        new_patch_values,
1135        // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
1136        None,
1137    )))
1138}
1139
1140fn take_indices_with_search_fn<I: UnsignedPType, T: IntegerPType, F: Fn(I) -> SearchResult>(
1141    indices: &[I],
1142    take_indices: &[T],
1143    take_validity: Mask,
1144    include_nulls: bool,
1145    search_fn: F,
1146) -> (BufferMut<u64>, BufferMut<u64>) {
1147    take_indices
1148        .iter()
1149        .enumerate()
1150        .filter_map(|(new_patch_idx, &take_idx)| {
1151            if include_nulls && !take_validity.value(new_patch_idx) {
1152                // For nulls, patch index doesn't matter - use 0 for consistency
1153                Some((0u64, new_patch_idx as u64))
1154            } else {
1155                let search_result = I::from(take_idx)
1156                    .map(&search_fn)
1157                    .unwrap_or(SearchResult::NotFound(indices.len()));
1158
1159                search_result
1160                    .to_found()
1161                    .map(|patch_idx| (patch_idx as u64, new_patch_idx as u64))
1162            }
1163        })
1164        .unzip()
1165}
1166
1167#[cfg(test)]
1168mod test {
1169    use vortex_buffer::BufferMut;
1170    use vortex_buffer::buffer;
1171    use vortex_mask::Mask;
1172
1173    use crate::IntoArray;
1174    use crate::ToCanonical;
1175    use crate::arrays::PrimitiveArray;
1176    use crate::patches::Patches;
1177    use crate::search_sorted::SearchResult;
1178    use crate::validity::Validity;
1179
1180    #[test]
1181    fn test_filter() {
1182        let patches = Patches::new(
1183            100,
1184            0,
1185            buffer![10u32, 11, 20].into_array(),
1186            buffer![100, 110, 200].into_array(),
1187            None,
1188        );
1189
1190        let filtered = patches
1191            .filter(&Mask::from_indices(100, vec![10, 20, 30]))
1192            .unwrap()
1193            .unwrap();
1194
1195        let indices = filtered.indices().to_primitive();
1196        let values = filtered.values().to_primitive();
1197        assert_eq!(indices.as_slice::<u64>(), &[0, 1]);
1198        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1199    }
1200
1201    #[test]
1202    fn take_with_nulls() {
1203        let patches = Patches::new(
1204            20,
1205            0,
1206            buffer![2u64, 9, 15].into_array(),
1207            PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array(),
1208            None,
1209        );
1210
1211        let taken = patches
1212            .take(
1213                &PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false]))
1214                    .into_array(),
1215            )
1216            .unwrap()
1217            .unwrap();
1218        let primitive_values = taken.values().to_primitive();
1219        let primitive_indices = taken.indices().to_primitive();
1220        assert_eq!(taken.array_len(), 2);
1221        assert_eq!(primitive_values.as_slice::<i32>(), [44]);
1222        assert_eq!(primitive_indices.as_slice::<u64>(), [0]);
1223        assert_eq!(
1224            primitive_values.validity_mask(),
1225            Mask::from_iter(vec![true])
1226        );
1227    }
1228
1229    #[test]
1230    fn take_search_with_nulls_chunked() {
1231        let patches = Patches::new(
1232            20,
1233            0,
1234            buffer![2u64, 9, 15].into_array(),
1235            buffer![33_i32, 44, 55].into_array(),
1236            Some(buffer![0u64].into_array()),
1237        );
1238
1239        let taken = patches
1240            .take_search(
1241                PrimitiveArray::new(buffer![9, 0], Validity::from_iter([true, false])),
1242                true,
1243            )
1244            .unwrap()
1245            .unwrap();
1246
1247        let primitive_values = taken.values().to_primitive();
1248        let primitive_indices = taken.indices().to_primitive();
1249        assert_eq!(taken.array_len(), 2);
1250        assert_eq!(primitive_values.as_slice::<i32>(), [44, 33]);
1251        assert_eq!(primitive_indices.as_slice::<u64>(), [0, 1]);
1252
1253        assert_eq!(
1254            primitive_values.validity_mask(),
1255            Mask::from_iter([true, false])
1256        );
1257    }
1258
1259    #[test]
1260    fn take_search_chunked_multiple_chunks() {
1261        let patches = Patches::new(
1262            2048,
1263            0,
1264            buffer![100u64, 500, 1200, 1800].into_array(),
1265            buffer![10_i32, 20, 30, 40].into_array(),
1266            Some(buffer![0u64, 2].into_array()),
1267        );
1268
1269        let taken = patches
1270            .take_search(
1271                PrimitiveArray::new(buffer![500, 1200, 999], Validity::AllValid),
1272                true,
1273            )
1274            .unwrap()
1275            .unwrap();
1276
1277        let primitive_values = taken.values().to_primitive();
1278        assert_eq!(taken.array_len(), 3);
1279        assert_eq!(primitive_values.as_slice::<i32>(), [20, 30]);
1280    }
1281
1282    #[test]
1283    fn take_search_chunked_indices_with_no_patches() {
1284        let patches = Patches::new(
1285            20,
1286            0,
1287            buffer![2u64, 9, 15].into_array(),
1288            buffer![33_i32, 44, 55].into_array(),
1289            Some(buffer![0u64].into_array()),
1290        );
1291
1292        let taken = patches
1293            .take_search(
1294                PrimitiveArray::new(buffer![3, 4, 5], Validity::AllValid),
1295                true,
1296            )
1297            .unwrap();
1298
1299        assert!(taken.is_none());
1300    }
1301
1302    #[test]
1303    fn take_search_chunked_interleaved() {
1304        let patches = Patches::new(
1305            30,
1306            0,
1307            buffer![5u64, 10, 20, 25].into_array(),
1308            buffer![100_i32, 200, 300, 400].into_array(),
1309            Some(buffer![0u64].into_array()),
1310        );
1311
1312        let taken = patches
1313            .take_search(
1314                PrimitiveArray::new(buffer![10, 15, 20, 99], Validity::AllValid),
1315                true,
1316            )
1317            .unwrap()
1318            .unwrap();
1319
1320        let primitive_values = taken.values().to_primitive();
1321        assert_eq!(taken.array_len(), 4);
1322        assert_eq!(primitive_values.as_slice::<i32>(), [200, 300]);
1323    }
1324
1325    #[test]
1326    fn test_take_search_multiple_chunk_offsets() {
1327        let patches = Patches::new(
1328            1500,
1329            0,
1330            BufferMut::from_iter(0..1500u64).into_array(),
1331            BufferMut::from_iter(0..1500i32).into_array(),
1332            Some(buffer![0u64, 1024u64].into_array()),
1333        );
1334
1335        let taken = patches
1336            .take_search(
1337                PrimitiveArray::new(BufferMut::from_iter(0..1500u64), Validity::AllValid),
1338                false,
1339            )
1340            .unwrap()
1341            .unwrap();
1342
1343        assert_eq!(taken.array_len(), 1500);
1344    }
1345
1346    #[test]
1347    fn test_slice() {
1348        let values = buffer![15_u32, 135, 13531, 42].into_array();
1349        let indices = buffer![10_u64, 11, 50, 100].into_array();
1350
1351        let patches = Patches::new(101, 0, indices, values, None);
1352
1353        let sliced = patches.slice(15..100).unwrap();
1354        assert_eq!(sliced.array_len(), 100 - 15);
1355        let primitive = sliced.values().to_primitive();
1356
1357        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1358    }
1359
1360    #[test]
1361    fn doubly_sliced() {
1362        let values = buffer![15_u32, 135, 13531, 42].into_array();
1363        let indices = buffer![10_u64, 11, 50, 100].into_array();
1364
1365        let patches = Patches::new(101, 0, indices, values, None);
1366
1367        let sliced = patches.slice(15..100).unwrap();
1368        assert_eq!(sliced.array_len(), 100 - 15);
1369        let primitive = sliced.values().to_primitive();
1370
1371        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1372
1373        let doubly_sliced = sliced.slice(35..36).unwrap();
1374        let primitive_doubly_sliced = doubly_sliced.values().to_primitive();
1375
1376        assert_eq!(primitive_doubly_sliced.as_slice::<u32>(), &[13531]);
1377    }
1378
1379    #[test]
1380    fn test_mask_all_true() {
1381        let patches = Patches::new(
1382            10,
1383            0,
1384            buffer![2u64, 5, 8].into_array(),
1385            buffer![100i32, 200, 300].into_array(),
1386            None,
1387        );
1388
1389        let mask = Mask::new_true(10);
1390        let masked = patches.mask(&mask).unwrap();
1391        assert!(masked.is_none());
1392    }
1393
1394    #[test]
1395    fn test_mask_all_false() {
1396        let patches = Patches::new(
1397            10,
1398            0,
1399            buffer![2u64, 5, 8].into_array(),
1400            buffer![100i32, 200, 300].into_array(),
1401            None,
1402        );
1403
1404        let mask = Mask::new_false(10);
1405        let masked = patches.mask(&mask).unwrap().unwrap();
1406
1407        // No patch values should be masked
1408        let masked_values = masked.values().to_primitive();
1409        assert_eq!(masked_values.as_slice::<i32>(), &[100, 200, 300]);
1410        assert!(masked_values.is_valid(0));
1411        assert!(masked_values.is_valid(1));
1412        assert!(masked_values.is_valid(2));
1413
1414        // Indices should remain unchanged
1415        let indices = masked.indices().to_primitive();
1416        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1417    }
1418
1419    #[test]
1420    fn test_mask_partial() {
1421        let patches = Patches::new(
1422            10,
1423            0,
1424            buffer![2u64, 5, 8].into_array(),
1425            buffer![100i32, 200, 300].into_array(),
1426            None,
1427        );
1428
1429        // Mask that removes patches at indices 2 and 8 (but not 5)
1430        let mask = Mask::from_iter([
1431            false, false, true, false, false, false, false, false, true, false,
1432        ]);
1433        let masked = patches.mask(&mask).unwrap().unwrap();
1434
1435        // Only the patch at index 5 should remain
1436        let masked_values = masked.values().to_primitive();
1437        assert_eq!(masked_values.len(), 1);
1438        assert_eq!(masked_values.as_slice::<i32>(), &[200]);
1439
1440        // Only index 5 should remain
1441        let indices = masked.indices().to_primitive();
1442        assert_eq!(indices.as_slice::<u64>(), &[5]);
1443    }
1444
1445    #[test]
1446    fn test_mask_with_offset() {
1447        let patches = Patches::new(
1448            10,
1449            5,                                  // offset
1450            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1451            buffer![100i32, 200, 300].into_array(),
1452            None,
1453        );
1454
1455        // Mask that sets actual index 2 to null
1456        let mask = Mask::from_iter([
1457            false, false, true, false, false, false, false, false, false, false,
1458        ]);
1459
1460        let masked = patches.mask(&mask).unwrap().unwrap();
1461        assert_eq!(masked.array_len(), 10);
1462        assert_eq!(masked.offset(), 5);
1463        let indices = masked.indices().to_primitive();
1464        assert_eq!(indices.as_slice::<u64>(), &[10, 13]);
1465        let masked_values = masked.values().to_primitive();
1466        assert_eq!(masked_values.as_slice::<i32>(), &[200, 300]);
1467    }
1468
1469    #[test]
1470    fn test_mask_nullable_values() {
1471        let patches = Patches::new(
1472            10,
1473            0,
1474            buffer![2u64, 5, 8].into_array(),
1475            PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array(),
1476            None,
1477        );
1478
1479        // Test masking removes patch at index 2
1480        let mask = Mask::from_iter([
1481            false, false, true, false, false, false, false, false, false, false,
1482        ]);
1483        let masked = patches.mask(&mask).unwrap().unwrap();
1484
1485        // Patches at indices 5 and 8 should remain
1486        let indices = masked.indices().to_primitive();
1487        assert_eq!(indices.as_slice::<u64>(), &[5, 8]);
1488
1489        // Values should be the null and 300
1490        let masked_values = masked.values().to_primitive();
1491        assert_eq!(masked_values.len(), 2);
1492        assert!(!masked_values.is_valid(0)); // the null value at index 5
1493        assert!(masked_values.is_valid(1)); // the 300 value at index 8
1494        assert_eq!(i32::try_from(&masked_values.scalar_at(1)).unwrap(), 300i32);
1495    }
1496
1497    #[test]
1498    fn test_filter_keep_all() {
1499        let patches = Patches::new(
1500            10,
1501            0,
1502            buffer![2u64, 5, 8].into_array(),
1503            buffer![100i32, 200, 300].into_array(),
1504            None,
1505        );
1506
1507        // Keep all indices (mask with indices 0-9)
1508        let mask = Mask::from_indices(10, (0..10).collect());
1509        let filtered = patches.filter(&mask).unwrap().unwrap();
1510
1511        let indices = filtered.indices().to_primitive();
1512        let values = filtered.values().to_primitive();
1513        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1514        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1515    }
1516
1517    #[test]
1518    fn test_filter_none() {
1519        let patches = Patches::new(
1520            10,
1521            0,
1522            buffer![2u64, 5, 8].into_array(),
1523            buffer![100i32, 200, 300].into_array(),
1524            None,
1525        );
1526
1527        // Filter out all (empty mask means keep nothing)
1528        let mask = Mask::from_indices(10, vec![]);
1529        let filtered = patches.filter(&mask).unwrap();
1530        assert!(filtered.is_none());
1531    }
1532
1533    #[test]
1534    fn test_filter_with_indices() {
1535        let patches = Patches::new(
1536            10,
1537            0,
1538            buffer![2u64, 5, 8].into_array(),
1539            buffer![100i32, 200, 300].into_array(),
1540            None,
1541        );
1542
1543        // Keep indices 2, 5, 9 (so patches at 2 and 5 remain)
1544        let mask = Mask::from_indices(10, vec![2, 5, 9]);
1545        let filtered = patches.filter(&mask).unwrap().unwrap();
1546
1547        let indices = filtered.indices().to_primitive();
1548        let values = filtered.values().to_primitive();
1549        assert_eq!(indices.as_slice::<u64>(), &[0, 1]); // Adjusted indices
1550        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1551    }
1552
1553    #[test]
1554    fn test_slice_full_range() {
1555        let patches = Patches::new(
1556            10,
1557            0,
1558            buffer![2u64, 5, 8].into_array(),
1559            buffer![100i32, 200, 300].into_array(),
1560            None,
1561        );
1562
1563        let sliced = patches.slice(0..10).unwrap();
1564
1565        let indices = sliced.indices().to_primitive();
1566        let values = sliced.values().to_primitive();
1567        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1568        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1569    }
1570
1571    #[test]
1572    fn test_slice_partial() {
1573        let patches = Patches::new(
1574            10,
1575            0,
1576            buffer![2u64, 5, 8].into_array(),
1577            buffer![100i32, 200, 300].into_array(),
1578            None,
1579        );
1580
1581        // Slice from 3 to 8 (includes patch at 5)
1582        let sliced = patches.slice(3..8).unwrap();
1583
1584        let indices = sliced.indices().to_primitive();
1585        let values = sliced.values().to_primitive();
1586        assert_eq!(indices.as_slice::<u64>(), &[5]); // Index stays the same
1587        assert_eq!(values.as_slice::<i32>(), &[200]);
1588        assert_eq!(sliced.array_len(), 5); // 8 - 3 = 5
1589        assert_eq!(sliced.offset(), 3); // New offset
1590    }
1591
1592    #[test]
1593    fn test_slice_no_patches() {
1594        let patches = Patches::new(
1595            10,
1596            0,
1597            buffer![2u64, 5, 8].into_array(),
1598            buffer![100i32, 200, 300].into_array(),
1599            None,
1600        );
1601
1602        // Slice from 6 to 7 (no patches in this range)
1603        let sliced = patches.slice(6..7);
1604        assert!(sliced.is_none());
1605    }
1606
1607    #[test]
1608    fn test_slice_with_offset() {
1609        let patches = Patches::new(
1610            10,
1611            5,                                  // offset
1612            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1613            buffer![100i32, 200, 300].into_array(),
1614            None,
1615        );
1616
1617        // Slice from 3 to 8 (includes patch at actual index 5)
1618        let sliced = patches.slice(3..8).unwrap();
1619
1620        let indices = sliced.indices().to_primitive();
1621        let values = sliced.values().to_primitive();
1622        assert_eq!(indices.as_slice::<u64>(), &[10]); // Index stays the same (offset + 5 = 10)
1623        assert_eq!(values.as_slice::<i32>(), &[200]);
1624        assert_eq!(sliced.offset(), 8); // New offset = 5 + 3
1625    }
1626
1627    #[test]
1628    fn test_patch_values() {
1629        let patches = Patches::new(
1630            10,
1631            0,
1632            buffer![2u64, 5, 8].into_array(),
1633            buffer![100i32, 200, 300].into_array(),
1634            None,
1635        );
1636
1637        let values = patches.values().to_primitive();
1638        assert_eq!(i32::try_from(&values.scalar_at(0)).unwrap(), 100i32);
1639        assert_eq!(i32::try_from(&values.scalar_at(1)).unwrap(), 200i32);
1640        assert_eq!(i32::try_from(&values.scalar_at(2)).unwrap(), 300i32);
1641    }
1642
1643    #[test]
1644    fn test_indices_range() {
1645        let patches = Patches::new(
1646            10,
1647            0,
1648            buffer![2u64, 5, 8].into_array(),
1649            buffer![100i32, 200, 300].into_array(),
1650            None,
1651        );
1652
1653        assert_eq!(patches.min_index(), 2);
1654        assert_eq!(patches.max_index(), 8);
1655    }
1656
1657    #[test]
1658    fn test_search_index() {
1659        let patches = Patches::new(
1660            10,
1661            0,
1662            buffer![2u64, 5, 8].into_array(),
1663            buffer![100i32, 200, 300].into_array(),
1664            None,
1665        );
1666
1667        // Search for exact indices
1668        assert_eq!(patches.search_index(2), SearchResult::Found(0));
1669        assert_eq!(patches.search_index(5), SearchResult::Found(1));
1670        assert_eq!(patches.search_index(8), SearchResult::Found(2));
1671
1672        // Search for non-patch indices
1673        assert_eq!(patches.search_index(0), SearchResult::NotFound(0));
1674        assert_eq!(patches.search_index(3), SearchResult::NotFound(1));
1675        assert_eq!(patches.search_index(6), SearchResult::NotFound(2));
1676        assert_eq!(patches.search_index(9), SearchResult::NotFound(3));
1677    }
1678
1679    #[test]
1680    fn test_mask_boundary_patches() {
1681        // Test masking patches at array boundaries
1682        let patches = Patches::new(
1683            10,
1684            0,
1685            buffer![0u64, 9].into_array(),
1686            buffer![100i32, 200].into_array(),
1687            None,
1688        );
1689
1690        let mask = Mask::from_iter([
1691            true, false, false, false, false, false, false, false, false, false,
1692        ]);
1693        let masked = patches.mask(&mask).unwrap();
1694        assert!(masked.is_some());
1695        let masked = masked.unwrap();
1696        let indices = masked.indices().to_primitive();
1697        assert_eq!(indices.as_slice::<u64>(), &[9]);
1698        let values = masked.values().to_primitive();
1699        assert_eq!(values.as_slice::<i32>(), &[200]);
1700    }
1701
1702    #[test]
1703    fn test_mask_all_patches_removed() {
1704        // Test when all patches are masked out
1705        let patches = Patches::new(
1706            10,
1707            0,
1708            buffer![2u64, 5, 8].into_array(),
1709            buffer![100i32, 200, 300].into_array(),
1710            None,
1711        );
1712
1713        // Mask that removes all patches
1714        let mask = Mask::from_iter([
1715            false, false, true, false, false, true, false, false, true, false,
1716        ]);
1717        let masked = patches.mask(&mask).unwrap();
1718        assert!(masked.is_none());
1719    }
1720
1721    #[test]
1722    fn test_mask_no_patches_removed() {
1723        // Test when no patches are masked
1724        let patches = Patches::new(
1725            10,
1726            0,
1727            buffer![2u64, 5, 8].into_array(),
1728            buffer![100i32, 200, 300].into_array(),
1729            None,
1730        );
1731
1732        // Mask that doesn't affect any patches
1733        let mask = Mask::from_iter([
1734            true, false, false, true, false, false, true, false, false, true,
1735        ]);
1736        let masked = patches.mask(&mask).unwrap().unwrap();
1737
1738        let indices = masked.indices().to_primitive();
1739        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1740        let values = masked.values().to_primitive();
1741        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1742    }
1743
1744    #[test]
1745    fn test_mask_single_patch() {
1746        // Test with a single patch
1747        let patches = Patches::new(
1748            5,
1749            0,
1750            buffer![2u64].into_array(),
1751            buffer![42i32].into_array(),
1752            None,
1753        );
1754
1755        // Mask that removes the single patch
1756        let mask = Mask::from_iter([false, false, true, false, false]);
1757        let masked = patches.mask(&mask).unwrap();
1758        assert!(masked.is_none());
1759
1760        // Mask that keeps the single patch
1761        let mask = Mask::from_iter([true, false, false, true, false]);
1762        let masked = patches.mask(&mask).unwrap().unwrap();
1763        let indices = masked.indices().to_primitive();
1764        assert_eq!(indices.as_slice::<u64>(), &[2]);
1765    }
1766
1767    #[test]
1768    fn test_mask_contiguous_patches() {
1769        // Test with contiguous patches
1770        let patches = Patches::new(
1771            10,
1772            0,
1773            buffer![3u64, 4, 5, 6].into_array(),
1774            buffer![100i32, 200, 300, 400].into_array(),
1775            None,
1776        );
1777
1778        // Mask that removes middle patches
1779        let mask = Mask::from_iter([
1780            false, false, false, false, true, true, false, false, false, false,
1781        ]);
1782        let masked = patches.mask(&mask).unwrap().unwrap();
1783
1784        let indices = masked.indices().to_primitive();
1785        assert_eq!(indices.as_slice::<u64>(), &[3, 6]);
1786        let values = masked.values().to_primitive();
1787        assert_eq!(values.as_slice::<i32>(), &[100, 400]);
1788    }
1789
1790    #[test]
1791    fn test_mask_with_large_offset() {
1792        // Test with a large offset that shifts all indices
1793        let patches = Patches::new(
1794            20,
1795            15,
1796            buffer![16u64, 17, 19].into_array(), // actual indices are 1, 2, 4
1797            buffer![100i32, 200, 300].into_array(),
1798            None,
1799        );
1800
1801        // Mask that removes the patch at actual index 2
1802        let mask = Mask::from_iter([
1803            false, false, true, false, false, false, false, false, false, false, false, false,
1804            false, false, false, false, false, false, false, false,
1805        ]);
1806        let masked = patches.mask(&mask).unwrap().unwrap();
1807
1808        let indices = masked.indices().to_primitive();
1809        assert_eq!(indices.as_slice::<u64>(), &[16, 19]);
1810        let values = masked.values().to_primitive();
1811        assert_eq!(values.as_slice::<i32>(), &[100, 300]);
1812    }
1813
1814    #[test]
1815    #[should_panic(expected = "Filter mask length 5 does not match array length 10")]
1816    fn test_mask_wrong_length() {
1817        let patches = Patches::new(
1818            10,
1819            0,
1820            buffer![2u64, 5, 8].into_array(),
1821            buffer![100i32, 200, 300].into_array(),
1822            None,
1823        );
1824
1825        // Mask with wrong length
1826        let mask = Mask::from_iter([false, false, true, false, false]);
1827        patches.mask(&mask).unwrap();
1828    }
1829
1830    #[test]
1831    fn test_chunk_offsets_search() {
1832        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1833        let values = buffer![10i32, 20, 30, 40].into_array();
1834        let chunk_offsets = buffer![0u64, 2, 2, 3].into_array();
1835        let patches = Patches::new(4096, 0, indices, values, Some(chunk_offsets));
1836
1837        assert!(patches.chunk_offsets.is_some());
1838
1839        // chunk 0: patches at 100, 200
1840        assert_eq!(patches.search_index(100), SearchResult::Found(0));
1841        assert_eq!(patches.search_index(200), SearchResult::Found(1));
1842
1843        // chunks 1, 2: no patches
1844        assert_eq!(patches.search_index(1500), SearchResult::NotFound(2));
1845        assert_eq!(patches.search_index(2000), SearchResult::NotFound(2));
1846
1847        // chunk 3: patches at 3000, 3100
1848        assert_eq!(patches.search_index(3000), SearchResult::Found(2));
1849        assert_eq!(patches.search_index(3100), SearchResult::Found(3));
1850
1851        assert_eq!(patches.search_index(1024), SearchResult::NotFound(2));
1852    }
1853
1854    #[test]
1855    fn test_chunk_offsets_with_slice() {
1856        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
1857        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
1858        let chunk_offsets = buffer![0u64, 2, 6].into_array();
1859        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1860
1861        let sliced = patches.slice(1000..2200).unwrap();
1862
1863        assert!(sliced.chunk_offsets.is_some());
1864        assert_eq!(sliced.offset(), 1000);
1865
1866        assert_eq!(sliced.search_index(200), SearchResult::Found(0));
1867        assert_eq!(sliced.search_index(500), SearchResult::Found(2));
1868        assert_eq!(sliced.search_index(1100), SearchResult::Found(4));
1869
1870        assert_eq!(sliced.search_index(250), SearchResult::NotFound(1));
1871    }
1872
1873    #[test]
1874    fn test_chunk_offsets_with_slice_after_first_chunk() {
1875        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
1876        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
1877        let chunk_offsets = buffer![0u64, 2, 6].into_array();
1878        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1879
1880        let sliced = patches.slice(1300..2200).unwrap();
1881
1882        assert!(sliced.chunk_offsets.is_some());
1883        assert_eq!(sliced.offset(), 1300);
1884
1885        assert_eq!(sliced.search_index(0), SearchResult::Found(0));
1886        assert_eq!(sliced.search_index(200), SearchResult::Found(1));
1887        assert_eq!(sliced.search_index(500), SearchResult::Found(2));
1888        assert_eq!(sliced.search_index(250), SearchResult::NotFound(2));
1889        assert_eq!(sliced.search_index(900), SearchResult::NotFound(4));
1890    }
1891
1892    #[test]
1893    fn test_chunk_offsets_slice_empty_result() {
1894        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1895        let values = buffer![10i32, 20, 30, 40].into_array();
1896        let chunk_offsets = buffer![0u64, 2].into_array();
1897        let patches = Patches::new(4000, 0, indices, values, Some(chunk_offsets));
1898
1899        let sliced = patches.slice(1000..2000);
1900        assert!(sliced.is_none());
1901    }
1902
1903    #[test]
1904    fn test_chunk_offsets_slice_single_patch() {
1905        let indices = buffer![100u64, 1200, 1300, 2500].into_array();
1906        let values = buffer![10i32, 20, 30, 40].into_array();
1907        let chunk_offsets = buffer![0u64, 1, 3].into_array();
1908        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1909
1910        let sliced = patches.slice(1100..1250).unwrap();
1911
1912        assert_eq!(sliced.num_patches(), 1);
1913        assert_eq!(sliced.offset(), 1100);
1914        assert_eq!(sliced.search_index(100), SearchResult::Found(0)); // 1200 - 1100 = 100
1915        assert_eq!(sliced.search_index(50), SearchResult::NotFound(0));
1916        assert_eq!(sliced.search_index(150), SearchResult::NotFound(1));
1917    }
1918
1919    #[test]
1920    fn test_chunk_offsets_slice_across_chunks() {
1921        let indices = buffer![100u64, 200, 1100, 1200, 2100, 2200].into_array();
1922        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1923        let chunk_offsets = buffer![0u64, 2, 4].into_array();
1924        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1925
1926        let sliced = patches.slice(150..2150).unwrap();
1927
1928        assert_eq!(sliced.num_patches(), 4);
1929        assert_eq!(sliced.offset(), 150);
1930
1931        assert_eq!(sliced.search_index(50), SearchResult::Found(0)); // 200 - 150 = 50
1932        assert_eq!(sliced.search_index(950), SearchResult::Found(1)); // 1100 - 150 = 950
1933        assert_eq!(sliced.search_index(1050), SearchResult::Found(2)); // 1200 - 150 = 1050
1934        assert_eq!(sliced.search_index(1950), SearchResult::Found(3)); // 2100 - 150 = 1950
1935    }
1936
1937    #[test]
1938    fn test_chunk_offsets_boundary_searches() {
1939        let indices = buffer![1023u64, 1024, 1025, 2047, 2048].into_array();
1940        let values = buffer![10i32, 20, 30, 40, 50].into_array();
1941        let chunk_offsets = buffer![0u64, 1, 4].into_array();
1942        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1943
1944        assert_eq!(patches.search_index(1023), SearchResult::Found(0));
1945        assert_eq!(patches.search_index(1024), SearchResult::Found(1));
1946        assert_eq!(patches.search_index(1025), SearchResult::Found(2));
1947        assert_eq!(patches.search_index(2047), SearchResult::Found(3));
1948        assert_eq!(patches.search_index(2048), SearchResult::Found(4));
1949
1950        assert_eq!(patches.search_index(1022), SearchResult::NotFound(0));
1951        assert_eq!(patches.search_index(2046), SearchResult::NotFound(3));
1952    }
1953
1954    #[test]
1955    fn test_chunk_offsets_slice_edge_cases() {
1956        let indices = buffer![0u64, 1, 1023, 1024, 2047, 2048].into_array();
1957        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1958        let chunk_offsets = buffer![0u64, 3, 5].into_array();
1959        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1960
1961        // Slice at the very beginning
1962        let sliced = patches.slice(0..10).unwrap();
1963        assert_eq!(sliced.num_patches(), 2);
1964        assert_eq!(sliced.search_index(0), SearchResult::Found(0));
1965        assert_eq!(sliced.search_index(1), SearchResult::Found(1));
1966
1967        // Slice at the very end
1968        let sliced = patches.slice(2040..3000).unwrap();
1969        assert_eq!(sliced.num_patches(), 2); // patches at 2047 and 2048
1970        assert_eq!(sliced.search_index(7), SearchResult::Found(0)); // 2047 - 2040
1971        assert_eq!(sliced.search_index(8), SearchResult::Found(1)); // 2048 - 2040
1972    }
1973
1974    #[test]
1975    fn test_chunk_offsets_slice_nested() {
1976        let indices = buffer![100u64, 200, 300, 400, 500, 600].into_array();
1977        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1978        let chunk_offsets = buffer![0u64].into_array();
1979        let patches = Patches::new(1000, 0, indices, values, Some(chunk_offsets));
1980
1981        let sliced1 = patches.slice(150..550).unwrap();
1982        assert_eq!(sliced1.num_patches(), 4); // 200, 300, 400, 500
1983
1984        let sliced2 = sliced1.slice(100..250).unwrap();
1985        assert_eq!(sliced2.num_patches(), 1); // 300
1986        assert_eq!(sliced2.offset(), 250);
1987
1988        assert_eq!(sliced2.search_index(50), SearchResult::Found(0)); // 300 - 250
1989        assert_eq!(sliced2.search_index(150), SearchResult::NotFound(1));
1990    }
1991
1992    #[test]
1993    fn test_index_larger_than_length() {
1994        let chunk_offsets = buffer![0u64].into_array();
1995        let indices = buffer![1023u64].into_array();
1996        let values = buffer![42i32].into_array();
1997        let patches = Patches::new(1024, 0, indices, values, Some(chunk_offsets));
1998        assert_eq!(patches.search_index(2048), SearchResult::NotFound(1));
1999    }
2000}