vortex_array/
patches.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::ops::Range;
8
9use itertools::Itertools;
10use num_traits::NumCast;
11use vortex_buffer::{BitBuffer, BufferMut};
12use vortex_dtype::Nullability::NonNullable;
13use vortex_dtype::{
14    DType, IntegerPType, NativePType, PType, UnsignedPType, match_each_integer_ptype,
15    match_each_unsigned_integer_ptype,
16};
17use vortex_error::{
18    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
19};
20use vortex_mask::{AllOr, Mask};
21use vortex_scalar::{PValue, Scalar};
22use vortex_utils::aliases::hash_map::HashMap;
23
24use crate::arrays::PrimitiveArray;
25use crate::compute::{cast, filter, is_sorted, take};
26use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
27use crate::vtable::ValidityHelper;
28use crate::{Array, ArrayRef, IntoArray, ToCanonical};
29
/// Number of array positions covered by a single patch chunk.
///
/// One patch index offset is stored for each chunk.
/// This allows for constant time patch index lookups.
const PATCH_CHUNK_SIZE: usize = 1024;
33
/// Serializable description of a [`Patches`] instance, as produced by
/// [`Patches::to_metadata`].
#[derive(Copy, Clone, prost::Message)]
pub struct PatchesMetadata {
    /// Number of patch indices.
    #[prost(uint64, tag = "1")]
    len: u64,
    /// The offset of the patches (see [`Patches::offset`]).
    #[prost(uint64, tag = "2")]
    offset: u64,
    /// Primitive type of the patch indices, stored as its enumeration value.
    #[prost(enumeration = "PType", tag = "3")]
    indices_ptype: i32,
    /// Length of the chunk offsets array, if chunk offsets are present.
    #[prost(uint64, optional, tag = "4")]
    chunk_offsets_len: Option<u64>,
    /// Primitive type of the chunk offsets, if chunk offsets are present.
    #[prost(enumeration = "PType", optional, tag = "5")]
    chunk_offsets_ptype: Option<i32>,
    /// The `offset_within_chunk` of the patches (see [`Patches::offset_within_chunk`]).
    #[prost(uint64, optional, tag = "6")]
    offset_within_chunk: Option<u64>,
}
49
50impl PatchesMetadata {
51    #[inline]
52    pub fn new(
53        len: usize,
54        offset: usize,
55        indices_ptype: PType,
56        chunk_offsets_len: Option<usize>,
57        chunk_offsets_ptype: Option<PType>,
58        offset_within_chunk: Option<usize>,
59    ) -> Self {
60        Self {
61            len: len as u64,
62            offset: offset as u64,
63            indices_ptype: indices_ptype as i32,
64            chunk_offsets_len: chunk_offsets_len.map(|len| len as u64),
65            chunk_offsets_ptype: chunk_offsets_ptype.map(|pt| pt as i32),
66            offset_within_chunk: offset_within_chunk.map(|len| len as u64),
67        }
68    }
69
70    #[inline]
71    pub fn len(&self) -> usize {
72        usize::try_from(self.len).vortex_expect("len is a valid usize")
73    }
74
75    #[inline]
76    pub fn is_empty(&self) -> bool {
77        self.len == 0
78    }
79
80    #[inline]
81    pub fn offset(&self) -> usize {
82        usize::try_from(self.offset).vortex_expect("offset is a valid usize")
83    }
84
85    #[inline]
86    pub fn chunk_offsets_dtype(&self) -> Option<DType> {
87        self.chunk_offsets_ptype
88            .map(|t| {
89                PType::try_from(t)
90                    .map_err(|e| vortex_err!("invalid i32 value {t} for PType: {}", e))
91                    .vortex_expect("invalid i32 value for PType")
92            })
93            .map(|ptype| DType::Primitive(ptype, NonNullable))
94    }
95
96    #[inline]
97    pub fn indices_dtype(&self) -> DType {
98        assert!(
99            self.indices_ptype().is_unsigned_int(),
100            "Patch indices must be unsigned integers"
101        );
102        DType::Primitive(self.indices_ptype(), NonNullable)
103    }
104}
105
/// A helper for working with patched arrays.
///
/// Holds a list of patch `indices` together with one entry in `values` per index.
#[derive(Debug, Clone)]
pub struct Patches {
    /// Length of the array the patches apply to.
    array_len: usize,
    /// Shift applied to the stored `indices`: a stored index `i` refers to position
    /// `i - offset` of the patched array.
    offset: usize,
    /// Positions of the patches (including `offset`). Expected to be sorted, non-nullable
    /// unsigned integers.
    indices: ArrayRef,
    /// Patch values, one per entry in `indices`.
    values: ArrayRef,
    /// Stores the patch index offset for each chunk.
    chunk_offsets: Option<ArrayRef>,
    /// Chunk offsets are only sliced off in case the slice is fully
    /// outside of the chunk range.
    ///
    /// Though the range for indices and values is sliced in terms of
    /// individual elements, not chunks. To account for that we do a
    /// saturating sub when adjusting the indices based on the chunk offset.
    ///
    /// `offset_within_chunk` is necessary in order to keep track of how many
    /// elements were sliced off within the chunk.
    offset_within_chunk: Option<usize>,
}
126
127impl Patches {
128    pub fn new(
129        array_len: usize,
130        offset: usize,
131        indices: ArrayRef,
132        values: ArrayRef,
133        chunk_offsets: Option<ArrayRef>,
134    ) -> Self {
135        assert_eq!(
136            indices.len(),
137            values.len(),
138            "Patch indices and values must have the same length"
139        );
140        assert!(
141            indices.dtype().is_unsigned_int() && !indices.dtype().is_nullable(),
142            "Patch indices must be non-nullable unsigned integers, got {:?}",
143            indices.dtype()
144        );
145        assert!(
146            indices.len() <= array_len,
147            "Patch indices must be shorter than the array length"
148        );
149        assert!(!indices.is_empty(), "Patch indices must not be empty");
150        let max = usize::try_from(&indices.scalar_at(indices.len() - 1))
151            .vortex_expect("indices must be a number");
152        assert!(
153            max - offset < array_len,
154            "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
155        );
156
157        debug_assert!(
158            is_sorted(indices.as_ref())
159                .unwrap_or(Some(false))
160                .unwrap_or(false),
161            "Patch indices must be sorted"
162        );
163
164        Self {
165            array_len,
166            offset,
167            indices,
168            values,
169            chunk_offsets: chunk_offsets.clone(),
170            // Initialize with `Some(0)` only if `chunk_offsets` are set.
171            offset_within_chunk: chunk_offsets.map(|_| 0),
172        }
173    }
174
    /// Construct new patches without validating any of the arguments
    ///
    /// All arguments are stored as given; none of the checks performed by
    /// [`Patches::new`] are executed.
    ///
    /// # Safety
    ///
    /// Users have to assert that
    /// * Indices and values have the same length
    /// * Indices is a non-nullable unsigned integer type
    /// * Indices is not empty
    /// * Indices must be sorted
    /// * Last value in indices, minus `offset`, is smaller than array_len
    pub unsafe fn new_unchecked(
        array_len: usize,
        offset: usize,
        indices: ArrayRef,
        values: ArrayRef,
        chunk_offsets: Option<ArrayRef>,
        offset_within_chunk: Option<usize>,
    ) -> Self {
        Self {
            array_len,
            offset,
            indices,
            values,
            chunk_offsets,
            offset_within_chunk,
        }
    }
201
    /// Returns the length of the array the patches apply to.
    #[inline]
    pub fn array_len(&self) -> usize {
        self.array_len
    }
206
    /// Returns the number of patches, i.e. the length of the indices array.
    #[inline]
    pub fn num_patches(&self) -> usize {
        self.indices.len()
    }
211
    /// Returns the dtype of the patch values.
    #[inline]
    pub fn dtype(&self) -> &DType {
        self.values.dtype()
    }
216
    /// Returns the patch indices as stored, i.e. still including [`Self::offset`].
    #[inline]
    pub fn indices(&self) -> &ArrayRef {
        &self.indices
    }
221
    /// Consumes the patches and returns the indices array.
    #[inline]
    pub fn into_indices(self) -> ArrayRef {
        self.indices
    }
226
    /// Returns a mutable reference to the indices array.
    ///
    /// No validation is performed here; the caller is responsible for keeping the
    /// indices consistent with the values and the other fields.
    #[inline]
    pub fn indices_mut(&mut self) -> &mut ArrayRef {
        &mut self.indices
    }
231
    /// Returns the patch values.
    #[inline]
    pub fn values(&self) -> &ArrayRef {
        &self.values
    }
236
    /// Consumes the patches and returns the values array.
    #[inline]
    pub fn into_values(self) -> ArrayRef {
        self.values
    }
241
    /// Returns a mutable reference to the values array.
    ///
    /// No validation is performed here; the caller is responsible for keeping the
    /// values consistent with the indices.
    #[inline]
    pub fn values_mut(&mut self) -> &mut ArrayRef {
        &mut self.values
    }
246
    /// Returns the offset that the stored indices are shifted by.
    #[inline]
    pub fn offset(&self) -> usize {
        self.offset
    }
251
    /// Returns the per-chunk patch index offsets, if they are set.
    #[inline]
    pub fn chunk_offsets(&self) -> &Option<ArrayRef> {
        &self.chunk_offsets
    }
256
257    #[inline]
258    pub fn chunk_offset_at(&self, idx: usize) -> usize {
259        let Some(chunk_offsets) = &self.chunk_offsets else {
260            vortex_panic!("chunk_offsets must be set to retrieve offset at index")
261        };
262
263        chunk_offsets
264            .scalar_at(idx)
265            .as_primitive()
266            .as_::<usize>()
267            .vortex_expect("chunk offset must be usize")
268    }
269
    /// Returns how many patches were sliced off within the first chunk, if tracked.
    #[inline]
    pub fn offset_within_chunk(&self) -> Option<usize> {
        self.offset_within_chunk
    }
274
    /// Returns the primitive type of the indices array.
    ///
    /// # Panics
    /// Panics if the indices dtype cannot be converted into a `PType`.
    #[inline]
    pub fn indices_ptype(&self) -> PType {
        PType::try_from(self.indices.dtype()).vortex_expect("primitive indices")
    }
279
280    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
281        if self.indices.len() > len {
282            vortex_bail!(
283                "Patch indices {} are longer than the array length {}",
284                self.indices.len(),
285                len
286            );
287        }
288        if self.values.dtype() != dtype {
289            vortex_bail!(
290                "Patch values dtype {} does not match array dtype {}",
291                self.values.dtype(),
292                dtype
293            );
294        }
295        let chunk_offsets_len = self.chunk_offsets.as_ref().map(|co| co.len());
296        let chunk_offsets_ptype = self.chunk_offsets.as_ref().map(|co| co.dtype().as_ptype());
297
298        Ok(PatchesMetadata::new(
299            self.indices.len(),
300            self.offset,
301            self.indices.dtype().as_ptype(),
302            chunk_offsets_len,
303            chunk_offsets_ptype,
304            self.offset_within_chunk,
305        ))
306    }
307
308    pub fn cast_values(self, values_dtype: &DType) -> VortexResult<Self> {
309        // SAFETY: casting does not affect the relationship between the indices and values
310        unsafe {
311            Ok(Self::new_unchecked(
312                self.array_len,
313                self.offset,
314                self.indices,
315                cast(&self.values, values_dtype)?,
316                self.chunk_offsets,
317                self.offset_within_chunk,
318            ))
319        }
320    }
321
322    /// Get the patched value at a given index if it exists.
323    pub fn get_patched(&self, index: usize) -> Option<Scalar> {
324        self.search_index(index)
325            .to_found()
326            .map(|patch_idx| self.values().scalar_at(patch_idx))
327    }
328
329    /// Searches for `index` in the indices array.
330    ///
331    /// Chooses between chunked search when [`Self::chunk_offsets`] is
332    /// available, and binary search otherwise. The `index` parameter is
333    /// adjusted by [`Self::offset`] for both.
334    ///
335    /// # Arguments
336    /// * `index` - The index to search for
337    ///
338    /// # Returns
339    /// * [`SearchResult::Found(patch_idx)`] - If a patch exists at this index, returns the
340    ///   position in the patches array
341    /// * [`SearchResult::NotFound(insertion_point)`] - If no patch exists, returns where
342    ///   a patch at this index would be inserted to maintain sorted order
343    pub fn search_index(&self, index: usize) -> SearchResult {
344        if self.chunk_offsets.is_some() {
345            return self.search_index_chunked(index);
346        }
347
348        Self::search_index_binary_search(&self.indices, index + self.offset)
349    }
350
351    /// Binary searches for `needle` in the indices array.
352    ///
353    /// # Returns
354    /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`]
355    /// with the insertion point if not found.
356    fn search_index_binary_search(indices: &dyn Array, needle: usize) -> SearchResult {
357        if indices.is_canonical() {
358            let primitive = indices.to_primitive();
359            match_each_integer_ptype!(primitive.ptype(), |T| {
360                let Ok(needle) = T::try_from(needle) else {
361                    // If the needle is not of type T, then it cannot possibly be in this array.
362                    //
363                    // The needle is a non-negative integer (a usize); therefore, it must be larger
364                    // than all values in this array.
365                    return SearchResult::NotFound(primitive.len());
366                };
367                return primitive
368                    .as_slice::<T>()
369                    .search_sorted(&needle, SearchSortedSide::Left);
370            });
371        }
372        indices
373            .as_primitive_typed()
374            .search_sorted(&PValue::U64(needle as u64), SearchSortedSide::Left)
375    }
376
377    /// Constant time searches for `index` in the indices array.
378    ///
379    /// First determines which chunk the target index falls into, then performs
380    /// a binary search within that chunk's range.
381    ///
382    /// Returns a [`SearchResult`] indicating either the exact patch index if found,
383    /// or the insertion point if not found.
384    ///
385    /// # Panics
386    /// Panics if `chunk_offsets` or `offset_within_chunk` are not set.
387    fn search_index_chunked(&self, index: usize) -> SearchResult {
388        let Some(chunk_offsets) = &self.chunk_offsets else {
389            vortex_panic!("chunk_offsets is required to be set")
390        };
391
392        let Some(offset_within_chunk) = self.offset_within_chunk else {
393            vortex_panic!("offset_within_chunk is required to be set")
394        };
395
396        if index >= self.array_len() {
397            return SearchResult::NotFound(self.indices().len());
398        }
399
400        let chunk_idx = (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE;
401
402        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
403        let base_offset = self.chunk_offset_at(0);
404
405        let patches_start_idx = (self.chunk_offset_at(chunk_idx) - base_offset)
406            // Chunk offsets are only sliced off in case the slice is fully
407            // outside of the chunk range.
408            //
409            // Though the range for indices and values is sliced in terms of
410            // individual elements, not chunks. To account for that we do a
411            // saturating sub when adjusting the indices based on the chunk offset.
412            .saturating_sub(offset_within_chunk);
413
414        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
415            self.chunk_offset_at(chunk_idx + 1) - base_offset - offset_within_chunk
416        } else {
417            self.indices.len()
418        };
419
420        let chunk_indices = self.indices.slice(patches_start_idx..patches_end_idx);
421        let result = Self::search_index_binary_search(&chunk_indices, index + self.offset);
422
423        match result {
424            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
425            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
426        }
427    }
428
429    /// Batch version of `search_index`.
430    ///
431    /// In contrast to `search_index`, this function requires `indices` as
432    /// well as `chunk_offsets` to be passed as slices. This is to avoid
433    /// redundant canonicalization and `scalar_at` lookups across calls.
434    fn search_index_chunked_batch<T, O>(
435        &self,
436        indices: &[T],
437        chunk_offsets: &[O],
438        index: T,
439    ) -> SearchResult
440    where
441        T: UnsignedPType,
442        O: UnsignedPType,
443        usize: TryFrom<T>,
444        usize: TryFrom<O>,
445    {
446        let Some(offset_within_chunk) = self.offset_within_chunk else {
447            vortex_panic!("offset_within_chunk is required to be set")
448        };
449
450        let chunk_idx = {
451            let Ok(index) = usize::try_from(index) else {
452                // If the needle cannot be converted to usize, it's larger than all values in this array.
453                return SearchResult::NotFound(indices.len());
454            };
455
456            if index >= self.array_len() {
457                return SearchResult::NotFound(self.indices().len());
458            }
459
460            (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE
461        };
462
463        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
464        let Ok(chunk_offset) = usize::try_from(chunk_offsets[chunk_idx] - chunk_offsets[0]) else {
465            vortex_panic!("chunk_offset failed to convert to usize")
466        };
467
468        let patches_start_idx = chunk_offset
469            // Chunk offsets are only sliced off in case the slice is fully
470            // outside of the chunk range.
471            //
472            // Though the range for indices and values is sliced in terms of
473            // individual elements, not chunks. To account for that we do a
474            // saturating sub when adjusting the indices based on the chunk offset.
475            .saturating_sub(offset_within_chunk);
476
477        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
478            let base_offset_end = chunk_offsets[chunk_idx + 1];
479
480            let Some(offset_within_chunk) = O::from(offset_within_chunk) else {
481                vortex_panic!("offset_within_chunk failed to convert to O");
482            };
483
484            let Ok(patches_end_idx) =
485                usize::try_from(base_offset_end - chunk_offsets[0] - offset_within_chunk)
486            else {
487                vortex_panic!("patches_end_idx failed to convert to usize")
488            };
489
490            patches_end_idx
491        } else {
492            self.indices.len()
493        };
494
495        let Some(offset) = T::from(self.offset) else {
496            // If the offset cannot be converted to T, it's larger than all values in this array.
497            return SearchResult::NotFound(indices.len());
498        };
499
500        let chunk_indices = &indices[patches_start_idx..patches_end_idx];
501        let result = chunk_indices.search_sorted(&(index + offset), SearchSortedSide::Left);
502
503        match result {
504            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
505            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
506        }
507    }
508
509    /// Returns the minimum patch index
510    pub fn min_index(&self) -> usize {
511        let first = self
512            .indices
513            .scalar_at(0)
514            .as_primitive()
515            .as_::<usize>()
516            .vortex_expect("non-null");
517        first - self.offset
518    }
519
520    /// Returns the maximum patch index
521    pub fn max_index(&self) -> usize {
522        let last = self
523            .indices
524            .scalar_at(self.indices.len() - 1)
525            .as_primitive()
526            .as_::<usize>()
527            .vortex_expect("non-null");
528        last - self.offset
529    }
530
531    /// Filter the patches by a mask, resulting in new patches for the filtered array.
532    pub fn filter(&self, mask: &Mask) -> VortexResult<Option<Self>> {
533        if mask.len() != self.array_len {
534            vortex_bail!(
535                "Filter mask length {} does not match array length {}",
536                mask.len(),
537                self.array_len
538            );
539        }
540
541        match mask.indices() {
542            AllOr::All => Ok(Some(self.clone())),
543            AllOr::None => Ok(None),
544            AllOr::Some(mask_indices) => {
545                let flat_indices = self.indices().to_primitive();
546                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
547                    filter_patches_with_mask(
548                        flat_indices.as_slice::<I>(),
549                        self.offset(),
550                        self.values(),
551                        mask_indices,
552                    )
553                })
554            }
555        }
556    }
557
558    /// Mask the patches, REMOVING the patches where the mask is true.
559    /// Unlike filter, this preserves the patch indices.
560    /// Unlike mask on a single array, this does not set masked values to null.
561    pub fn mask(&self, mask: &Mask) -> VortexResult<Option<Self>> {
562        if mask.len() != self.array_len {
563            vortex_bail!(
564                "Filter mask length {} does not match array length {}",
565                mask.len(),
566                self.array_len
567            );
568        }
569
570        let filter_mask = match mask.bit_buffer() {
571            AllOr::All => return Ok(None),
572            AllOr::None => return Ok(Some(self.clone())),
573            AllOr::Some(masked) => {
574                let patch_indices = self.indices().to_primitive();
575                match_each_unsigned_integer_ptype!(patch_indices.ptype(), |P| {
576                    let patch_indices = patch_indices.as_slice::<P>();
577                    Mask::from_buffer(BitBuffer::collect_bool(patch_indices.len(), |i| {
578                        #[allow(clippy::cast_possible_truncation)]
579                        let idx = (patch_indices[i] as usize) - self.offset;
580                        !masked.value(idx)
581                    }))
582                })
583            }
584        };
585
586        if filter_mask.all_false() {
587            return Ok(None);
588        }
589
590        // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship
591        let filtered_indices = filter(&self.indices, &filter_mask)?;
592        let filtered_values = filter(&self.values, &filter_mask)?;
593
594        Ok(Some(Self {
595            array_len: self.array_len,
596            offset: self.offset,
597            indices: filtered_indices,
598            values: filtered_values,
599            // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
600            chunk_offsets: None,
601            offset_within_chunk: self.offset_within_chunk,
602        }))
603    }
604
605    /// Slice the patches by a range of the patched array.
606    pub fn slice(&self, range: Range<usize>) -> Option<Self> {
607        let slice_start_idx = self.search_index(range.start).to_index();
608        let slice_end_idx = self.search_index(range.end).to_index();
609
610        if slice_start_idx == slice_end_idx {
611            return None;
612        }
613
614        let values = self.values().slice(slice_start_idx..slice_end_idx);
615        let indices = self.indices().slice(slice_start_idx..slice_end_idx);
616
617        let chunk_offsets = self.chunk_offsets.as_ref().map(|chunk_offsets| {
618            let chunk_relative_offset = self.offset % PATCH_CHUNK_SIZE;
619            let chunk_start_idx = (chunk_relative_offset + range.start) / PATCH_CHUNK_SIZE;
620            let chunk_end_idx = (chunk_relative_offset + range.end).div_ceil(PATCH_CHUNK_SIZE);
621            chunk_offsets.slice(chunk_start_idx..chunk_end_idx)
622        });
623
624        let offset_within_chunk = chunk_offsets.as_ref().map(|chunk_offsets| {
625            let base_offset = chunk_offsets
626                .scalar_at(0)
627                .as_primitive()
628                .as_::<usize>()
629                .vortex_expect("chunk offset must be usize");
630            slice_start_idx - base_offset
631        });
632
633        Some(Self {
634            array_len: range.len(),
635            offset: range.start + self.offset(),
636            indices,
637            values,
638            chunk_offsets,
639            offset_within_chunk,
640        })
641    }
642
    /// Threshold for the ratio `num_patches / take_indices.len()`: below it, the
    /// map-based take is chosen over the search-based take.
    //
    // NOTE(review): the link below presumably holds the measurements backing this
    // value — confirm.
    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
645
646    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
647        (self.num_patches() as f64 / take_indices.len() as f64)
648            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
649    }
650
651    /// Take the indices from the patches
652    ///
653    /// Any nulls in take_indices are added to the resulting patches.
654    pub fn take_with_nulls(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
655        if take_indices.is_empty() {
656            return Ok(None);
657        }
658
659        let take_indices = take_indices.to_primitive();
660        if self.is_map_faster_than_search(&take_indices) {
661            self.take_map(take_indices, true)
662        } else {
663            self.take_search(take_indices, true)
664        }
665    }
666
667    /// Take the indices from the patches.
668    ///
669    /// Any nulls in take_indices are ignored.
670    pub fn take(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
671        if take_indices.is_empty() {
672            return Ok(None);
673        }
674
675        let take_indices = take_indices.to_primitive();
676        if self.is_map_faster_than_search(&take_indices) {
677            self.take_map(take_indices, false)
678        } else {
679            self.take_search(take_indices, false)
680        }
681    }
682
683    #[allow(clippy::cognitive_complexity)]
684    pub fn take_search(
685        &self,
686        take_indices: PrimitiveArray,
687        include_nulls: bool,
688    ) -> VortexResult<Option<Self>> {
689        let take_indices_validity = take_indices.validity();
690        let patch_indices = self.indices.to_primitive();
691        let chunk_offsets = self.chunk_offsets().as_ref().map(|co| co.to_primitive());
692
693        let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) =
694            match_each_unsigned_integer_ptype!(patch_indices.ptype(), |PatchT| {
695                let patch_indices_slice = patch_indices.as_slice::<PatchT>();
696                match_each_integer_ptype!(take_indices.ptype(), |TakeT| {
697                    let take_slice = take_indices.as_slice::<TakeT>();
698
699                    if let Some(chunk_offsets) = chunk_offsets {
700                        match_each_unsigned_integer_ptype!(chunk_offsets.ptype(), |OffsetT| {
701                            let chunk_offsets = chunk_offsets.as_slice::<OffsetT>();
702                            take_indices_with_search_fn(
703                                patch_indices_slice,
704                                take_slice,
705                                take_indices.validity_mask(),
706                                include_nulls,
707                                |take_idx| {
708                                    self.search_index_chunked_batch(
709                                        patch_indices_slice,
710                                        chunk_offsets,
711                                        take_idx,
712                                    )
713                                },
714                            )
715                        })
716                    } else {
717                        take_indices_with_search_fn(
718                            patch_indices_slice,
719                            take_slice,
720                            take_indices.validity_mask(),
721                            include_nulls,
722                            |take_idx| {
723                                let Some(offset) = <PatchT as NumCast>::from(self.offset) else {
724                                    // If the offset cannot be converted to T, it's larger than all values in this array.
725                                    return SearchResult::NotFound(patch_indices_slice.len());
726                                };
727
728                                patch_indices_slice
729                                    .search_sorted(&(take_idx + offset), SearchSortedSide::Left)
730                            },
731                        )
732                    }
733                })
734            });
735
736        if new_indices.is_empty() {
737            return Ok(None);
738        }
739
740        let new_indices = new_indices.into_array();
741        let new_array_len = take_indices.len();
742        let values_validity = take_indices_validity.take(&new_indices)?;
743
744        Ok(Some(Self {
745            array_len: new_array_len,
746            offset: 0,
747            indices: new_indices.clone(),
748            values: take(
749                self.values(),
750                &PrimitiveArray::new(values_indices, values_validity).into_array(),
751            )?,
752            chunk_offsets: None,
753            offset_within_chunk: Some(0), // Reset when creating new Patches.
754        }))
755    }
756
757    pub fn take_map(
758        &self,
759        take_indices: PrimitiveArray,
760        include_nulls: bool,
761    ) -> VortexResult<Option<Self>> {
762        let indices = self.indices.to_primitive();
763        let new_length = take_indices.len();
764
765        let Some((new_sparse_indices, value_indices)) =
766            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
767                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
768                    take_map::<_, TakeIndices>(
769                        indices.as_slice::<Indices>(),
770                        take_indices,
771                        self.offset(),
772                        self.min_index(),
773                        self.max_index(),
774                        include_nulls,
775                    )?
776                })
777            })
778        else {
779            return Ok(None);
780        };
781
782        let taken_values = take(self.values(), &value_indices)?;
783
784        Ok(Some(Patches {
785            array_len: new_length,
786            offset: 0,
787            indices: new_sparse_indices,
788            values: taken_values,
789            // TODO(0ax1): Chunk offsets are invalid after take is applied.
790            chunk_offsets: None,
791            offset_within_chunk: self.offset_within_chunk,
792        }))
793    }
794
795    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
796    where
797        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
798    {
799        let values = f(self.values)?;
800        if self.indices.len() != values.len() {
801            vortex_bail!(
802                "map_values must preserve length: expected {} received {}",
803                self.indices.len(),
804                values.len()
805            )
806        }
807
808        Ok(Self {
809            array_len: self.array_len,
810            offset: self.offset,
811            indices: self.indices,
812            values,
813            chunk_offsets: self.chunk_offsets,
814            offset_within_chunk: self.offset_within_chunk,
815        })
816    }
817}
818
819fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
820    indices: &[I],
821    take_indices: PrimitiveArray,
822    indices_offset: usize,
823    min_index: usize,
824    max_index: usize,
825    include_nulls: bool,
826) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
827where
828    usize: TryFrom<T>,
829    VortexError: From<<I as TryFrom<usize>>::Error>,
830{
831    let take_indices_validity = take_indices.validity();
832    let take_indices = take_indices.as_slice::<T>();
833    let offset_i = I::try_from(indices_offset)?;
834
835    let sparse_index_to_value_index: HashMap<I, usize> = indices
836        .iter()
837        .copied()
838        .map(|idx| idx - offset_i)
839        .enumerate()
840        .map(|(value_index, sparse_index)| (sparse_index, value_index))
841        .collect();
842
843    let (new_sparse_indices, value_indices): (BufferMut<u64>, BufferMut<u64>) = take_indices
844        .iter()
845        .copied()
846        .map(usize::try_from)
847        .process_results(|iter| {
848            iter.enumerate()
849                .filter_map(|(idx_in_take, ti)| {
850                    // If we have to take nulls the take index doesn't matter, make it 0 for consistency
851                    if include_nulls && take_indices_validity.is_null(idx_in_take) {
852                        Some((idx_in_take as u64, 0))
853                    } else if ti < min_index || ti > max_index {
854                        None
855                    } else {
856                        sparse_index_to_value_index
857                            .get(
858                                &I::try_from(ti)
859                                    .vortex_expect("take index is between min and max index"),
860                            )
861                            .map(|value_index| (idx_in_take as u64, *value_index as u64))
862                    }
863                })
864                .unzip()
865        })
866        .map_err(|_| vortex_err!("Failed to convert index to usize"))?;
867
868    if new_sparse_indices.is_empty() {
869        return Ok(None);
870    }
871
872    let new_sparse_indices = new_sparse_indices.into_array();
873    let values_validity = take_indices_validity.take(&new_sparse_indices)?;
874    Ok(Some((
875        new_sparse_indices,
876        PrimitiveArray::new(value_indices, values_validity).into_array(),
877    )))
878}
879
880/// Filter patches with the provided mask (in flattened space).
881///
882/// The filter mask may contain indices that are non-patched. The return value of this function
883/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
884/// patch values.
885fn filter_patches_with_mask<T: IntegerPType>(
886    patch_indices: &[T],
887    offset: usize,
888    patch_values: &dyn Array,
889    mask_indices: &[usize],
890) -> VortexResult<Option<Patches>> {
891    let true_count = mask_indices.len();
892    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
893    let mut new_mask_indices = Vec::with_capacity(true_count);
894
895    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
896    // the patches are relatively sparse compared to the overall mask, and so many indices in the
897    // mask will end up being skipped.
898    const STRIDE: usize = 4;
899
900    let mut mask_idx = 0usize;
901    let mut true_idx = 0usize;
902
903    while mask_idx < patch_indices.len() && true_idx < true_count {
904        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
905        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
906        //  the mask (which covers both patch and non-patch values of the parent array), and so to
907        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
908        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
909        //  fallback to performing a two-way iterator merge.
910        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
911            // Load a vector of each into our registers.
912            let left_min = patch_indices[mask_idx].to_usize().vortex_expect("left_min") - offset;
913            let left_max = patch_indices[mask_idx + STRIDE]
914                .to_usize()
915                .vortex_expect("left_max")
916                - offset;
917            let right_min = mask_indices[true_idx];
918            let right_max = mask_indices[true_idx + STRIDE];
919
920            if left_min > right_max {
921                // Advance right side
922                true_idx += STRIDE;
923                continue;
924            } else if right_min > left_max {
925                mask_idx += STRIDE;
926                continue;
927            } else {
928                // Fallthrough to direct comparison path.
929            }
930        }
931
932        // Two-way sorted iterator merge:
933
934        let left = patch_indices[mask_idx].to_usize().vortex_expect("left") - offset;
935        let right = mask_indices[true_idx];
936
937        match left.cmp(&right) {
938            Ordering::Less => {
939                mask_idx += 1;
940            }
941            Ordering::Greater => {
942                true_idx += 1;
943            }
944            Ordering::Equal => {
945                // Save the mask index as well as the positional index.
946                new_mask_indices.push(mask_idx);
947                new_patch_indices.push(true_idx as u64);
948
949                mask_idx += 1;
950                true_idx += 1;
951            }
952        }
953    }
954
955    if new_mask_indices.is_empty() {
956        return Ok(None);
957    }
958
959    let new_patch_indices = new_patch_indices.into_array();
960    let new_patch_values = filter(
961        patch_values,
962        &Mask::from_indices(patch_values.len(), new_mask_indices),
963    )?;
964
965    Ok(Some(Patches::new(
966        true_count,
967        0,
968        new_patch_indices,
969        new_patch_values,
970        // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
971        None,
972    )))
973}
974
975fn take_indices_with_search_fn<I: UnsignedPType, T: IntegerPType, F: Fn(I) -> SearchResult>(
976    indices: &[I],
977    take_indices: &[T],
978    take_validity: Mask,
979    include_nulls: bool,
980    search_fn: F,
981) -> (BufferMut<u64>, BufferMut<u64>) {
982    take_indices
983        .iter()
984        .enumerate()
985        .filter_map(|(new_patch_idx, &take_idx)| {
986            if include_nulls && !take_validity.value(new_patch_idx) {
987                // For nulls, patch index doesn't matter - use 0 for consistency
988                Some((0u64, new_patch_idx as u64))
989            } else {
990                let search_result = I::from(take_idx)
991                    .map(&search_fn)
992                    .unwrap_or(SearchResult::NotFound(indices.len()));
993
994                search_result
995                    .to_found()
996                    .map(|patch_idx| (patch_idx as u64, new_patch_idx as u64))
997            }
998        })
999        .unzip()
1000}
1001
1002#[cfg(test)]
1003mod test {
1004    use vortex_buffer::{BufferMut, buffer};
1005    use vortex_mask::Mask;
1006
1007    use crate::arrays::PrimitiveArray;
1008    use crate::patches::Patches;
1009    use crate::search_sorted::SearchResult;
1010    use crate::validity::Validity;
1011    use crate::{IntoArray, ToCanonical};
1012
1013    #[test]
1014    fn test_filter() {
1015        let patches = Patches::new(
1016            100,
1017            0,
1018            buffer![10u32, 11, 20].into_array(),
1019            buffer![100, 110, 200].into_array(),
1020            None,
1021        );
1022
1023        let filtered = patches
1024            .filter(&Mask::from_indices(100, vec![10, 20, 30]))
1025            .unwrap()
1026            .unwrap();
1027
1028        let indices = filtered.indices().to_primitive();
1029        let values = filtered.values().to_primitive();
1030        assert_eq!(indices.as_slice::<u64>(), &[0, 1]);
1031        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1032    }
1033
1034    #[test]
1035    fn take_with_nulls() {
1036        let patches = Patches::new(
1037            20,
1038            0,
1039            buffer![2u64, 9, 15].into_array(),
1040            PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array(),
1041            None,
1042        );
1043
1044        let taken = patches
1045            .take(
1046                &PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false]))
1047                    .into_array(),
1048            )
1049            .unwrap()
1050            .unwrap();
1051        let primitive_values = taken.values().to_primitive();
1052        let primitive_indices = taken.indices().to_primitive();
1053        assert_eq!(taken.array_len(), 2);
1054        assert_eq!(primitive_values.as_slice::<i32>(), [44]);
1055        assert_eq!(primitive_indices.as_slice::<u64>(), [0]);
1056        assert_eq!(
1057            primitive_values.validity_mask(),
1058            Mask::from_iter(vec![true])
1059        );
1060    }
1061
1062    #[test]
1063    fn take_search_with_nulls_chunked() {
1064        let patches = Patches::new(
1065            20,
1066            0,
1067            buffer![2u64, 9, 15].into_array(),
1068            buffer![33_i32, 44, 55].into_array(),
1069            Some(buffer![0u64].into_array()),
1070        );
1071
1072        let taken = patches
1073            .take_search(
1074                PrimitiveArray::new(buffer![9, 0], Validity::from_iter([true, false])),
1075                true,
1076            )
1077            .unwrap()
1078            .unwrap();
1079
1080        let primitive_values = taken.values().to_primitive();
1081        let primitive_indices = taken.indices().to_primitive();
1082        assert_eq!(taken.array_len(), 2);
1083        assert_eq!(primitive_values.as_slice::<i32>(), [44, 33]);
1084        assert_eq!(primitive_indices.as_slice::<u64>(), [0, 1]);
1085
1086        assert_eq!(
1087            primitive_values.validity_mask(),
1088            Mask::from_iter([true, false])
1089        );
1090    }
1091
1092    #[test]
1093    fn take_search_chunked_multiple_chunks() {
1094        let patches = Patches::new(
1095            2048,
1096            0,
1097            buffer![100u64, 500, 1200, 1800].into_array(),
1098            buffer![10_i32, 20, 30, 40].into_array(),
1099            Some(buffer![0u64, 2].into_array()),
1100        );
1101
1102        let taken = patches
1103            .take_search(
1104                PrimitiveArray::new(buffer![500, 1200, 999], Validity::AllValid),
1105                true,
1106            )
1107            .unwrap()
1108            .unwrap();
1109
1110        let primitive_values = taken.values().to_primitive();
1111        assert_eq!(taken.array_len(), 3);
1112        assert_eq!(primitive_values.as_slice::<i32>(), [20, 30]);
1113    }
1114
1115    #[test]
1116    fn take_search_chunked_indices_with_no_patches() {
1117        let patches = Patches::new(
1118            20,
1119            0,
1120            buffer![2u64, 9, 15].into_array(),
1121            buffer![33_i32, 44, 55].into_array(),
1122            Some(buffer![0u64].into_array()),
1123        );
1124
1125        let taken = patches
1126            .take_search(
1127                PrimitiveArray::new(buffer![3, 4, 5], Validity::AllValid),
1128                true,
1129            )
1130            .unwrap();
1131
1132        assert!(taken.is_none());
1133    }
1134
1135    #[test]
1136    fn take_search_chunked_interleaved() {
1137        let patches = Patches::new(
1138            30,
1139            0,
1140            buffer![5u64, 10, 20, 25].into_array(),
1141            buffer![100_i32, 200, 300, 400].into_array(),
1142            Some(buffer![0u64].into_array()),
1143        );
1144
1145        let taken = patches
1146            .take_search(
1147                PrimitiveArray::new(buffer![10, 15, 20, 99], Validity::AllValid),
1148                true,
1149            )
1150            .unwrap()
1151            .unwrap();
1152
1153        let primitive_values = taken.values().to_primitive();
1154        assert_eq!(taken.array_len(), 4);
1155        assert_eq!(primitive_values.as_slice::<i32>(), [200, 300]);
1156    }
1157
1158    #[test]
1159    fn test_take_search_multiple_chunk_offsets() {
1160        let patches = Patches::new(
1161            1500,
1162            0,
1163            BufferMut::from_iter(0..1500u64).into_array(),
1164            BufferMut::from_iter(0..1500i32).into_array(),
1165            Some(buffer![0u64, 1024u64].into_array()),
1166        );
1167
1168        let taken = patches
1169            .take_search(
1170                PrimitiveArray::new(BufferMut::from_iter(0..1500u64), Validity::AllValid),
1171                false,
1172            )
1173            .unwrap()
1174            .unwrap();
1175
1176        assert_eq!(taken.array_len(), 1500);
1177    }
1178
1179    #[test]
1180    fn test_slice() {
1181        let values = buffer![15_u32, 135, 13531, 42].into_array();
1182        let indices = buffer![10_u64, 11, 50, 100].into_array();
1183
1184        let patches = Patches::new(101, 0, indices, values, None);
1185
1186        let sliced = patches.slice(15..100).unwrap();
1187        assert_eq!(sliced.array_len(), 100 - 15);
1188        let primitive = sliced.values().to_primitive();
1189
1190        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1191    }
1192
1193    #[test]
1194    fn doubly_sliced() {
1195        let values = buffer![15_u32, 135, 13531, 42].into_array();
1196        let indices = buffer![10_u64, 11, 50, 100].into_array();
1197
1198        let patches = Patches::new(101, 0, indices, values, None);
1199
1200        let sliced = patches.slice(15..100).unwrap();
1201        assert_eq!(sliced.array_len(), 100 - 15);
1202        let primitive = sliced.values().to_primitive();
1203
1204        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1205
1206        let doubly_sliced = sliced.slice(35..36).unwrap();
1207        let primitive_doubly_sliced = doubly_sliced.values().to_primitive();
1208
1209        assert_eq!(primitive_doubly_sliced.as_slice::<u32>(), &[13531]);
1210    }
1211
1212    #[test]
1213    fn test_mask_all_true() {
1214        let patches = Patches::new(
1215            10,
1216            0,
1217            buffer![2u64, 5, 8].into_array(),
1218            buffer![100i32, 200, 300].into_array(),
1219            None,
1220        );
1221
1222        let mask = Mask::new_true(10);
1223        let masked = patches.mask(&mask).unwrap();
1224        assert!(masked.is_none());
1225    }
1226
1227    #[test]
1228    fn test_mask_all_false() {
1229        let patches = Patches::new(
1230            10,
1231            0,
1232            buffer![2u64, 5, 8].into_array(),
1233            buffer![100i32, 200, 300].into_array(),
1234            None,
1235        );
1236
1237        let mask = Mask::new_false(10);
1238        let masked = patches.mask(&mask).unwrap().unwrap();
1239
1240        // No patch values should be masked
1241        let masked_values = masked.values().to_primitive();
1242        assert_eq!(masked_values.as_slice::<i32>(), &[100, 200, 300]);
1243        assert!(masked_values.is_valid(0));
1244        assert!(masked_values.is_valid(1));
1245        assert!(masked_values.is_valid(2));
1246
1247        // Indices should remain unchanged
1248        let indices = masked.indices().to_primitive();
1249        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1250    }
1251
1252    #[test]
1253    fn test_mask_partial() {
1254        let patches = Patches::new(
1255            10,
1256            0,
1257            buffer![2u64, 5, 8].into_array(),
1258            buffer![100i32, 200, 300].into_array(),
1259            None,
1260        );
1261
1262        // Mask that removes patches at indices 2 and 8 (but not 5)
1263        let mask = Mask::from_iter([
1264            false, false, true, false, false, false, false, false, true, false,
1265        ]);
1266        let masked = patches.mask(&mask).unwrap().unwrap();
1267
1268        // Only the patch at index 5 should remain
1269        let masked_values = masked.values().to_primitive();
1270        assert_eq!(masked_values.len(), 1);
1271        assert_eq!(masked_values.as_slice::<i32>(), &[200]);
1272
1273        // Only index 5 should remain
1274        let indices = masked.indices().to_primitive();
1275        assert_eq!(indices.as_slice::<u64>(), &[5]);
1276    }
1277
1278    #[test]
1279    fn test_mask_with_offset() {
1280        let patches = Patches::new(
1281            10,
1282            5,                                  // offset
1283            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1284            buffer![100i32, 200, 300].into_array(),
1285            None,
1286        );
1287
1288        // Mask that sets actual index 2 to null
1289        let mask = Mask::from_iter([
1290            false, false, true, false, false, false, false, false, false, false,
1291        ]);
1292
1293        let masked = patches.mask(&mask).unwrap().unwrap();
1294        assert_eq!(masked.array_len(), 10);
1295        assert_eq!(masked.offset(), 5);
1296        let indices = masked.indices().to_primitive();
1297        assert_eq!(indices.as_slice::<u64>(), &[10, 13]);
1298        let masked_values = masked.values().to_primitive();
1299        assert_eq!(masked_values.as_slice::<i32>(), &[200, 300]);
1300    }
1301
1302    #[test]
1303    fn test_mask_nullable_values() {
1304        let patches = Patches::new(
1305            10,
1306            0,
1307            buffer![2u64, 5, 8].into_array(),
1308            PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array(),
1309            None,
1310        );
1311
1312        // Test masking removes patch at index 2
1313        let mask = Mask::from_iter([
1314            false, false, true, false, false, false, false, false, false, false,
1315        ]);
1316        let masked = patches.mask(&mask).unwrap().unwrap();
1317
1318        // Patches at indices 5 and 8 should remain
1319        let indices = masked.indices().to_primitive();
1320        assert_eq!(indices.as_slice::<u64>(), &[5, 8]);
1321
1322        // Values should be the null and 300
1323        let masked_values = masked.values().to_primitive();
1324        assert_eq!(masked_values.len(), 2);
1325        assert!(!masked_values.is_valid(0)); // the null value at index 5
1326        assert!(masked_values.is_valid(1)); // the 300 value at index 8
1327        assert_eq!(i32::try_from(&masked_values.scalar_at(1)).unwrap(), 300i32);
1328    }
1329
1330    #[test]
1331    fn test_filter_keep_all() {
1332        let patches = Patches::new(
1333            10,
1334            0,
1335            buffer![2u64, 5, 8].into_array(),
1336            buffer![100i32, 200, 300].into_array(),
1337            None,
1338        );
1339
1340        // Keep all indices (mask with indices 0-9)
1341        let mask = Mask::from_indices(10, (0..10).collect());
1342        let filtered = patches.filter(&mask).unwrap().unwrap();
1343
1344        let indices = filtered.indices().to_primitive();
1345        let values = filtered.values().to_primitive();
1346        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1347        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1348    }
1349
1350    #[test]
1351    fn test_filter_none() {
1352        let patches = Patches::new(
1353            10,
1354            0,
1355            buffer![2u64, 5, 8].into_array(),
1356            buffer![100i32, 200, 300].into_array(),
1357            None,
1358        );
1359
1360        // Filter out all (empty mask means keep nothing)
1361        let mask = Mask::from_indices(10, vec![]);
1362        let filtered = patches.filter(&mask).unwrap();
1363        assert!(filtered.is_none());
1364    }
1365
1366    #[test]
1367    fn test_filter_with_indices() {
1368        let patches = Patches::new(
1369            10,
1370            0,
1371            buffer![2u64, 5, 8].into_array(),
1372            buffer![100i32, 200, 300].into_array(),
1373            None,
1374        );
1375
1376        // Keep indices 2, 5, 9 (so patches at 2 and 5 remain)
1377        let mask = Mask::from_indices(10, vec![2, 5, 9]);
1378        let filtered = patches.filter(&mask).unwrap().unwrap();
1379
1380        let indices = filtered.indices().to_primitive();
1381        let values = filtered.values().to_primitive();
1382        assert_eq!(indices.as_slice::<u64>(), &[0, 1]); // Adjusted indices
1383        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1384    }
1385
1386    #[test]
1387    fn test_slice_full_range() {
1388        let patches = Patches::new(
1389            10,
1390            0,
1391            buffer![2u64, 5, 8].into_array(),
1392            buffer![100i32, 200, 300].into_array(),
1393            None,
1394        );
1395
1396        let sliced = patches.slice(0..10).unwrap();
1397
1398        let indices = sliced.indices().to_primitive();
1399        let values = sliced.values().to_primitive();
1400        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1401        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1402    }
1403
1404    #[test]
1405    fn test_slice_partial() {
1406        let patches = Patches::new(
1407            10,
1408            0,
1409            buffer![2u64, 5, 8].into_array(),
1410            buffer![100i32, 200, 300].into_array(),
1411            None,
1412        );
1413
1414        // Slice from 3 to 8 (includes patch at 5)
1415        let sliced = patches.slice(3..8).unwrap();
1416
1417        let indices = sliced.indices().to_primitive();
1418        let values = sliced.values().to_primitive();
1419        assert_eq!(indices.as_slice::<u64>(), &[5]); // Index stays the same
1420        assert_eq!(values.as_slice::<i32>(), &[200]);
1421        assert_eq!(sliced.array_len(), 5); // 8 - 3 = 5
1422        assert_eq!(sliced.offset(), 3); // New offset
1423    }
1424
1425    #[test]
1426    fn test_slice_no_patches() {
1427        let patches = Patches::new(
1428            10,
1429            0,
1430            buffer![2u64, 5, 8].into_array(),
1431            buffer![100i32, 200, 300].into_array(),
1432            None,
1433        );
1434
1435        // Slice from 6 to 7 (no patches in this range)
1436        let sliced = patches.slice(6..7);
1437        assert!(sliced.is_none());
1438    }
1439
1440    #[test]
1441    fn test_slice_with_offset() {
1442        let patches = Patches::new(
1443            10,
1444            5,                                  // offset
1445            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1446            buffer![100i32, 200, 300].into_array(),
1447            None,
1448        );
1449
1450        // Slice from 3 to 8 (includes patch at actual index 5)
1451        let sliced = patches.slice(3..8).unwrap();
1452
1453        let indices = sliced.indices().to_primitive();
1454        let values = sliced.values().to_primitive();
1455        assert_eq!(indices.as_slice::<u64>(), &[10]); // Index stays the same (offset + 5 = 10)
1456        assert_eq!(values.as_slice::<i32>(), &[200]);
1457        assert_eq!(sliced.offset(), 8); // New offset = 5 + 3
1458    }
1459
1460    #[test]
1461    fn test_patch_values() {
1462        let patches = Patches::new(
1463            10,
1464            0,
1465            buffer![2u64, 5, 8].into_array(),
1466            buffer![100i32, 200, 300].into_array(),
1467            None,
1468        );
1469
1470        let values = patches.values().to_primitive();
1471        assert_eq!(i32::try_from(&values.scalar_at(0)).unwrap(), 100i32);
1472        assert_eq!(i32::try_from(&values.scalar_at(1)).unwrap(), 200i32);
1473        assert_eq!(i32::try_from(&values.scalar_at(2)).unwrap(), 300i32);
1474    }
1475
1476    #[test]
1477    fn test_indices_range() {
1478        let patches = Patches::new(
1479            10,
1480            0,
1481            buffer![2u64, 5, 8].into_array(),
1482            buffer![100i32, 200, 300].into_array(),
1483            None,
1484        );
1485
1486        assert_eq!(patches.min_index(), 2);
1487        assert_eq!(patches.max_index(), 8);
1488    }
1489
1490    #[test]
1491    fn test_search_index() {
1492        let patches = Patches::new(
1493            10,
1494            0,
1495            buffer![2u64, 5, 8].into_array(),
1496            buffer![100i32, 200, 300].into_array(),
1497            None,
1498        );
1499
1500        // Search for exact indices
1501        assert_eq!(patches.search_index(2), SearchResult::Found(0));
1502        assert_eq!(patches.search_index(5), SearchResult::Found(1));
1503        assert_eq!(patches.search_index(8), SearchResult::Found(2));
1504
1505        // Search for non-patch indices
1506        assert_eq!(patches.search_index(0), SearchResult::NotFound(0));
1507        assert_eq!(patches.search_index(3), SearchResult::NotFound(1));
1508        assert_eq!(patches.search_index(6), SearchResult::NotFound(2));
1509        assert_eq!(patches.search_index(9), SearchResult::NotFound(3));
1510    }
1511
1512    #[test]
1513    fn test_mask_boundary_patches() {
1514        // Test masking patches at array boundaries
1515        let patches = Patches::new(
1516            10,
1517            0,
1518            buffer![0u64, 9].into_array(),
1519            buffer![100i32, 200].into_array(),
1520            None,
1521        );
1522
1523        let mask = Mask::from_iter([
1524            true, false, false, false, false, false, false, false, false, false,
1525        ]);
1526        let masked = patches.mask(&mask).unwrap();
1527        assert!(masked.is_some());
1528        let masked = masked.unwrap();
1529        let indices = masked.indices().to_primitive();
1530        assert_eq!(indices.as_slice::<u64>(), &[9]);
1531        let values = masked.values().to_primitive();
1532        assert_eq!(values.as_slice::<i32>(), &[200]);
1533    }
1534
1535    #[test]
1536    fn test_mask_all_patches_removed() {
1537        // Test when all patches are masked out
1538        let patches = Patches::new(
1539            10,
1540            0,
1541            buffer![2u64, 5, 8].into_array(),
1542            buffer![100i32, 200, 300].into_array(),
1543            None,
1544        );
1545
1546        // Mask that removes all patches
1547        let mask = Mask::from_iter([
1548            false, false, true, false, false, true, false, false, true, false,
1549        ]);
1550        let masked = patches.mask(&mask).unwrap();
1551        assert!(masked.is_none());
1552    }
1553
1554    #[test]
1555    fn test_mask_no_patches_removed() {
1556        // Test when no patches are masked
1557        let patches = Patches::new(
1558            10,
1559            0,
1560            buffer![2u64, 5, 8].into_array(),
1561            buffer![100i32, 200, 300].into_array(),
1562            None,
1563        );
1564
1565        // Mask that doesn't affect any patches
1566        let mask = Mask::from_iter([
1567            true, false, false, true, false, false, true, false, false, true,
1568        ]);
1569        let masked = patches.mask(&mask).unwrap().unwrap();
1570
1571        let indices = masked.indices().to_primitive();
1572        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1573        let values = masked.values().to_primitive();
1574        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1575    }
1576
1577    #[test]
1578    fn test_mask_single_patch() {
1579        // Test with a single patch
1580        let patches = Patches::new(
1581            5,
1582            0,
1583            buffer![2u64].into_array(),
1584            buffer![42i32].into_array(),
1585            None,
1586        );
1587
1588        // Mask that removes the single patch
1589        let mask = Mask::from_iter([false, false, true, false, false]);
1590        let masked = patches.mask(&mask).unwrap();
1591        assert!(masked.is_none());
1592
1593        // Mask that keeps the single patch
1594        let mask = Mask::from_iter([true, false, false, true, false]);
1595        let masked = patches.mask(&mask).unwrap().unwrap();
1596        let indices = masked.indices().to_primitive();
1597        assert_eq!(indices.as_slice::<u64>(), &[2]);
1598    }
1599
1600    #[test]
1601    fn test_mask_contiguous_patches() {
1602        // Test with contiguous patches
1603        let patches = Patches::new(
1604            10,
1605            0,
1606            buffer![3u64, 4, 5, 6].into_array(),
1607            buffer![100i32, 200, 300, 400].into_array(),
1608            None,
1609        );
1610
1611        // Mask that removes middle patches
1612        let mask = Mask::from_iter([
1613            false, false, false, false, true, true, false, false, false, false,
1614        ]);
1615        let masked = patches.mask(&mask).unwrap().unwrap();
1616
1617        let indices = masked.indices().to_primitive();
1618        assert_eq!(indices.as_slice::<u64>(), &[3, 6]);
1619        let values = masked.values().to_primitive();
1620        assert_eq!(values.as_slice::<i32>(), &[100, 400]);
1621    }
1622
1623    #[test]
1624    fn test_mask_with_large_offset() {
1625        // Test with a large offset that shifts all indices
1626        let patches = Patches::new(
1627            20,
1628            15,
1629            buffer![16u64, 17, 19].into_array(), // actual indices are 1, 2, 4
1630            buffer![100i32, 200, 300].into_array(),
1631            None,
1632        );
1633
1634        // Mask that removes the patch at actual index 2
1635        let mask = Mask::from_iter([
1636            false, false, true, false, false, false, false, false, false, false, false, false,
1637            false, false, false, false, false, false, false, false,
1638        ]);
1639        let masked = patches.mask(&mask).unwrap().unwrap();
1640
1641        let indices = masked.indices().to_primitive();
1642        assert_eq!(indices.as_slice::<u64>(), &[16, 19]);
1643        let values = masked.values().to_primitive();
1644        assert_eq!(values.as_slice::<i32>(), &[100, 300]);
1645    }
1646
1647    #[test]
1648    #[should_panic(expected = "Filter mask length 5 does not match array length 10")]
1649    fn test_mask_wrong_length() {
1650        let patches = Patches::new(
1651            10,
1652            0,
1653            buffer![2u64, 5, 8].into_array(),
1654            buffer![100i32, 200, 300].into_array(),
1655            None,
1656        );
1657
1658        // Mask with wrong length
1659        let mask = Mask::from_iter([false, false, true, false, false]);
1660        patches.mask(&mask).unwrap();
1661    }
1662
1663    #[test]
1664    fn test_chunk_offsets_search() {
1665        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1666        let values = buffer![10i32, 20, 30, 40].into_array();
1667        let chunk_offsets = buffer![0u64, 2, 2, 3].into_array();
1668        let patches = Patches::new(4096, 0, indices, values, Some(chunk_offsets));
1669
1670        assert!(patches.chunk_offsets.is_some());
1671
1672        // chunk 0: patches at 100, 200
1673        assert_eq!(patches.search_index(100), SearchResult::Found(0));
1674        assert_eq!(patches.search_index(200), SearchResult::Found(1));
1675
1676        // chunks 1, 2: no patches
1677        assert_eq!(patches.search_index(1500), SearchResult::NotFound(2));
1678        assert_eq!(patches.search_index(2000), SearchResult::NotFound(2));
1679
1680        // chunk 3: patches at 3000, 3100
1681        assert_eq!(patches.search_index(3000), SearchResult::Found(2));
1682        assert_eq!(patches.search_index(3100), SearchResult::Found(3));
1683
1684        assert_eq!(patches.search_index(1024), SearchResult::NotFound(2));
1685    }
1686
1687    #[test]
1688    fn test_chunk_offsets_with_slice() {
1689        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
1690        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
1691        let chunk_offsets = buffer![0u64, 2, 6].into_array();
1692        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1693
1694        let sliced = patches.slice(1000..2200).unwrap();
1695
1696        assert!(sliced.chunk_offsets.is_some());
1697        assert_eq!(sliced.offset(), 1000);
1698
1699        assert_eq!(sliced.search_index(200), SearchResult::Found(0));
1700        assert_eq!(sliced.search_index(500), SearchResult::Found(2));
1701        assert_eq!(sliced.search_index(1100), SearchResult::Found(4));
1702
1703        assert_eq!(sliced.search_index(250), SearchResult::NotFound(1));
1704    }
1705
1706    #[test]
1707    fn test_chunk_offsets_with_slice_after_first_chunk() {
1708        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
1709        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
1710        let chunk_offsets = buffer![0u64, 2, 6].into_array();
1711        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1712
1713        let sliced = patches.slice(1300..2200).unwrap();
1714
1715        assert!(sliced.chunk_offsets.is_some());
1716        assert_eq!(sliced.offset(), 1300);
1717
1718        assert_eq!(sliced.search_index(0), SearchResult::Found(0));
1719        assert_eq!(sliced.search_index(200), SearchResult::Found(1));
1720        assert_eq!(sliced.search_index(500), SearchResult::Found(2));
1721        assert_eq!(sliced.search_index(250), SearchResult::NotFound(2));
1722        assert_eq!(sliced.search_index(900), SearchResult::NotFound(4));
1723    }
1724
1725    #[test]
1726    fn test_chunk_offsets_slice_empty_result() {
1727        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1728        let values = buffer![10i32, 20, 30, 40].into_array();
1729        let chunk_offsets = buffer![0u64, 2].into_array();
1730        let patches = Patches::new(4000, 0, indices, values, Some(chunk_offsets));
1731
1732        let sliced = patches.slice(1000..2000);
1733        assert!(sliced.is_none());
1734    }
1735
1736    #[test]
1737    fn test_chunk_offsets_slice_single_patch() {
1738        let indices = buffer![100u64, 1200, 1300, 2500].into_array();
1739        let values = buffer![10i32, 20, 30, 40].into_array();
1740        let chunk_offsets = buffer![0u64, 1, 3].into_array();
1741        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1742
1743        let sliced = patches.slice(1100..1250).unwrap();
1744
1745        assert_eq!(sliced.num_patches(), 1);
1746        assert_eq!(sliced.offset(), 1100);
1747        assert_eq!(sliced.search_index(100), SearchResult::Found(0)); // 1200 - 1100 = 100
1748        assert_eq!(sliced.search_index(50), SearchResult::NotFound(0));
1749        assert_eq!(sliced.search_index(150), SearchResult::NotFound(1));
1750    }
1751
1752    #[test]
1753    fn test_chunk_offsets_slice_across_chunks() {
1754        let indices = buffer![100u64, 200, 1100, 1200, 2100, 2200].into_array();
1755        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1756        let chunk_offsets = buffer![0u64, 2, 4].into_array();
1757        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1758
1759        let sliced = patches.slice(150..2150).unwrap();
1760
1761        assert_eq!(sliced.num_patches(), 4);
1762        assert_eq!(sliced.offset(), 150);
1763
1764        assert_eq!(sliced.search_index(50), SearchResult::Found(0)); // 200 - 150 = 50
1765        assert_eq!(sliced.search_index(950), SearchResult::Found(1)); // 1100 - 150 = 950
1766        assert_eq!(sliced.search_index(1050), SearchResult::Found(2)); // 1200 - 150 = 1050
1767        assert_eq!(sliced.search_index(1950), SearchResult::Found(3)); // 2100 - 150 = 1950
1768    }
1769
1770    #[test]
1771    fn test_chunk_offsets_boundary_searches() {
1772        let indices = buffer![1023u64, 1024, 1025, 2047, 2048].into_array();
1773        let values = buffer![10i32, 20, 30, 40, 50].into_array();
1774        let chunk_offsets = buffer![0u64, 1, 4].into_array();
1775        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1776
1777        assert_eq!(patches.search_index(1023), SearchResult::Found(0));
1778        assert_eq!(patches.search_index(1024), SearchResult::Found(1));
1779        assert_eq!(patches.search_index(1025), SearchResult::Found(2));
1780        assert_eq!(patches.search_index(2047), SearchResult::Found(3));
1781        assert_eq!(patches.search_index(2048), SearchResult::Found(4));
1782
1783        assert_eq!(patches.search_index(1022), SearchResult::NotFound(0));
1784        assert_eq!(patches.search_index(2046), SearchResult::NotFound(3));
1785    }
1786
1787    #[test]
1788    fn test_chunk_offsets_slice_edge_cases() {
1789        let indices = buffer![0u64, 1, 1023, 1024, 2047, 2048].into_array();
1790        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1791        let chunk_offsets = buffer![0u64, 3, 5].into_array();
1792        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets));
1793
1794        // Slice at the very beginning
1795        let sliced = patches.slice(0..10).unwrap();
1796        assert_eq!(sliced.num_patches(), 2);
1797        assert_eq!(sliced.search_index(0), SearchResult::Found(0));
1798        assert_eq!(sliced.search_index(1), SearchResult::Found(1));
1799
1800        // Slice at the very end
1801        let sliced = patches.slice(2040..3000).unwrap();
1802        assert_eq!(sliced.num_patches(), 2); // patches at 2047 and 2048
1803        assert_eq!(sliced.search_index(7), SearchResult::Found(0)); // 2047 - 2040
1804        assert_eq!(sliced.search_index(8), SearchResult::Found(1)); // 2048 - 2040
1805    }
1806
1807    #[test]
1808    fn test_chunk_offsets_slice_nested() {
1809        let indices = buffer![100u64, 200, 300, 400, 500, 600].into_array();
1810        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
1811        let chunk_offsets = buffer![0u64].into_array();
1812        let patches = Patches::new(1000, 0, indices, values, Some(chunk_offsets));
1813
1814        let sliced1 = patches.slice(150..550).unwrap();
1815        assert_eq!(sliced1.num_patches(), 4); // 200, 300, 400, 500
1816
1817        let sliced2 = sliced1.slice(100..250).unwrap();
1818        assert_eq!(sliced2.num_patches(), 1); // 300
1819        assert_eq!(sliced2.offset(), 250);
1820
1821        assert_eq!(sliced2.search_index(50), SearchResult::Found(0)); // 300 - 250
1822        assert_eq!(sliced2.search_index(150), SearchResult::NotFound(1));
1823    }
1824
1825    #[test]
1826    fn test_index_larger_than_length() {
1827        let chunk_offsets = buffer![0u64].into_array();
1828        let indices = buffer![1023u64].into_array();
1829        let values = buffer![42i32].into_array();
1830        let patches = Patches::new(1024, 0, indices, values, Some(chunk_offsets));
1831        assert_eq!(patches.search_index(2048), SearchResult::NotFound(1));
1832    }
1833}