Skip to main content

vortex_array/
patches.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::ops::Range;
8
9use num_traits::NumCast;
10use vortex_buffer::BitBuffer;
11use vortex_buffer::BufferMut;
12use vortex_error::VortexError;
13use vortex_error::VortexExpect as _;
14use vortex_error::VortexResult;
15use vortex_error::vortex_bail;
16use vortex_error::vortex_ensure;
17use vortex_error::vortex_err;
18use vortex_mask::AllOr;
19use vortex_mask::Mask;
20use vortex_utils::aliases::hash_map::HashMap;
21
22use crate::ArrayRef;
23use crate::ArraySlots;
24use crate::ExecutionCtx;
25use crate::IntoArray;
26use crate::LEGACY_SESSION;
27use crate::VortexSessionExecute;
28use crate::arrays::Primitive;
29use crate::arrays::PrimitiveArray;
30use crate::arrays::primitive::PrimitiveArrayExt;
31use crate::builtins::ArrayBuiltins;
32use crate::dtype::DType;
33use crate::dtype::IntegerPType;
34use crate::dtype::NativePType;
35use crate::dtype::Nullability;
36use crate::dtype::Nullability::NonNullable;
37use crate::dtype::PType;
38use crate::dtype::UnsignedPType;
39use crate::match_each_integer_ptype;
40use crate::match_each_unsigned_integer_ptype;
41use crate::scalar::PValue;
42use crate::scalar::Scalar;
43use crate::search_sorted::SearchResult;
44use crate::search_sorted::SearchSorted;
45use crate::search_sorted::SearchSortedSide;
46use crate::validity::Validity;
47
/// Number of array positions covered by a single patch chunk.
///
/// One patch index offset is stored for each chunk.
/// This allows for constant time patch index lookups.
pub const PATCH_CHUNK_SIZE: usize = 1024;
51
/// Protobuf-encodable description of a set of patches.
///
/// Produced by [`Patches::to_metadata`]. It records lengths, offsets and primitive types of the
/// patch components, but not the patch arrays themselves.
#[derive(Copy, Clone, prost::Message)]
pub struct PatchesMetadata {
    /// Number of patches, i.e. the length of the patch indices array.
    #[prost(uint64, tag = "1")]
    len: u64,
    /// The patches' offset (see [`Patches::offset`]).
    #[prost(uint64, tag = "2")]
    offset: u64,
    /// Primitive type of the patch indices, stored as the `PType` enum value.
    #[prost(enumeration = "PType", tag = "3")]
    indices_ptype: i32,
    /// Length of the chunk offsets array, if chunk offsets are present.
    #[prost(uint64, optional, tag = "4")]
    chunk_offsets_len: Option<u64>,
    /// Primitive type of the chunk offsets array, if chunk offsets are present.
    #[prost(enumeration = "PType", optional, tag = "5")]
    chunk_offsets_ptype: Option<i32>,
    /// The patches' offset within the first chunk (see [`Patches::offset_within_chunk`]).
    #[prost(uint64, optional, tag = "6")]
    offset_within_chunk: Option<u64>,
}
67
68impl PatchesMetadata {
69    #[inline]
70    pub fn new(
71        len: usize,
72        offset: usize,
73        indices_ptype: PType,
74        chunk_offsets_len: Option<usize>,
75        chunk_offsets_ptype: Option<PType>,
76        offset_within_chunk: Option<usize>,
77    ) -> Self {
78        Self {
79            len: len as u64,
80            offset: offset as u64,
81            indices_ptype: indices_ptype as i32,
82            chunk_offsets_len: chunk_offsets_len.map(|len| len as u64),
83            chunk_offsets_ptype: chunk_offsets_ptype.map(|pt| pt as i32),
84            offset_within_chunk: offset_within_chunk.map(|len| len as u64),
85        }
86    }
87
88    #[inline]
89    pub fn len(&self) -> VortexResult<usize> {
90        usize::try_from(self.len).map_err(|_| vortex_err!("len does not fit in usize"))
91    }
92
93    #[inline]
94    pub fn is_empty(&self) -> bool {
95        self.len == 0
96    }
97
98    #[inline]
99    pub fn offset(&self) -> VortexResult<usize> {
100        usize::try_from(self.offset).map_err(|_| vortex_err!("offset does not fit in usize"))
101    }
102
103    #[inline]
104    pub fn chunk_offsets_dtype(&self) -> VortexResult<Option<DType>> {
105        self.chunk_offsets_ptype
106            .map(|t| {
107                PType::try_from(t)
108                    .map_err(|e| vortex_err!("invalid i32 value {t} for PType: {}", e))
109                    .map(|ptype| DType::Primitive(ptype, NonNullable))
110            })
111            .transpose()
112    }
113
114    #[inline]
115    pub fn indices_dtype(&self) -> VortexResult<DType> {
116        let ptype = PType::try_from(self.indices_ptype).map_err(|e| {
117            vortex_err!("invalid i32 value {} for PType: {}", self.indices_ptype, e)
118        })?;
119        vortex_ensure!(
120            ptype.is_unsigned_int(),
121            "Patch indices must be unsigned integers"
122        );
123        Ok(DType::Primitive(ptype, NonNullable))
124    }
125}
126
/// Metadata stored in an array's data struct for reconstructing [`Patches`] from slots.
///
/// The actual patch arrays (indices, values, chunk_offsets) live in the array's
/// slots. This struct stores only the scalar metadata needed to reassemble them.
#[derive(Clone, Debug, Hash, PartialEq, Eq)]
pub struct PatchesData {
    /// The patches' offset, copied from [`Patches::offset`].
    offset: usize,
    /// The patches' offset within the first chunk, copied from
    /// [`Patches::offset_within_chunk`].
    offset_within_chunk: Option<usize>,
}
136
/// Slot indices for the three patch components within a slot array.
#[derive(Copy, Clone, Debug)]
pub struct PatchSlotIndices {
    /// Position of the patch indices array within the slots.
    pub indices: usize,
    /// Position of the patch values array within the slots.
    pub values: usize,
    /// Position of the (optional) chunk offsets array within the slots.
    pub chunk_offsets: usize,
}
144
145impl PatchesData {
146    /// Extract patch metadata from an existing [`Patches`].
147    pub fn from_patches(patches: &Patches) -> Self {
148        Self {
149            offset: patches.offset(),
150            offset_within_chunk: patches.offset_within_chunk(),
151        }
152    }
153
154    /// Reconstruct patches from the given slot positions.
155    ///
156    /// Returns `None` if `patches_data` is `None`.
157    /// Panics if `patches_data` is `Some` but the indices or values slots are missing.
158    pub fn patches_from_slots(
159        patches_data: Option<&Self>,
160        len: usize,
161        slots: &[Option<ArrayRef>],
162        slot_idx: PatchSlotIndices,
163    ) -> Option<Patches> {
164        let data = patches_data?;
165        let indices = slots[slot_idx.indices]
166            .as_ref()
167            .vortex_expect("patches_data is set but patch_indices slot is missing");
168        let values = slots[slot_idx.values]
169            .as_ref()
170            .vortex_expect("patches_data is set but patch_values slot is missing");
171        Some(unsafe {
172            Patches::new_unchecked(
173                len,
174                data.offset,
175                indices.clone(),
176                values.clone(),
177                slots[slot_idx.chunk_offsets].clone(),
178                data.offset_within_chunk,
179            )
180        })
181    }
182
183    /// Push 3 patch slots (indices, values, chunk_offsets) onto a slot vector.
184    ///
185    /// If `patches` is `None`, pushes three `None` entries.
186    pub fn push_slots(slots: &mut ArraySlots, patches: Option<&Patches>) {
187        match patches {
188            Some(p) => {
189                slots.push(Some(p.indices().clone()));
190                slots.push(Some(p.values().clone()));
191                slots.push(p.chunk_offsets().clone());
192            }
193            None => {
194                slots.push(None);
195                slots.push(None);
196                slots.push(None);
197            }
198        }
199    }
200
201    /// Returns the patch offset.
202    #[inline]
203    pub fn offset(&self) -> usize {
204        self.offset
205    }
206
207    /// Returns the offset within the first chunk, if chunk offsets are present.
208    #[inline]
209    pub fn offset_within_chunk(&self) -> Option<usize> {
210        self.offset_within_chunk
211    }
212}
213
/// A helper for working with patched arrays.
///
/// Patches consist of a list of positions (`indices`) and the values stored at those
/// positions (`values`).
#[derive(Debug, Clone)]
pub struct Patches {
    /// Length of the array the patches apply to.
    array_len: usize,
    /// Absolute offset: 0 if the array is unsliced. Stored indices include this offset.
    offset: usize,
    /// Sorted, non-nullable unsigned integer positions of the patches.
    indices: ArrayRef,
    /// The patch values, one per entry in `indices`.
    values: ArrayRef,
    /// Stores the patch index offset for each chunk.
    ///
    /// This allows us to lookup the patches for a given chunk in constant time via
    /// `patch_indices[chunk_offsets[i]..chunk_offsets[i+1]]`.
    ///
    /// This is optional for compatibility reasons.
    chunk_offsets: Option<ArrayRef>,
    /// Chunk offsets are only sliced off in case the slice is fully
    /// outside of the chunk range.
    ///
    /// Though the range for indices and values is sliced in terms of
    /// individual elements, not chunks. To account for that we do a
    /// saturating sub when adjusting the indices based on the chunk offset.
    ///
    /// `offset_within_chunk` is necessary in order to keep track of how many
    /// elements were sliced off within the chunk.
    offset_within_chunk: Option<usize>,
}
239
240impl Patches {
241    pub fn new(
242        array_len: usize,
243        offset: usize,
244        indices: ArrayRef,
245        values: ArrayRef,
246        chunk_offsets: Option<ArrayRef>,
247    ) -> VortexResult<Self> {
248        vortex_ensure!(
249            indices.len() == values.len(),
250            "Patch indices and values must have the same length"
251        );
252        vortex_ensure!(
253            indices.dtype().is_unsigned_int() && !indices.dtype().is_nullable(),
254            "Patch indices must be non-nullable unsigned integers, got {:?}",
255            indices.dtype()
256        );
257
258        vortex_ensure!(
259            indices.len() <= array_len,
260            "Patch indices must be shorter than the array length"
261        );
262        vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty");
263
264        // Perform validation of components when they are host-resident.
265        // This is not possible to do eagerly when the data is on GPU memory.
266        if indices.is_host() && values.is_host() {
267            let max = usize::try_from(&indices.execute_scalar(
268                indices.len() - 1,
269                &mut LEGACY_SESSION.create_execution_ctx(),
270            )?)
271            .map_err(|_| vortex_err!("indices must be a number"))?;
272            vortex_ensure!(
273                max - offset < array_len,
274                "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
275            );
276
277            #[cfg(debug_assertions)]
278            {
279                use crate::VortexSessionExecute;
280                use crate::aggregate_fn::fns::is_sorted::is_sorted;
281                let mut ctx = LEGACY_SESSION.create_execution_ctx();
282                assert!(
283                    is_sorted(&indices, &mut ctx).unwrap_or(false),
284                    "Patch indices must be sorted"
285                );
286            }
287        }
288
289        Ok(Self {
290            array_len,
291            offset,
292            indices,
293            values,
294            chunk_offsets: chunk_offsets.clone(),
295            // Initialize with `Some(0)` only if `chunk_offsets` are set.
296            offset_within_chunk: chunk_offsets.map(|_| 0),
297        })
298    }
299
300    /// Construct new patches without validating any of the arguments
301    ///
302    /// # Safety
303    ///
304    /// Users have to assert that
305    /// * Indices and values have the same length
306    /// * Indices is an unsigned integer type
307    /// * Indices must be sorted
308    /// * Last value in indices is smaller than array_len
309    pub unsafe fn new_unchecked(
310        array_len: usize,
311        offset: usize,
312        indices: ArrayRef,
313        values: ArrayRef,
314        chunk_offsets: Option<ArrayRef>,
315        offset_within_chunk: Option<usize>,
316    ) -> Self {
317        Self {
318            array_len,
319            offset,
320            indices,
321            values,
322            chunk_offsets,
323            offset_within_chunk,
324        }
325    }
326
327    #[inline]
328    pub fn array_len(&self) -> usize {
329        self.array_len
330    }
331
332    #[inline]
333    pub fn num_patches(&self) -> usize {
334        self.indices.len()
335    }
336
337    #[inline]
338    pub fn dtype(&self) -> &DType {
339        self.values.dtype()
340    }
341
342    #[inline]
343    pub fn indices(&self) -> &ArrayRef {
344        &self.indices
345    }
346
347    #[inline]
348    pub fn into_indices(self) -> ArrayRef {
349        self.indices
350    }
351
352    #[inline]
353    pub fn indices_mut(&mut self) -> &mut ArrayRef {
354        &mut self.indices
355    }
356
357    #[inline]
358    pub fn values(&self) -> &ArrayRef {
359        &self.values
360    }
361
362    #[inline]
363    pub fn into_values(self) -> ArrayRef {
364        self.values
365    }
366
367    #[inline]
368    pub fn values_mut(&mut self) -> &mut ArrayRef {
369        &mut self.values
370    }
371
372    #[inline]
373    // Absolute offset: 0 if the array is unsliced.
374    pub fn offset(&self) -> usize {
375        self.offset
376    }
377
378    #[inline]
379    pub fn chunk_offsets(&self) -> &Option<ArrayRef> {
380        &self.chunk_offsets
381    }
382
383    #[inline]
384    pub fn chunk_offset_at(&self, idx: usize) -> VortexResult<usize> {
385        let Some(chunk_offsets) = &self.chunk_offsets else {
386            vortex_bail!("chunk_offsets must be set to retrieve offset at index")
387        };
388
389        chunk_offsets
390            .execute_scalar(idx, &mut LEGACY_SESSION.create_execution_ctx())?
391            .as_primitive()
392            .as_::<usize>()
393            .ok_or_else(|| vortex_err!("chunk offset does not fit in usize"))
394    }
395
396    /// Returns the number of patches sliced off from the current first chunk.
397    ///
398    /// When patches are sliced, the chunk offsets array is also sliced to only include
399    /// chunks that overlap with the slice range. However, the slice boundary may fall
400    /// in the middle of a chunk's patch range. This offset indicates how many patches
401    /// at the start of the first chunk should be skipped.
402    ///
403    /// Returns `None` if chunk offsets are not set.
404    #[inline]
405    pub fn offset_within_chunk(&self) -> Option<usize> {
406        self.offset_within_chunk
407    }
408
409    #[inline]
410    pub fn indices_ptype(&self) -> VortexResult<PType> {
411        PType::try_from(self.indices.dtype())
412            .map_err(|_| vortex_err!("indices dtype is not primitive"))
413    }
414
415    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
416        if self.indices.len() > len {
417            vortex_bail!(
418                "Patch indices {} are longer than the array length {}",
419                self.indices.len(),
420                len
421            );
422        }
423        if self.values.dtype() != dtype {
424            vortex_bail!(
425                "Patch values dtype {} does not match array dtype {}",
426                self.values.dtype(),
427                dtype
428            );
429        }
430        let chunk_offsets_len = self.chunk_offsets.as_ref().map(|co| co.len());
431        let chunk_offsets_ptype = self.chunk_offsets.as_ref().map(|co| co.dtype().as_ptype());
432
433        Ok(PatchesMetadata::new(
434            self.indices.len(),
435            self.offset,
436            self.indices.dtype().as_ptype(),
437            chunk_offsets_len,
438            chunk_offsets_ptype,
439            self.offset_within_chunk,
440        ))
441    }
442
443    /// Get the patched value at a given index if it exists.
444    pub fn get_patched(&self, index: usize) -> VortexResult<Option<Scalar>> {
445        self.search_index(index)?
446            .to_found()
447            .map(|patch_idx| {
448                self.values()
449                    .execute_scalar(patch_idx, &mut LEGACY_SESSION.create_execution_ctx())
450            })
451            .transpose()
452    }
453
454    /// Searches for `index` in the indices array.
455    ///
456    /// Chooses between chunked search when [`Self::chunk_offsets`] is
457    /// available, and binary search otherwise. The `index` parameter is
458    /// adjusted by [`Self::offset`] for both.
459    ///
460    /// # Arguments
461    /// * `index` - The index to search for
462    ///
463    /// # Returns
464    /// * [`SearchResult::Found(patch_idx)`] - If a patch exists at this index, returns the
465    ///   position in the patches array
466    /// * [`SearchResult::NotFound(insertion_point)`] - If no patch exists, returns where
467    ///   a patch at this index would be inserted to maintain sorted order
468    ///
469    /// [`SearchResult::Found(patch_idx)`]: SearchResult::Found
470    /// [`SearchResult::NotFound(insertion_point)`]: SearchResult::NotFound
471    pub fn search_index(&self, index: usize) -> VortexResult<SearchResult> {
472        if self.chunk_offsets.is_some() {
473            return self.search_index_chunked(index);
474        }
475
476        Self::search_index_binary_search(&self.indices, index + self.offset)
477    }
478
479    /// Binary searches for `needle` in the indices array.
480    ///
481    /// # Returns
482    /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`]
483    /// with the insertion point if not found.
484    fn search_index_binary_search(indices: &ArrayRef, needle: usize) -> VortexResult<SearchResult> {
485        if let Some(primitive) = indices.as_opt::<Primitive>() {
486            match_each_integer_ptype!(primitive.ptype(), |T| {
487                let Ok(needle) = T::try_from(needle) else {
488                    // If the needle is not of type T, then it cannot possibly be in this array.
489                    //
490                    // The needle is a non-negative integer (a usize); therefore, it must be larger
491                    // than all values in this array.
492                    return Ok(SearchResult::NotFound(primitive.len()));
493                };
494                return primitive
495                    .as_slice::<T>()
496                    .search_sorted(&needle, SearchSortedSide::Left);
497            });
498        }
499        indices
500            .as_primitive_typed()
501            .search_sorted(&PValue::U64(needle as u64), SearchSortedSide::Left)
502    }
503
504    /// Constant time searches for `index` in the indices array.
505    ///
506    /// First determines which chunk the target index falls into, then performs
507    /// a binary search within that chunk's range.
508    ///
509    /// Returns a [`SearchResult`] indicating either the exact patch index if found,
510    /// or the insertion point if not found.
511    ///
512    /// Returns an error if `chunk_offsets` or `offset_within_chunk` are not set.
513    fn search_index_chunked(&self, index: usize) -> VortexResult<SearchResult> {
514        let Some(chunk_offsets) = &self.chunk_offsets else {
515            vortex_bail!("chunk_offsets is required to be set")
516        };
517
518        let Some(offset_within_chunk) = self.offset_within_chunk else {
519            vortex_bail!("offset_within_chunk is required to be set")
520        };
521
522        if index >= self.array_len() {
523            return Ok(SearchResult::NotFound(self.indices().len()));
524        }
525
526        let chunk_idx = (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE;
527
528        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
529        let base_offset = self.chunk_offset_at(0)?;
530
531        let patches_start_idx = (self.chunk_offset_at(chunk_idx)? - base_offset)
532            // Chunk offsets are only sliced off in case the slice is fully
533            // outside of the chunk range.
534            //
535            // Though the range for indices and values is sliced in terms of
536            // individual elements, not chunks. To account for that we do a
537            // saturating sub when adjusting the indices based on the chunk offset.
538            .saturating_sub(offset_within_chunk);
539
540        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
541            (self.chunk_offset_at(chunk_idx + 1)? - base_offset)
542                .saturating_sub(offset_within_chunk)
543                .min(self.indices.len())
544        } else {
545            self.indices.len()
546        };
547
548        let chunk_indices = self.indices.slice(patches_start_idx..patches_end_idx)?;
549        let result = Self::search_index_binary_search(&chunk_indices, index + self.offset)?;
550
551        Ok(match result {
552            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
553            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
554        })
555    }
556
557    /// Batch version of `search_index`.
558    ///
559    /// In contrast to `search_index`, this function requires `indices` as
560    /// well as `chunk_offsets` to be passed as slices. This is to avoid
561    /// redundant canonicalization and `scalar_at` lookups across calls.
562    fn search_index_chunked_batch<T, O>(
563        &self,
564        indices: &[T],
565        chunk_offsets: &[O],
566        index: T,
567    ) -> VortexResult<SearchResult>
568    where
569        T: UnsignedPType,
570        O: UnsignedPType,
571        usize: TryFrom<T>,
572        usize: TryFrom<O>,
573    {
574        let Some(offset_within_chunk) = self.offset_within_chunk else {
575            vortex_bail!("offset_within_chunk is required to be set")
576        };
577
578        let chunk_idx = {
579            let Ok(index) = usize::try_from(index) else {
580                // If the needle cannot be converted to usize, it's larger than all values in this array.
581                return Ok(SearchResult::NotFound(indices.len()));
582            };
583
584            if index >= self.array_len() {
585                return Ok(SearchResult::NotFound(self.indices().len()));
586            }
587
588            (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE
589        };
590
591        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
592        let chunk_offset = usize::try_from(chunk_offsets[chunk_idx] - chunk_offsets[0])
593            .map_err(|_| vortex_err!("chunk_offset failed to convert to usize"))?;
594
595        let patches_start_idx = chunk_offset
596            // Chunk offsets are only sliced off in case the slice is fully
597            // outside of the chunk range.
598            //
599            // Though the range for indices and values is sliced in terms of
600            // individual elements, not chunks. To account for that we do a
601            // saturating sub when adjusting the indices based on the chunk offset.
602            .saturating_sub(offset_within_chunk);
603
604        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
605            usize::try_from(chunk_offsets[chunk_idx + 1] - chunk_offsets[0])
606                .map_err(|_| vortex_err!("patches_end_idx failed to convert to usize"))?
607                .saturating_sub(offset_within_chunk)
608                .min(indices.len())
609        } else {
610            self.indices.len()
611        };
612
613        let Some(offset) = T::from(self.offset) else {
614            // If the offset cannot be converted to T, it's larger than all values in this array.
615            return Ok(SearchResult::NotFound(indices.len()));
616        };
617
618        let chunk_indices = &indices[patches_start_idx..patches_end_idx];
619        let result = chunk_indices.search_sorted(&(index + offset), SearchSortedSide::Left)?;
620
621        Ok(match result {
622            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
623            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
624        })
625    }
626
627    /// Returns the minimum patch index
628    pub fn min_index(&self) -> VortexResult<usize> {
629        let first = self
630            .indices
631            .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?
632            .as_primitive()
633            .as_::<usize>()
634            .ok_or_else(|| vortex_err!("index does not fit in usize"))?;
635        Ok(first - self.offset)
636    }
637
638    /// Returns the maximum patch index
639    pub fn max_index(&self) -> VortexResult<usize> {
640        let last = self
641            .indices
642            .execute_scalar(
643                self.indices.len() - 1,
644                &mut LEGACY_SESSION.create_execution_ctx(),
645            )?
646            .as_primitive()
647            .as_::<usize>()
648            .ok_or_else(|| vortex_err!("index does not fit in usize"))?;
649        Ok(last - self.offset)
650    }
651
652    /// Filter the patches by a mask, resulting in new patches for the filtered array.
653    pub fn filter(&self, mask: &Mask, ctx: &mut ExecutionCtx) -> VortexResult<Option<Self>> {
654        if mask.len() != self.array_len {
655            vortex_bail!(
656                "Filter mask length {} does not match array length {}",
657                mask.len(),
658                self.array_len
659            );
660        }
661
662        match mask.indices() {
663            AllOr::All => Ok(Some(self.clone())),
664            AllOr::None => Ok(None),
665            AllOr::Some(mask_indices) => {
666                let flat_indices = self.indices().clone().execute::<PrimitiveArray>(ctx)?;
667                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
668                    filter_patches_with_mask(
669                        flat_indices.as_slice::<I>(),
670                        self.offset(),
671                        self.values(),
672                        mask_indices,
673                    )
674                })
675            }
676        }
677    }
678
679    /// Mask the patches, REMOVING the patches where the mask is true.
680    /// Unlike filter, this preserves the patch indices.
681    /// Unlike mask on a single array, this does not set masked values to null.
682    ///
683    /// Masking an array always yields a nullable result (see [`crate::scalar_fn::fns::mask`]),
684    /// so the surviving patch values are widened to nullable to match the masked parent. Callers
685    /// therefore receive patches with the correct dtype and do not need to re-cast them.
686    // TODO(joe): make this lazy and remove the ctx.
687    pub fn mask(&self, mask: &Mask, ctx: &mut ExecutionCtx) -> VortexResult<Option<Self>> {
688        if mask.len() != self.array_len {
689            vortex_bail!(
690                "Filter mask length {} does not match array length {}",
691                mask.len(),
692                self.array_len
693            );
694        }
695
696        let filter_mask = match mask.bit_buffer() {
697            AllOr::All => return Ok(None),
698            AllOr::None => return self.clone().into_nullable_values().map(Some),
699            AllOr::Some(masked) => {
700                let patch_indices = self.indices().clone().execute::<PrimitiveArray>(ctx)?;
701                match_each_unsigned_integer_ptype!(patch_indices.ptype(), |P| {
702                    let patch_indices = patch_indices.as_slice::<P>();
703                    Mask::from_buffer(BitBuffer::collect_bool(patch_indices.len(), |i| {
704                        #[allow(clippy::cast_possible_truncation)]
705                        let idx = (patch_indices[i] as usize) - self.offset;
706                        !masked.value(idx)
707                    }))
708                })
709            }
710        };
711
712        if filter_mask.all_false() {
713            return Ok(None);
714        }
715
716        // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship
717        let filtered_indices = self.indices.filter(filter_mask.clone())?;
718        let filtered_values = self.values.filter(filter_mask)?;
719
720        Self {
721            array_len: self.array_len,
722            offset: self.offset,
723            indices: filtered_indices,
724            values: filtered_values,
725            // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
726            chunk_offsets: None,
727            offset_within_chunk: self.offset_within_chunk,
728        }
729        .into_nullable_values()
730        .map(Some)
731    }
732
733    /// Widen the patch values to their nullable dtype, leaving indices and offsets untouched.
734    ///
735    /// The values stay logically unchanged (all currently-valid entries remain valid); only the
736    /// dtype's nullability flag is set. Used by [`Self::mask`], whose result must be nullable.
737    fn into_nullable_values(self) -> VortexResult<Self> {
738        if self.values.dtype().is_nullable() {
739            return Ok(self);
740        }
741        let nullable = self.values.dtype().as_nullable();
742        self.map_values(|values| values.cast(nullable))
743    }
744
745    /// Slice the patches by a range of the patched array.
746    pub fn slice(&self, range: Range<usize>) -> VortexResult<Option<Self>> {
747        let slice_start_idx = self.search_index(range.start)?.to_index();
748        let slice_end_idx = self.search_index(range.end)?.to_index();
749
750        if slice_start_idx == slice_end_idx {
751            return Ok(None);
752        }
753
754        let values = self.values().slice(slice_start_idx..slice_end_idx)?;
755        let indices = self.indices().slice(slice_start_idx..slice_end_idx)?;
756
757        let new_chunk_offsets = self
758            .chunk_offsets
759            .as_ref()
760            .map(|chunk_offsets| -> VortexResult<ArrayRef> {
761                let chunk_relative_offset = self.offset % PATCH_CHUNK_SIZE;
762                let chunk_start_idx = (chunk_relative_offset + range.start) / PATCH_CHUNK_SIZE;
763                let chunk_end_idx = (chunk_relative_offset + range.end).div_ceil(PATCH_CHUNK_SIZE);
764                chunk_offsets.slice(chunk_start_idx..chunk_end_idx)
765            })
766            .transpose()?;
767
768        let offset_within_chunk = new_chunk_offsets
769            .as_ref()
770            .map(|new_chunk_offsets| -> VortexResult<usize> {
771                let new_chunk_base = new_chunk_offsets
772                    .execute_scalar(0, &mut LEGACY_SESSION.create_execution_ctx())?
773                    .as_primitive()
774                    .as_::<usize>()
775                    .ok_or_else(|| vortex_err!("chunk offset does not fit in usize"))?;
776                let parent_chunk_base = self.chunk_offset_at(0)?;
777                let parent_within = self.offset_within_chunk.unwrap_or(0);
778                Ok(parent_chunk_base + parent_within + slice_start_idx - new_chunk_base)
779            })
780            .transpose()?;
781
782        Ok(Some(Self {
783            array_len: range.len(),
784            offset: range.start + self.offset(),
785            indices,
786            values,
787            chunk_offsets: new_chunk_offsets,
788            offset_within_chunk,
789        }))
790    }
791
792    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
793    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
794
795    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
796        (self.num_patches() as f64 / take_indices.len() as f64)
797            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
798    }
799
800    /// Take the indices from the patches
801    ///
802    /// Any nulls in take_indices are added to the resulting patches.
803    pub fn take_with_nulls(
804        &self,
805        take_indices: &ArrayRef,
806        ctx: &mut ExecutionCtx,
807    ) -> VortexResult<Option<Self>> {
808        if take_indices.is_empty() {
809            return Ok(None);
810        }
811
812        let take_indices = take_indices.clone().execute::<PrimitiveArray>(ctx)?;
813        if self.is_map_faster_than_search(&take_indices) {
814            self.take_map(take_indices, true, ctx)
815        } else {
816            self.take_search(take_indices, true, ctx)
817        }
818    }
819
820    /// Take the indices from the patches.
821    ///
822    /// Any nulls in take_indices are ignored.
823    pub fn take(
824        &self,
825        take_indices: &ArrayRef,
826        ctx: &mut ExecutionCtx,
827    ) -> VortexResult<Option<Self>> {
828        if take_indices.is_empty() {
829            return Ok(None);
830        }
831
832        let take_indices = take_indices.clone().execute::<PrimitiveArray>(ctx)?;
833        if self.is_map_faster_than_search(&take_indices) {
834            self.take_map(take_indices, false, ctx)
835        } else {
836            self.take_search(take_indices, false, ctx)
837        }
838    }
839
840    #[expect(
841        clippy::cognitive_complexity,
842        reason = "complexity is from nested match_each_* macros"
843    )]
844    pub fn take_search(
845        &self,
846        take_indices: PrimitiveArray,
847        include_nulls: bool,
848        ctx: &mut ExecutionCtx,
849    ) -> VortexResult<Option<Self>> {
850        let take_indices_validity = take_indices.validity()?;
851        // Take indices are non-negative; reinterpret to unsigned so the `TakeT` dimension
852        // dispatches over 4 widths instead of 8.
853        let take_indices_unsigned =
854            take_indices.reinterpret_cast(take_indices.ptype().to_unsigned());
855        let patch_indices = self.indices.clone().execute::<PrimitiveArray>(ctx)?;
856        let chunk_offsets = self
857            .chunk_offsets()
858            .as_ref()
859            .map(|co| co.clone().execute::<PrimitiveArray>(ctx))
860            .transpose()?;
861
862        let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) =
863            match_each_unsigned_integer_ptype!(patch_indices.ptype(), |PatchT| {
864                let patch_indices_slice = patch_indices.as_slice::<PatchT>();
865                match_each_unsigned_integer_ptype!(take_indices_unsigned.ptype(), |TakeT| {
866                    let take_slice = take_indices_unsigned.as_slice::<TakeT>();
867
868                    if let Some(chunk_offsets) = chunk_offsets {
869                        match_each_unsigned_integer_ptype!(chunk_offsets.ptype(), |OffsetT| {
870                            let chunk_offsets = chunk_offsets.as_slice::<OffsetT>();
871                            take_indices_with_search_fn(
872                                patch_indices_slice,
873                                take_slice,
874                                take_indices
875                                    .as_ref()
876                                    .validity()?
877                                    .execute_mask(take_indices.as_ref().len(), ctx)?,
878                                include_nulls,
879                                |take_idx| {
880                                    self.search_index_chunked_batch(
881                                        patch_indices_slice,
882                                        chunk_offsets,
883                                        take_idx,
884                                    )
885                                },
886                            )?
887                        })
888                    } else {
889                        take_indices_with_search_fn(
890                            patch_indices_slice,
891                            take_slice,
892                            take_indices
893                                .as_ref()
894                                .validity()?
895                                .execute_mask(take_indices.as_ref().len(), ctx)?,
896                            include_nulls,
897                            |take_idx| {
898                                let Some(offset) = <PatchT as NumCast>::from(self.offset) else {
899                                    // If the offset cannot be converted to T, it's larger than all values in this array.
900                                    return Ok(SearchResult::NotFound(patch_indices_slice.len()));
901                                };
902
903                                patch_indices_slice
904                                    .search_sorted(&(take_idx + offset), SearchSortedSide::Left)
905                            },
906                        )?
907                    }
908                })
909            });
910
911        if new_indices.is_empty() {
912            return Ok(None);
913        }
914
915        let new_indices = new_indices.into_array();
916        let new_array_len = take_indices.len();
917        let values_validity = take_indices_validity.take(&new_indices)?;
918
919        Ok(Some(Self {
920            array_len: new_array_len,
921            offset: 0,
922            indices: new_indices,
923            values: self
924                .values()
925                .take(PrimitiveArray::new(values_indices, values_validity).into_array())?,
926            chunk_offsets: None,
927            offset_within_chunk: Some(0), // Reset when creating new Patches.
928        }))
929    }
930
931    pub fn take_map(
932        &self,
933        take_indices: PrimitiveArray,
934        include_nulls: bool,
935        ctx: &mut ExecutionCtx,
936    ) -> VortexResult<Option<Self>> {
937        let indices = self.indices.clone().execute::<PrimitiveArray>(ctx)?;
938        let new_length = take_indices.len();
939        // Take indices are non-negative; reinterpret to unsigned (4 widths instead of 8).
940        let take_indices_unsigned =
941            take_indices.reinterpret_cast(take_indices.ptype().to_unsigned());
942
943        let min_index = self.min_index()?;
944        let max_index = self.max_index()?;
945
946        let Some((new_sparse_indices, value_indices)) =
947            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
948                match_each_unsigned_integer_ptype!(take_indices_unsigned.ptype(), |TakeIndices| {
949                    let take_validity = take_indices
950                        .validity()?
951                        .execute_mask(take_indices.len(), ctx)?;
952                    let take_nullability = take_indices.validity()?.nullability();
953                    let take_slice = take_indices_unsigned.as_slice::<TakeIndices>();
954                    take_map::<_, TakeIndices>(
955                        indices.as_slice::<Indices>(),
956                        take_slice,
957                        take_validity,
958                        take_nullability,
959                        self.offset(),
960                        min_index,
961                        max_index,
962                        include_nulls,
963                    )?
964                })
965            })
966        else {
967            return Ok(None);
968        };
969
970        let taken_values = self.values().take(value_indices)?;
971
972        Ok(Some(Patches {
973            array_len: new_length,
974            offset: 0,
975            indices: new_sparse_indices,
976            values: taken_values,
977            // TODO(0ax1): Chunk offsets are invalid after take is applied.
978            chunk_offsets: None,
979            offset_within_chunk: self.offset_within_chunk,
980        }))
981    }
982
983    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
984    where
985        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
986    {
987        let values = f(self.values)?;
988        if self.indices.len() != values.len() {
989            vortex_bail!(
990                "map_values must preserve length: expected {} received {}",
991                self.indices.len(),
992                values.len()
993            )
994        }
995
996        Ok(Self {
997            array_len: self.array_len,
998            offset: self.offset,
999            indices: self.indices,
1000            values,
1001            chunk_offsets: self.chunk_offsets,
1002            offset_within_chunk: self.offset_within_chunk,
1003        })
1004    }
1005}
1006
1007#[expect(clippy::too_many_arguments)] // private function, can clean up one day
1008fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
1009    indices: &[I],
1010    take_indices: &[T],
1011    take_validity: Mask,
1012    take_nullability: Nullability,
1013    indices_offset: usize,
1014    min_index: usize,
1015    max_index: usize,
1016    include_nulls: bool,
1017) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
1018where
1019    usize: TryFrom<T>,
1020    VortexError: From<<I as TryFrom<usize>>::Error>,
1021{
1022    let offset_i = I::try_from(indices_offset)?;
1023
1024    let sparse_index_to_value_index: HashMap<I, usize> = indices
1025        .iter()
1026        .copied()
1027        .map(|idx| idx - offset_i)
1028        .enumerate()
1029        .map(|(value_index, sparse_index)| (sparse_index, value_index))
1030        .collect();
1031
1032    let mut new_sparse_indices = BufferMut::<u64>::with_capacity(take_indices.len());
1033    let mut value_indices = BufferMut::<u64>::with_capacity(take_indices.len());
1034
1035    for (idx_in_take, &take_idx) in take_indices.iter().enumerate() {
1036        let ti = usize::try_from(take_idx)
1037            .map_err(|_| vortex_err!("Failed to convert index to usize"))?;
1038
1039        // If we have to take nulls the take index doesn't matter, make it 0 for consistency
1040        let is_null = match take_validity.bit_buffer() {
1041            AllOr::All => false,
1042            AllOr::None => true,
1043            AllOr::Some(buf) => !buf.value(idx_in_take),
1044        };
1045        if is_null {
1046            if include_nulls {
1047                new_sparse_indices.push(idx_in_take as u64);
1048                value_indices.push(0);
1049            }
1050        } else if ti >= min_index && ti <= max_index {
1051            let ti_as_i = I::try_from(ti)
1052                .map_err(|_| vortex_err!("take index does not fit in index type"))?;
1053            if let Some(&value_index) = sparse_index_to_value_index.get(&ti_as_i) {
1054                new_sparse_indices.push(idx_in_take as u64);
1055                value_indices.push(value_index as u64);
1056            }
1057        }
1058    }
1059
1060    if new_sparse_indices.is_empty() {
1061        return Ok(None);
1062    }
1063
1064    let new_sparse_indices = new_sparse_indices.into_array();
1065    let values_validity =
1066        Validity::from_mask(take_validity, take_nullability).take(&new_sparse_indices)?;
1067    Ok(Some((
1068        new_sparse_indices,
1069        PrimitiveArray::new(value_indices, values_validity).into_array(),
1070    )))
1071}
1072
1073/// Filter patches with the provided mask (in flattened space).
1074///
1075/// The filter mask may contain indices that are non-patched. The return value of this function
1076/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
1077/// patch values.
1078fn filter_patches_with_mask<T: IntegerPType>(
1079    patch_indices: &[T],
1080    offset: usize,
1081    patch_values: &ArrayRef,
1082    mask_indices: &[usize],
1083) -> VortexResult<Option<Patches>> {
1084    let true_count = mask_indices.len();
1085    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
1086    let mut new_mask_indices = Vec::with_capacity(true_count);
1087
1088    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
1089    // the patches are relatively sparse compared to the overall mask, and so many indices in the
1090    // mask will end up being skipped.
1091    const STRIDE: usize = 4;
1092
1093    let mut mask_idx = 0usize;
1094    let mut true_idx = 0usize;
1095
1096    while mask_idx < patch_indices.len() && true_idx < true_count {
1097        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
1098        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
1099        //  the mask (which covers both patch and non-patch values of the parent array), and so to
1100        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
1101        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
1102        //  fallback to performing a two-way iterator merge.
1103        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
1104            // Load a vector of each into our registers.
1105            let left_min = patch_indices[mask_idx]
1106                .to_usize()
1107                .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1108                - offset;
1109            let left_max = patch_indices[mask_idx + STRIDE]
1110                .to_usize()
1111                .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1112                - offset;
1113            let right_min = mask_indices[true_idx];
1114            let right_max = mask_indices[true_idx + STRIDE];
1115
1116            if left_min > right_max {
1117                // Advance right side
1118                true_idx += STRIDE;
1119                continue;
1120            } else if right_min > left_max {
1121                mask_idx += STRIDE;
1122                continue;
1123            } else {
1124                // Fallthrough to direct comparison path.
1125            }
1126        }
1127
1128        // Two-way sorted iterator merge:
1129
1130        let left = patch_indices[mask_idx]
1131            .to_usize()
1132            .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1133            - offset;
1134        let right = mask_indices[true_idx];
1135
1136        match left.cmp(&right) {
1137            Ordering::Less => {
1138                mask_idx += 1;
1139            }
1140            Ordering::Greater => {
1141                true_idx += 1;
1142            }
1143            Ordering::Equal => {
1144                // Save the mask index as well as the positional index.
1145                new_mask_indices.push(mask_idx);
1146                new_patch_indices.push(true_idx as u64);
1147
1148                mask_idx += 1;
1149                true_idx += 1;
1150            }
1151        }
1152    }
1153
1154    if new_mask_indices.is_empty() {
1155        return Ok(None);
1156    }
1157
1158    let new_patch_indices = new_patch_indices.into_array();
1159    let new_patch_values =
1160        patch_values.filter(Mask::from_indices(patch_values.len(), new_mask_indices))?;
1161
1162    Ok(Some(Patches::new(
1163        true_count,
1164        0,
1165        new_patch_indices,
1166        new_patch_values,
1167        // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
1168        None,
1169    )?))
1170}
1171
1172fn take_indices_with_search_fn<
1173    I: UnsignedPType,
1174    T: IntegerPType,
1175    F: Fn(I) -> VortexResult<SearchResult>,
1176>(
1177    indices: &[I],
1178    take_indices: &[T],
1179    take_validity: Mask,
1180    include_nulls: bool,
1181    search_fn: F,
1182) -> VortexResult<(BufferMut<u64>, BufferMut<u64>)> {
1183    let mut values_indices = BufferMut::with_capacity(take_indices.len());
1184    let mut new_indices = BufferMut::with_capacity(take_indices.len());
1185
1186    for (new_patch_idx, &take_idx) in take_indices.iter().enumerate() {
1187        if !take_validity.value(new_patch_idx) {
1188            if include_nulls {
1189                // For nulls, patch index doesn't matter - use 0 for consistency
1190                values_indices.push(0u64);
1191                new_indices.push(new_patch_idx as u64);
1192            }
1193            continue;
1194        } else {
1195            let search_result = match I::from(take_idx) {
1196                Some(idx) => search_fn(idx)?,
1197                None => SearchResult::NotFound(indices.len()),
1198            };
1199
1200            if let Some(patch_idx) = search_result.to_found() {
1201                values_indices.push(patch_idx as u64);
1202                new_indices.push(new_patch_idx as u64);
1203            }
1204        }
1205    }
1206
1207    Ok((values_indices, new_indices))
1208}
1209
1210#[cfg(test)]
1211mod test {
1212    use vortex_buffer::BufferMut;
1213    use vortex_buffer::buffer;
1214    use vortex_mask::Mask;
1215
1216    use crate::IntoArray;
1217    #[expect(deprecated)]
1218    use crate::ToCanonical as _;
1219    use crate::VortexSessionExecute;
1220    use crate::array_session;
1221    use crate::assert_arrays_eq;
1222    use crate::patches::Patches;
1223    use crate::patches::PrimitiveArray;
1224    use crate::search_sorted::SearchResult;
1225    use crate::validity::Validity;
1226
1227    #[test]
1228    fn test_filter() {
1229        let mut ctx = array_session().create_execution_ctx();
1230        let patches = Patches::new(
1231            100,
1232            0,
1233            buffer![10u32, 11, 20].into_array(),
1234            buffer![100, 110, 200].into_array(),
1235            None,
1236        )
1237        .unwrap();
1238
1239        let filtered = patches
1240            .filter(&Mask::from_indices(100, vec![10, 20, 30]), &mut ctx)
1241            .unwrap()
1242            .unwrap();
1243
1244        assert_arrays_eq!(
1245            filtered.indices(),
1246            PrimitiveArray::from_iter([0u64, 1]),
1247            &mut ctx
1248        );
1249        assert_arrays_eq!(
1250            filtered.values(),
1251            PrimitiveArray::from_iter([100i32, 200]),
1252            &mut ctx
1253        );
1254    }
1255
1256    #[test]
1257    fn take_with_nulls() {
1258        let mut ctx = array_session().create_execution_ctx();
1259        let patches = Patches::new(
1260            20,
1261            0,
1262            buffer![2u64, 9, 15].into_array(),
1263            PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array(),
1264            None,
1265        )
1266        .unwrap();
1267
1268        let taken = patches
1269            .take(
1270                &PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false]))
1271                    .into_array(),
1272                &mut ctx,
1273            )
1274            .unwrap()
1275            .unwrap();
1276        #[expect(deprecated)]
1277        let primitive_values = taken.values().to_primitive();
1278        #[expect(deprecated)]
1279        let primitive_indices = taken.indices().to_primitive();
1280        assert_eq!(taken.array_len(), 2);
1281        assert_arrays_eq!(
1282            primitive_values,
1283            PrimitiveArray::from_option_iter([Some(44i32)]),
1284            &mut ctx
1285        );
1286        assert_arrays_eq!(
1287            primitive_indices,
1288            PrimitiveArray::from_iter([0u64]),
1289            &mut ctx
1290        );
1291        assert_eq!(
1292            primitive_values
1293                .as_ref()
1294                .validity()
1295                .unwrap()
1296                .execute_mask(primitive_values.as_ref().len(), &mut ctx)
1297                .unwrap(),
1298            Mask::from_iter(vec![true])
1299        );
1300    }
1301
1302    #[test]
1303    fn take_search_with_nulls_chunked() {
1304        let mut ctx = array_session().create_execution_ctx();
1305        let patches = Patches::new(
1306            20,
1307            0,
1308            buffer![2u64, 9, 15].into_array(),
1309            buffer![33_i32, 44, 55].into_array(),
1310            Some(buffer![0u64].into_array()),
1311        )
1312        .unwrap();
1313
1314        let taken = patches
1315            .take_search(
1316                PrimitiveArray::new(buffer![9, 0], Validity::from_iter([true, false])),
1317                true,
1318                &mut ctx,
1319            )
1320            .unwrap()
1321            .unwrap();
1322
1323        #[expect(deprecated)]
1324        let primitive_values = taken.values().to_primitive();
1325        assert_eq!(taken.array_len(), 2);
1326        assert_arrays_eq!(
1327            primitive_values,
1328            PrimitiveArray::from_option_iter([Some(44i32), None]),
1329            &mut ctx
1330        );
1331        assert_arrays_eq!(
1332            taken.indices(),
1333            PrimitiveArray::from_iter([0u64, 1]),
1334            &mut ctx
1335        );
1336
1337        assert_eq!(
1338            primitive_values
1339                .as_ref()
1340                .validity()
1341                .unwrap()
1342                .execute_mask(primitive_values.as_ref().len(), &mut ctx)
1343                .unwrap(),
1344            Mask::from_iter([true, false])
1345        );
1346    }
1347
1348    #[test]
1349    fn take_search_chunked_multiple_chunks() {
1350        let mut ctx = array_session().create_execution_ctx();
1351        let patches = Patches::new(
1352            2048,
1353            0,
1354            buffer![100u64, 500, 1200, 1800].into_array(),
1355            buffer![10_i32, 20, 30, 40].into_array(),
1356            Some(buffer![0u64, 2].into_array()),
1357        )
1358        .unwrap();
1359
1360        let taken = patches
1361            .take_search(
1362                PrimitiveArray::new(buffer![500, 1200, 999], Validity::AllValid),
1363                true,
1364                &mut ctx,
1365            )
1366            .unwrap()
1367            .unwrap();
1368
1369        assert_eq!(taken.array_len(), 3);
1370        assert_arrays_eq!(
1371            taken.values(),
1372            PrimitiveArray::from_option_iter([Some(20i32), Some(30)]),
1373            &mut ctx
1374        );
1375    }
1376
1377    #[test]
1378    fn take_search_chunked_indices_with_no_patches() {
1379        let mut ctx = array_session().create_execution_ctx();
1380        let patches = Patches::new(
1381            20,
1382            0,
1383            buffer![2u64, 9, 15].into_array(),
1384            buffer![33_i32, 44, 55].into_array(),
1385            Some(buffer![0u64].into_array()),
1386        )
1387        .unwrap();
1388
1389        let taken = patches
1390            .take_search(
1391                PrimitiveArray::new(buffer![3, 4, 5], Validity::AllValid),
1392                true,
1393                &mut ctx,
1394            )
1395            .unwrap();
1396
1397        assert!(taken.is_none());
1398    }
1399
1400    #[test]
1401    fn take_search_chunked_interleaved() {
1402        let mut ctx = array_session().create_execution_ctx();
1403        let patches = Patches::new(
1404            30,
1405            0,
1406            buffer![5u64, 10, 20, 25].into_array(),
1407            buffer![100_i32, 200, 300, 400].into_array(),
1408            Some(buffer![0u64].into_array()),
1409        )
1410        .unwrap();
1411
1412        let taken = patches
1413            .take_search(
1414                PrimitiveArray::new(buffer![10, 15, 20, 99], Validity::AllValid),
1415                true,
1416                &mut ctx,
1417            )
1418            .unwrap()
1419            .unwrap();
1420
1421        assert_eq!(taken.array_len(), 4);
1422        assert_arrays_eq!(
1423            taken.values(),
1424            PrimitiveArray::from_option_iter([Some(200i32), Some(300)]),
1425            &mut ctx
1426        );
1427    }
1428
1429    #[test]
1430    fn test_take_search_multiple_chunk_offsets() {
1431        let mut ctx = array_session().create_execution_ctx();
1432        let patches = Patches::new(
1433            1500,
1434            0,
1435            BufferMut::from_iter(0..1500u64).into_array(),
1436            BufferMut::from_iter(0..1500i32).into_array(),
1437            Some(buffer![0u64, 1024u64].into_array()),
1438        )
1439        .unwrap();
1440
1441        let taken = patches
1442            .take_search(
1443                PrimitiveArray::new(BufferMut::from_iter(0..1500u64), Validity::AllValid),
1444                false,
1445                &mut ctx,
1446            )
1447            .unwrap()
1448            .unwrap();
1449
1450        assert_eq!(taken.array_len(), 1500);
1451    }
1452
1453    #[test]
1454    fn test_slice() {
1455        let mut ctx = array_session().create_execution_ctx();
1456        let values = buffer![15_u32, 135, 13531, 42].into_array();
1457        let indices = buffer![10_u64, 11, 50, 100].into_array();
1458
1459        let patches = Patches::new(101, 0, indices, values, None).unwrap();
1460
1461        let sliced = patches.slice(15..100).unwrap().unwrap();
1462        assert_eq!(sliced.array_len(), 100 - 15);
1463        assert_arrays_eq!(
1464            sliced.values(),
1465            PrimitiveArray::from_iter([13531u32]),
1466            &mut ctx
1467        );
1468    }
1469
1470    #[test]
1471    fn doubly_sliced() {
1472        let mut ctx = array_session().create_execution_ctx();
1473        let values = buffer![15_u32, 135, 13531, 42].into_array();
1474        let indices = buffer![10_u64, 11, 50, 100].into_array();
1475
1476        let patches = Patches::new(101, 0, indices, values, None).unwrap();
1477
1478        let sliced = patches.slice(15..100).unwrap().unwrap();
1479        assert_eq!(sliced.array_len(), 100 - 15);
1480        assert_arrays_eq!(
1481            sliced.values(),
1482            PrimitiveArray::from_iter([13531u32]),
1483            &mut ctx
1484        );
1485
1486        let doubly_sliced = sliced.slice(35..36).unwrap().unwrap();
1487        assert_arrays_eq!(
1488            doubly_sliced.values(),
1489            PrimitiveArray::from_iter([13531u32]),
1490            &mut ctx
1491        );
1492    }
1493
1494    #[test]
1495    fn test_mask_all_true() {
1496        let mut ctx = array_session().create_execution_ctx();
1497        let patches = Patches::new(
1498            10,
1499            0,
1500            buffer![2u64, 5, 8].into_array(),
1501            buffer![100i32, 200, 300].into_array(),
1502            None,
1503        )
1504        .unwrap();
1505
1506        let mask = Mask::new_true(10);
1507        let masked = patches.mask(&mask, &mut ctx).unwrap();
1508        assert!(masked.is_none());
1509    }
1510
1511    #[test]
1512    fn test_mask_all_false() {
1513        let mut ctx = array_session().create_execution_ctx();
1514        let patches = Patches::new(
1515            10,
1516            0,
1517            buffer![2u64, 5, 8].into_array(),
1518            buffer![100i32, 200, 300].into_array(),
1519            None,
1520        )
1521        .unwrap();
1522
1523        let mask = Mask::new_false(10);
1524        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
1525
1526        // Masking widens the surviving (still valid) values to nullable.
1527        assert!(masked.values().dtype().is_nullable());
1528        assert_arrays_eq!(
1529            masked.values(),
1530            PrimitiveArray::from_option_iter([Some(100i32), Some(200), Some(300)]),
1531            &mut ctx
1532        );
1533        assert!(masked.values().is_valid(0, &mut ctx).unwrap());
1534        assert!(masked.values().is_valid(1, &mut ctx).unwrap());
1535        assert!(masked.values().is_valid(2, &mut ctx).unwrap());
1536
1537        // Indices should remain unchanged
1538        assert_arrays_eq!(
1539            masked.indices(),
1540            PrimitiveArray::from_iter([2u64, 5, 8]),
1541            &mut ctx
1542        );
1543    }
1544
1545    #[test]
1546    fn test_mask_partial() {
1547        let mut ctx = array_session().create_execution_ctx();
1548        let patches = Patches::new(
1549            10,
1550            0,
1551            buffer![2u64, 5, 8].into_array(),
1552            buffer![100i32, 200, 300].into_array(),
1553            None,
1554        )
1555        .unwrap();
1556
1557        // Mask that removes patches at indices 2 and 8 (but not 5)
1558        let mask = Mask::from_iter([
1559            false, false, true, false, false, false, false, false, true, false,
1560        ]);
1561        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
1562
1563        // Only the patch at index 5 should remain
1564        assert_eq!(masked.values().len(), 1);
1565        assert_arrays_eq!(
1566            masked.values(),
1567            PrimitiveArray::from_option_iter([Some(200i32)]),
1568            &mut ctx
1569        );
1570
1571        // Only index 5 should remain
1572        assert_arrays_eq!(
1573            masked.indices(),
1574            PrimitiveArray::from_iter([5u64]),
1575            &mut ctx
1576        );
1577    }
1578
1579    #[test]
1580    fn test_mask_with_offset() {
1581        let mut ctx = array_session().create_execution_ctx();
1582        let patches = Patches::new(
1583            10,
1584            5,                                  // offset
1585            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1586            buffer![100i32, 200, 300].into_array(),
1587            None,
1588        )
1589        .unwrap();
1590
1591        // Mask that sets actual index 2 to null
1592        let mask = Mask::from_iter([
1593            false, false, true, false, false, false, false, false, false, false,
1594        ]);
1595
1596        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
1597        assert_eq!(masked.array_len(), 10);
1598        assert_eq!(masked.offset(), 5);
1599        assert_arrays_eq!(
1600            masked.indices(),
1601            PrimitiveArray::from_iter([10u64, 13]),
1602            &mut ctx
1603        );
1604        assert_arrays_eq!(
1605            masked.values(),
1606            PrimitiveArray::from_option_iter([Some(200i32), Some(300)]),
1607            &mut ctx
1608        );
1609    }
1610
1611    #[test]
1612    fn test_mask_nullable_values() {
1613        let mut ctx = array_session().create_execution_ctx();
1614        let patches = Patches::new(
1615            10,
1616            0,
1617            buffer![2u64, 5, 8].into_array(),
1618            PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array(),
1619            None,
1620        )
1621        .unwrap();
1622
1623        // Test masking removes patch at index 2
1624        let mask = Mask::from_iter([
1625            false, false, true, false, false, false, false, false, false, false,
1626        ]);
1627        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
1628
1629        // Patches at indices 5 and 8 should remain
1630        assert_arrays_eq!(
1631            masked.indices(),
1632            PrimitiveArray::from_iter([5u64, 8]),
1633            &mut ctx
1634        );
1635
1636        // Values should be the null and 300
1637        #[expect(deprecated)]
1638        let masked_values = masked.values().to_primitive();
1639        assert_eq!(masked_values.len(), 2);
1640        assert!(!masked_values.is_valid(0, &mut ctx).unwrap()); // the null value at index 5
1641        assert!(masked_values.is_valid(1, &mut ctx).unwrap()); // the 300 value at index 8
1642        assert_eq!(
1643            i32::try_from(&masked_values.execute_scalar(1, &mut ctx).unwrap()).unwrap(),
1644            300i32
1645        );
1646    }
1647
1648    #[test]
1649    fn test_filter_keep_all() {
1650        let mut ctx = array_session().create_execution_ctx();
1651        let patches = Patches::new(
1652            10,
1653            0,
1654            buffer![2u64, 5, 8].into_array(),
1655            buffer![100i32, 200, 300].into_array(),
1656            None,
1657        )
1658        .unwrap();
1659
1660        // Keep all indices (mask with indices 0-9)
1661        let mask = Mask::from_indices(10, 0..10);
1662        let filtered = patches.filter(&mask, &mut ctx).unwrap().unwrap();
1663
1664        assert_arrays_eq!(
1665            filtered.indices(),
1666            PrimitiveArray::from_iter([2u64, 5, 8]),
1667            &mut ctx
1668        );
1669        assert_arrays_eq!(
1670            filtered.values(),
1671            PrimitiveArray::from_iter([100i32, 200, 300]),
1672            &mut ctx
1673        );
1674    }
1675
1676    #[test]
1677    fn test_filter_none() {
1678        let mut ctx = array_session().create_execution_ctx();
1679        let patches = Patches::new(
1680            10,
1681            0,
1682            buffer![2u64, 5, 8].into_array(),
1683            buffer![100i32, 200, 300].into_array(),
1684            None,
1685        )
1686        .unwrap();
1687
1688        // Filter out all (empty mask means keep nothing)
1689        let mask = Mask::from_indices(10, vec![]);
1690        let filtered = patches.filter(&mask, &mut ctx).unwrap();
1691        assert!(filtered.is_none());
1692    }
1693
1694    #[test]
1695    fn test_filter_with_indices() {
1696        let mut ctx = array_session().create_execution_ctx();
1697        let patches = Patches::new(
1698            10,
1699            0,
1700            buffer![2u64, 5, 8].into_array(),
1701            buffer![100i32, 200, 300].into_array(),
1702            None,
1703        )
1704        .unwrap();
1705
1706        // Keep indices 2, 5, 9 (so patches at 2 and 5 remain)
1707        let mask = Mask::from_indices(10, vec![2, 5, 9]);
1708        let filtered = patches.filter(&mask, &mut ctx).unwrap().unwrap();
1709
1710        assert_arrays_eq!(
1711            filtered.indices(),
1712            PrimitiveArray::from_iter([0u64, 1]),
1713            &mut ctx
1714        ); // Adjusted indices
1715        assert_arrays_eq!(
1716            filtered.values(),
1717            PrimitiveArray::from_iter([100i32, 200]),
1718            &mut ctx
1719        );
1720    }
1721
1722    #[test]
1723    fn test_slice_full_range() {
1724        let mut ctx = array_session().create_execution_ctx();
1725        let patches = Patches::new(
1726            10,
1727            0,
1728            buffer![2u64, 5, 8].into_array(),
1729            buffer![100i32, 200, 300].into_array(),
1730            None,
1731        )
1732        .unwrap();
1733
1734        let sliced = patches.slice(0..10).unwrap().unwrap();
1735
1736        assert_arrays_eq!(
1737            sliced.indices(),
1738            PrimitiveArray::from_iter([2u64, 5, 8]),
1739            &mut ctx
1740        );
1741        assert_arrays_eq!(
1742            sliced.values(),
1743            PrimitiveArray::from_iter([100i32, 200, 300]),
1744            &mut ctx
1745        );
1746    }
1747
1748    #[test]
1749    fn test_slice_partial() {
1750        let mut ctx = array_session().create_execution_ctx();
1751        let patches = Patches::new(
1752            10,
1753            0,
1754            buffer![2u64, 5, 8].into_array(),
1755            buffer![100i32, 200, 300].into_array(),
1756            None,
1757        )
1758        .unwrap();
1759
1760        // Slice from 3 to 8 (includes patch at 5)
1761        let sliced = patches.slice(3..8).unwrap().unwrap();
1762
1763        assert_arrays_eq!(
1764            sliced.indices(),
1765            PrimitiveArray::from_iter([5u64]),
1766            &mut ctx
1767        ); // Index stays the same
1768        assert_arrays_eq!(
1769            sliced.values(),
1770            PrimitiveArray::from_iter([200i32]),
1771            &mut ctx
1772        );
1773        assert_eq!(sliced.array_len(), 5); // 8 - 3 = 5
1774        assert_eq!(sliced.offset(), 3); // New offset
1775    }
1776
1777    #[test]
1778    fn test_slice_no_patches() {
1779        let patches = Patches::new(
1780            10,
1781            0,
1782            buffer![2u64, 5, 8].into_array(),
1783            buffer![100i32, 200, 300].into_array(),
1784            None,
1785        )
1786        .unwrap();
1787
1788        // Slice from 6 to 7 (no patches in this range)
1789        let sliced = patches.slice(6..7).unwrap();
1790        assert!(sliced.is_none());
1791    }
1792
1793    #[test]
1794    fn test_slice_with_offset() {
1795        let mut ctx = array_session().create_execution_ctx();
1796        let patches = Patches::new(
1797            10,
1798            5,                                  // offset
1799            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1800            buffer![100i32, 200, 300].into_array(),
1801            None,
1802        )
1803        .unwrap();
1804
1805        // Slice from 3 to 8 (includes patch at actual index 5)
1806        let sliced = patches.slice(3..8).unwrap().unwrap();
1807
1808        assert_arrays_eq!(
1809            sliced.indices(),
1810            PrimitiveArray::from_iter([10u64]),
1811            &mut ctx
1812        ); // Index stays the same (offset + 5 = 10)
1813        assert_arrays_eq!(
1814            sliced.values(),
1815            PrimitiveArray::from_iter([200i32]),
1816            &mut ctx
1817        );
1818        assert_eq!(sliced.offset(), 8); // New offset = 5 + 3
1819    }
1820
1821    #[test]
1822    fn test_patch_values() {
1823        let mut ctx = array_session().create_execution_ctx();
1824        let patches = Patches::new(
1825            10,
1826            0,
1827            buffer![2u64, 5, 8].into_array(),
1828            buffer![100i32, 200, 300].into_array(),
1829            None,
1830        )
1831        .unwrap();
1832
1833        #[expect(deprecated)]
1834        let values = patches.values().to_primitive();
1835        assert_eq!(
1836            i32::try_from(&values.execute_scalar(0, &mut ctx).unwrap()).unwrap(),
1837            100i32
1838        );
1839        assert_eq!(
1840            i32::try_from(&values.execute_scalar(1, &mut ctx).unwrap()).unwrap(),
1841            200i32
1842        );
1843        assert_eq!(
1844            i32::try_from(&values.execute_scalar(2, &mut ctx).unwrap()).unwrap(),
1845            300i32
1846        );
1847    }
1848
1849    #[test]
1850    fn test_indices_range() {
1851        let patches = Patches::new(
1852            10,
1853            0,
1854            buffer![2u64, 5, 8].into_array(),
1855            buffer![100i32, 200, 300].into_array(),
1856            None,
1857        )
1858        .unwrap();
1859
1860        assert_eq!(patches.min_index().unwrap(), 2);
1861        assert_eq!(patches.max_index().unwrap(), 8);
1862    }
1863
1864    #[test]
1865    fn test_search_index() {
1866        let patches = Patches::new(
1867            10,
1868            0,
1869            buffer![2u64, 5, 8].into_array(),
1870            buffer![100i32, 200, 300].into_array(),
1871            None,
1872        )
1873        .unwrap();
1874
1875        // Search for exact indices
1876        assert_eq!(patches.search_index(2).unwrap(), SearchResult::Found(0));
1877        assert_eq!(patches.search_index(5).unwrap(), SearchResult::Found(1));
1878        assert_eq!(patches.search_index(8).unwrap(), SearchResult::Found(2));
1879
1880        // Search for non-patch indices
1881        assert_eq!(patches.search_index(0).unwrap(), SearchResult::NotFound(0));
1882        assert_eq!(patches.search_index(3).unwrap(), SearchResult::NotFound(1));
1883        assert_eq!(patches.search_index(6).unwrap(), SearchResult::NotFound(2));
1884        assert_eq!(patches.search_index(9).unwrap(), SearchResult::NotFound(3));
1885    }
1886
1887    #[test]
1888    fn test_mask_boundary_patches() {
1889        let mut ctx = array_session().create_execution_ctx();
1890        // Test masking patches at array boundaries
1891        let patches = Patches::new(
1892            10,
1893            0,
1894            buffer![0u64, 9].into_array(),
1895            buffer![100i32, 200].into_array(),
1896            None,
1897        )
1898        .unwrap();
1899
1900        let mask = Mask::from_iter([
1901            true, false, false, false, false, false, false, false, false, false,
1902        ]);
1903        let masked = patches.mask(&mask, &mut ctx).unwrap();
1904        assert!(masked.is_some());
1905        let masked = masked.unwrap();
1906        assert_arrays_eq!(
1907            masked.indices(),
1908            PrimitiveArray::from_iter([9u64]),
1909            &mut ctx
1910        );
1911        assert_arrays_eq!(
1912            masked.values(),
1913            PrimitiveArray::from_option_iter([Some(200i32)]),
1914            &mut ctx
1915        );
1916    }
1917
1918    #[test]
1919    fn test_mask_all_patches_removed() {
1920        let mut ctx = array_session().create_execution_ctx();
1921        // Test when all patches are masked out
1922        let patches = Patches::new(
1923            10,
1924            0,
1925            buffer![2u64, 5, 8].into_array(),
1926            buffer![100i32, 200, 300].into_array(),
1927            None,
1928        )
1929        .unwrap();
1930
1931        // Mask that removes all patches
1932        let mask = Mask::from_iter([
1933            false, false, true, false, false, true, false, false, true, false,
1934        ]);
1935        let masked = patches.mask(&mask, &mut ctx).unwrap();
1936        assert!(masked.is_none());
1937    }
1938
1939    #[test]
1940    fn test_mask_no_patches_removed() {
1941        let mut ctx = array_session().create_execution_ctx();
1942        // Test when no patches are masked
1943        let patches = Patches::new(
1944            10,
1945            0,
1946            buffer![2u64, 5, 8].into_array(),
1947            buffer![100i32, 200, 300].into_array(),
1948            None,
1949        )
1950        .unwrap();
1951
1952        // Mask that doesn't affect any patches
1953        let mask = Mask::from_iter([
1954            true, false, false, true, false, false, true, false, false, true,
1955        ]);
1956        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
1957
1958        assert_arrays_eq!(
1959            masked.indices(),
1960            PrimitiveArray::from_iter([2u64, 5, 8]),
1961            &mut ctx
1962        );
1963        assert_arrays_eq!(
1964            masked.values(),
1965            PrimitiveArray::from_option_iter([Some(100i32), Some(200), Some(300)]),
1966            &mut ctx
1967        );
1968    }
1969
1970    #[test]
1971    fn test_mask_single_patch() {
1972        let mut ctx = array_session().create_execution_ctx();
1973        // Test with a single patch
1974        let patches = Patches::new(
1975            5,
1976            0,
1977            buffer![2u64].into_array(),
1978            buffer![42i32].into_array(),
1979            None,
1980        )
1981        .unwrap();
1982
1983        // Mask that removes the single patch
1984        let mask = Mask::from_iter([false, false, true, false, false]);
1985        let masked = patches.mask(&mask, &mut ctx).unwrap();
1986        assert!(masked.is_none());
1987
1988        // Mask that keeps the single patch
1989        let mask = Mask::from_iter([true, false, false, true, false]);
1990        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
1991        assert_arrays_eq!(
1992            masked.indices(),
1993            PrimitiveArray::from_iter([2u64]),
1994            &mut ctx
1995        );
1996    }
1997
1998    #[test]
1999    fn test_mask_contiguous_patches() {
2000        let mut ctx = array_session().create_execution_ctx();
2001        // Test with contiguous patches
2002        let patches = Patches::new(
2003            10,
2004            0,
2005            buffer![3u64, 4, 5, 6].into_array(),
2006            buffer![100i32, 200, 300, 400].into_array(),
2007            None,
2008        )
2009        .unwrap();
2010
2011        // Mask that removes middle patches
2012        let mask = Mask::from_iter([
2013            false, false, false, false, true, true, false, false, false, false,
2014        ]);
2015        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
2016
2017        assert_arrays_eq!(
2018            masked.indices(),
2019            PrimitiveArray::from_iter([3u64, 6]),
2020            &mut ctx
2021        );
2022        assert_arrays_eq!(
2023            masked.values(),
2024            PrimitiveArray::from_option_iter([Some(100i32), Some(400)]),
2025            &mut ctx
2026        );
2027    }
2028
2029    #[test]
2030    fn test_mask_with_large_offset() {
2031        let mut ctx = array_session().create_execution_ctx();
2032        // Test with a large offset that shifts all indices
2033        let patches = Patches::new(
2034            20,
2035            15,
2036            buffer![16u64, 17, 19].into_array(), // actual indices are 1, 2, 4
2037            buffer![100i32, 200, 300].into_array(),
2038            None,
2039        )
2040        .unwrap();
2041
2042        // Mask that removes the patch at actual index 2
2043        let mask = Mask::from_iter([
2044            false, false, true, false, false, false, false, false, false, false, false, false,
2045            false, false, false, false, false, false, false, false,
2046        ]);
2047        let masked = patches.mask(&mask, &mut ctx).unwrap().unwrap();
2048
2049        assert_arrays_eq!(
2050            masked.indices(),
2051            PrimitiveArray::from_iter([16u64, 19]),
2052            &mut ctx
2053        );
2054        assert_arrays_eq!(
2055            masked.values(),
2056            PrimitiveArray::from_option_iter([Some(100i32), Some(300)]),
2057            &mut ctx
2058        );
2059    }
2060
2061    #[test]
2062    #[should_panic(expected = "Filter mask length 5 does not match array length 10")]
2063    fn test_mask_wrong_length() {
2064        let mut ctx = array_session().create_execution_ctx();
2065        let patches = Patches::new(
2066            10,
2067            0,
2068            buffer![2u64, 5, 8].into_array(),
2069            buffer![100i32, 200, 300].into_array(),
2070            None,
2071        )
2072        .unwrap();
2073
2074        // Mask with wrong length
2075        let mask = Mask::from_iter([false, false, true, false, false]);
2076        patches.mask(&mask, &mut ctx).unwrap();
2077    }
2078
2079    #[test]
2080    fn test_chunk_offsets_search() {
2081        let indices = buffer![100u64, 200, 3000, 3100].into_array();
2082        let values = buffer![10i32, 20, 30, 40].into_array();
2083        let chunk_offsets = buffer![0u64, 2, 2, 3].into_array();
2084        let patches = Patches::new(4096, 0, indices, values, Some(chunk_offsets)).unwrap();
2085
2086        assert!(patches.chunk_offsets.is_some());
2087
2088        // chunk 0: patches at 100, 200
2089        assert_eq!(patches.search_index(100).unwrap(), SearchResult::Found(0));
2090        assert_eq!(patches.search_index(200).unwrap(), SearchResult::Found(1));
2091
2092        // chunks 1, 2: no patches
2093        assert_eq!(
2094            patches.search_index(1500).unwrap(),
2095            SearchResult::NotFound(2)
2096        );
2097        assert_eq!(
2098            patches.search_index(2000).unwrap(),
2099            SearchResult::NotFound(2)
2100        );
2101
2102        // chunk 3: patches at 3000, 3100
2103        assert_eq!(patches.search_index(3000).unwrap(), SearchResult::Found(2));
2104        assert_eq!(patches.search_index(3100).unwrap(), SearchResult::Found(3));
2105
2106        assert_eq!(
2107            patches.search_index(1024).unwrap(),
2108            SearchResult::NotFound(2)
2109        );
2110    }
2111
2112    #[test]
2113    fn test_chunk_offsets_with_slice() {
2114        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
2115        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
2116        let chunk_offsets = buffer![0u64, 2, 6].into_array();
2117        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2118
2119        let sliced = patches.slice(1000..2200).unwrap().unwrap();
2120
2121        assert!(sliced.chunk_offsets.is_some());
2122        assert_eq!(sliced.offset(), 1000);
2123
2124        assert_eq!(sliced.search_index(200).unwrap(), SearchResult::Found(0));
2125        assert_eq!(sliced.search_index(500).unwrap(), SearchResult::Found(2));
2126        assert_eq!(sliced.search_index(1100).unwrap(), SearchResult::Found(4));
2127
2128        assert_eq!(sliced.search_index(250).unwrap(), SearchResult::NotFound(1));
2129    }
2130
2131    #[test]
2132    fn test_chunk_offsets_with_slice_after_first_chunk() {
2133        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
2134        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
2135        let chunk_offsets = buffer![0u64, 2, 6].into_array();
2136        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2137
2138        let sliced = patches.slice(1300..2200).unwrap().unwrap();
2139
2140        assert!(sliced.chunk_offsets.is_some());
2141        assert_eq!(sliced.offset(), 1300);
2142
2143        assert_eq!(sliced.search_index(0).unwrap(), SearchResult::Found(0));
2144        assert_eq!(sliced.search_index(200).unwrap(), SearchResult::Found(1));
2145        assert_eq!(sliced.search_index(500).unwrap(), SearchResult::Found(2));
2146        assert_eq!(sliced.search_index(250).unwrap(), SearchResult::NotFound(2));
2147        assert_eq!(sliced.search_index(900).unwrap(), SearchResult::NotFound(4));
2148    }
2149
2150    #[test]
2151    fn test_chunk_offsets_slice_empty_result() {
2152        let indices = buffer![100u64, 200, 3000, 3100].into_array();
2153        let values = buffer![10i32, 20, 30, 40].into_array();
2154        let chunk_offsets = buffer![0u64, 2].into_array();
2155        let patches = Patches::new(4000, 0, indices, values, Some(chunk_offsets)).unwrap();
2156
2157        let sliced = patches.slice(1000..2000).unwrap();
2158        assert!(sliced.is_none());
2159    }
2160
2161    #[test]
2162    fn test_chunk_offsets_slice_single_patch() {
2163        let indices = buffer![100u64, 1200, 1300, 2500].into_array();
2164        let values = buffer![10i32, 20, 30, 40].into_array();
2165        let chunk_offsets = buffer![0u64, 1, 3].into_array();
2166        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2167
2168        let sliced = patches.slice(1100..1250).unwrap().unwrap();
2169
2170        assert_eq!(sliced.num_patches(), 1);
2171        assert_eq!(sliced.offset(), 1100);
2172        assert_eq!(sliced.search_index(100).unwrap(), SearchResult::Found(0)); // 1200 - 1100 = 100
2173        assert_eq!(sliced.search_index(50).unwrap(), SearchResult::NotFound(0));
2174        assert_eq!(sliced.search_index(150).unwrap(), SearchResult::NotFound(1));
2175    }
2176
2177    #[test]
2178    fn test_chunk_offsets_slice_across_chunks() {
2179        let indices = buffer![100u64, 200, 1100, 1200, 2100, 2200].into_array();
2180        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2181        let chunk_offsets = buffer![0u64, 2, 4].into_array();
2182        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2183
2184        let sliced = patches.slice(150..2150).unwrap().unwrap();
2185
2186        assert_eq!(sliced.num_patches(), 4);
2187        assert_eq!(sliced.offset(), 150);
2188
2189        assert_eq!(sliced.search_index(50).unwrap(), SearchResult::Found(0)); // 200 - 150 = 50
2190        assert_eq!(sliced.search_index(950).unwrap(), SearchResult::Found(1)); // 1100 - 150 = 950
2191        assert_eq!(sliced.search_index(1050).unwrap(), SearchResult::Found(2)); // 1200 - 150 = 1050
2192        assert_eq!(sliced.search_index(1950).unwrap(), SearchResult::Found(3)); // 2100 - 150 = 1950
2193    }
2194
2195    #[test]
2196    fn test_chunk_offsets_boundary_searches() {
2197        let indices = buffer![1023u64, 1024, 1025, 2047, 2048].into_array();
2198        let values = buffer![10i32, 20, 30, 40, 50].into_array();
2199        let chunk_offsets = buffer![0u64, 1, 4].into_array();
2200        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2201
2202        assert_eq!(patches.search_index(1023).unwrap(), SearchResult::Found(0));
2203        assert_eq!(patches.search_index(1024).unwrap(), SearchResult::Found(1));
2204        assert_eq!(patches.search_index(1025).unwrap(), SearchResult::Found(2));
2205        assert_eq!(patches.search_index(2047).unwrap(), SearchResult::Found(3));
2206        assert_eq!(patches.search_index(2048).unwrap(), SearchResult::Found(4));
2207
2208        assert_eq!(
2209            patches.search_index(1022).unwrap(),
2210            SearchResult::NotFound(0)
2211        );
2212        assert_eq!(
2213            patches.search_index(2046).unwrap(),
2214            SearchResult::NotFound(3)
2215        );
2216    }
2217
2218    #[test]
2219    fn test_chunk_offsets_slice_edge_cases() {
2220        let indices = buffer![0u64, 1, 1023, 1024, 2047, 2048].into_array();
2221        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2222        let chunk_offsets = buffer![0u64, 3, 5].into_array();
2223        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2224
2225        // Slice at the very beginning
2226        let sliced = patches.slice(0..10).unwrap().unwrap();
2227        assert_eq!(sliced.num_patches(), 2);
2228        assert_eq!(sliced.search_index(0).unwrap(), SearchResult::Found(0));
2229        assert_eq!(sliced.search_index(1).unwrap(), SearchResult::Found(1));
2230
2231        // Slice at the very end
2232        let sliced = patches.slice(2040..3000).unwrap().unwrap();
2233        assert_eq!(sliced.num_patches(), 2); // patches at 2047 and 2048
2234        assert_eq!(sliced.search_index(7).unwrap(), SearchResult::Found(0)); // 2047 - 2040
2235        assert_eq!(sliced.search_index(8).unwrap(), SearchResult::Found(1)); // 2048 - 2040
2236    }
2237
2238    #[test]
2239    fn test_chunk_offsets_slice_nested() {
2240        let indices = buffer![100u64, 200, 300, 400, 500, 600].into_array();
2241        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2242        let chunk_offsets = buffer![0u64].into_array();
2243        let patches = Patches::new(1000, 0, indices, values, Some(chunk_offsets)).unwrap();
2244
2245        let sliced1 = patches.slice(150..550).unwrap().unwrap();
2246        assert_eq!(sliced1.num_patches(), 4); // 200, 300, 400, 500
2247
2248        let sliced2 = sliced1.slice(100..250).unwrap().unwrap();
2249        assert_eq!(sliced2.num_patches(), 1); // 300
2250        assert_eq!(sliced2.offset(), 250);
2251
2252        assert_eq!(sliced2.search_index(50).unwrap(), SearchResult::Found(0)); // 300 - 250
2253        assert_eq!(
2254            sliced2.search_index(150).unwrap(),
2255            SearchResult::NotFound(1)
2256        );
2257    }
2258
2259    #[test]
2260    fn test_nested_slice_with_dropped_first_chunk() {
2261        // PATCH_CHUNK_SIZE = 1024, so the two patches land in different chunks.
2262        let indices = buffer![0u64, 1024].into_array();
2263        let values = buffer![1i32, 2].into_array();
2264        let chunk_offsets = buffer![0u64, 1].into_array();
2265        let patches = Patches::new(2048, 0, indices, values, Some(chunk_offsets)).unwrap();
2266
2267        // Drop chunk 0, then re-slice the result.
2268        let dropped_first = patches.slice(1024..2048).unwrap().unwrap();
2269        let resliced = dropped_first.slice(0..1024).unwrap().unwrap();
2270        assert_eq!(resliced.num_patches(), 1);
2271    }
2272
2273    #[test]
2274    fn test_index_larger_than_length() {
2275        let chunk_offsets = buffer![0u64].into_array();
2276        let indices = buffer![1023u64].into_array();
2277        let values = buffer![42i32].into_array();
2278        let patches = Patches::new(1024, 0, indices, values, Some(chunk_offsets)).unwrap();
2279        assert_eq!(
2280            patches.search_index(2048).unwrap(),
2281            SearchResult::NotFound(1)
2282        );
2283    }
2284}