Skip to main content

vortex_array/
patches.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::ops::Range;
8
9use itertools::Itertools;
10use num_traits::NumCast;
11use vortex_buffer::BitBuffer;
12use vortex_buffer::BufferMut;
13use vortex_error::VortexError;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17use vortex_error::vortex_ensure;
18use vortex_error::vortex_err;
19use vortex_mask::AllOr;
20use vortex_mask::Mask;
21use vortex_mask::MaskMut;
22use vortex_utils::aliases::hash_map::HashMap;
23
24use crate::ArrayRef;
25use crate::ExecutionCtx;
26use crate::IntoArray;
27use crate::ToCanonical;
28use crate::arrays::BoolArray;
29use crate::arrays::PrimitiveArray;
30use crate::arrays::bool::BoolArrayExt;
31use crate::builtins::ArrayBuiltins;
32use crate::dtype::DType;
33use crate::dtype::IntegerPType;
34use crate::dtype::NativePType;
35use crate::dtype::Nullability;
36use crate::dtype::Nullability::NonNullable;
37use crate::dtype::PType;
38use crate::dtype::UnsignedPType;
39use crate::match_each_integer_ptype;
40use crate::match_each_unsigned_integer_ptype;
41use crate::scalar::PValue;
42use crate::scalar::Scalar;
43use crate::search_sorted::SearchResult;
44use crate::search_sorted::SearchSorted;
45use crate::search_sorted::SearchSortedSide;
46use crate::validity::Validity;
47
/// Number of consecutive array positions that make up one patch chunk.
///
/// One patch index offset is stored for each chunk. A position is mapped to its
/// chunk by dividing by this value, which allows for constant time patch index
/// lookups.
pub const PATCH_CHUNK_SIZE: usize = 1024;
51
/// Serializable description of a [`Patches`] instance.
///
/// Sizes are stored as `u64` and primitive types as `i32` enumeration values;
/// the accessors on this type convert them back and report values that cannot
/// be represented.
#[derive(Copy, Clone, prost::Message)]
pub struct PatchesMetadata {
    /// Number of patches, i.e. the length of the patch indices array.
    #[prost(uint64, tag = "1")]
    len: u64,
    /// Absolute offset of the patches.
    #[prost(uint64, tag = "2")]
    offset: u64,
    /// Primitive type of the patch indices, stored as its enumeration value.
    #[prost(enumeration = "PType", tag = "3")]
    indices_ptype: i32,
    /// Length of the chunk offsets array, if chunk offsets are present.
    #[prost(uint64, optional, tag = "4")]
    chunk_offsets_len: Option<u64>,
    /// Primitive type of the chunk offsets, if chunk offsets are present.
    #[prost(enumeration = "PType", optional, tag = "5")]
    chunk_offsets_ptype: Option<i32>,
    /// Number of patches sliced off from the first chunk, if tracked.
    #[prost(uint64, optional, tag = "6")]
    offset_within_chunk: Option<u64>,
}
67
68impl PatchesMetadata {
69    #[inline]
70    pub fn new(
71        len: usize,
72        offset: usize,
73        indices_ptype: PType,
74        chunk_offsets_len: Option<usize>,
75        chunk_offsets_ptype: Option<PType>,
76        offset_within_chunk: Option<usize>,
77    ) -> Self {
78        Self {
79            len: len as u64,
80            offset: offset as u64,
81            indices_ptype: indices_ptype as i32,
82            chunk_offsets_len: chunk_offsets_len.map(|len| len as u64),
83            chunk_offsets_ptype: chunk_offsets_ptype.map(|pt| pt as i32),
84            offset_within_chunk: offset_within_chunk.map(|len| len as u64),
85        }
86    }
87
88    #[inline]
89    pub fn len(&self) -> VortexResult<usize> {
90        usize::try_from(self.len).map_err(|_| vortex_err!("len does not fit in usize"))
91    }
92
93    #[inline]
94    pub fn is_empty(&self) -> bool {
95        self.len == 0
96    }
97
98    #[inline]
99    pub fn offset(&self) -> VortexResult<usize> {
100        usize::try_from(self.offset).map_err(|_| vortex_err!("offset does not fit in usize"))
101    }
102
103    #[inline]
104    pub fn chunk_offsets_dtype(&self) -> VortexResult<Option<DType>> {
105        self.chunk_offsets_ptype
106            .map(|t| {
107                PType::try_from(t)
108                    .map_err(|e| vortex_err!("invalid i32 value {t} for PType: {}", e))
109                    .map(|ptype| DType::Primitive(ptype, NonNullable))
110            })
111            .transpose()
112    }
113
114    #[inline]
115    pub fn indices_dtype(&self) -> VortexResult<DType> {
116        let ptype = PType::try_from(self.indices_ptype).map_err(|e| {
117            vortex_err!("invalid i32 value {} for PType: {}", self.indices_ptype, e)
118        })?;
119        vortex_ensure!(
120            ptype.is_unsigned_int(),
121            "Patch indices must be unsigned integers"
122        );
123        Ok(DType::Primitive(ptype, NonNullable))
124    }
125}
126
/// A helper for working with patched arrays.
///
/// Patches are a pair of equally long arrays: sorted `indices` and the `values`
/// that belong at those positions.
#[derive(Debug, Clone)]
pub struct Patches {
    /// Length of the array the patches apply to.
    array_len: usize,
    /// Absolute offset: 0 if the array is unsliced. Lookups add this to the
    /// requested position before searching `indices`.
    offset: usize,
    /// Non-nullable unsigned integer positions of the patches, in sorted order.
    indices: ArrayRef,
    /// Patch values, one per entry in `indices`.
    values: ArrayRef,
    /// Stores the patch index offset for each chunk.
    ///
    /// This allows us to look up the patches for a given chunk in constant time via
    /// `patch_indices[chunk_offsets[i]..chunk_offsets[i+1]]`.
    ///
    /// This is optional for compatibility reasons.
    chunk_offsets: Option<ArrayRef>,
    /// Chunk offsets are only sliced off in case the slice is fully
    /// outside of the chunk range.
    ///
    /// Though the range for indices and values is sliced in terms of
    /// individual elements, not chunks. To account for that we do a
    /// saturating sub when adjusting the indices based on the chunk offset.
    ///
    /// `offset_within_chunk` is necessary in order to keep track of how many
    /// elements were sliced off within the chunk.
    offset_within_chunk: Option<usize>,
}
152
153impl Patches {
154    pub fn new(
155        array_len: usize,
156        offset: usize,
157        indices: ArrayRef,
158        values: ArrayRef,
159        chunk_offsets: Option<ArrayRef>,
160    ) -> VortexResult<Self> {
161        vortex_ensure!(
162            indices.len() == values.len(),
163            "Patch indices and values must have the same length"
164        );
165        vortex_ensure!(
166            indices.dtype().is_unsigned_int() && !indices.dtype().is_nullable(),
167            "Patch indices must be non-nullable unsigned integers, got {:?}",
168            indices.dtype()
169        );
170
171        vortex_ensure!(
172            indices.len() <= array_len,
173            "Patch indices must be shorter than the array length"
174        );
175        vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty");
176
177        // Perform validation of components when they are host-resident.
178        // This is not possible to do eagerly when the data is on GPU memory.
179        if indices.is_host() && values.is_host() {
180            let max = usize::try_from(&indices.scalar_at(indices.len() - 1)?)
181                .map_err(|_| vortex_err!("indices must be a number"))?;
182            vortex_ensure!(
183                max - offset < array_len,
184                "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
185            );
186
187            #[cfg(debug_assertions)]
188            {
189                use crate::VortexSessionExecute;
190                let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
191                assert!(
192                    crate::aggregate_fn::fns::is_sorted::is_sorted(&indices, &mut ctx)
193                        .unwrap_or(false),
194                    "Patch indices must be sorted"
195                );
196            }
197        }
198
199        Ok(Self {
200            array_len,
201            offset,
202            indices,
203            values,
204            chunk_offsets: chunk_offsets.clone(),
205            // Initialize with `Some(0)` only if `chunk_offsets` are set.
206            offset_within_chunk: chunk_offsets.map(|_| 0),
207        })
208    }
209
    /// Construct new patches without validating any of the arguments.
    ///
    /// All arguments are stored as given. Unlike [`Self::new`], the caller also
    /// chooses `offset_within_chunk` directly.
    ///
    /// # Safety
    ///
    /// Users have to assert that
    /// * Indices and values have the same length
    /// * Indices is a non-nullable unsigned integer type
    /// * Indices must be sorted
    /// * Last value in indices, minus `offset`, is smaller than `array_len`
    pub unsafe fn new_unchecked(
        array_len: usize,
        offset: usize,
        indices: ArrayRef,
        values: ArrayRef,
        chunk_offsets: Option<ArrayRef>,
        offset_within_chunk: Option<usize>,
    ) -> Self {
        Self {
            array_len,
            offset,
            indices,
            values,
            chunk_offsets,
            offset_within_chunk,
        }
    }
236
    /// Length of the array these patches apply to.
    #[inline]
    pub fn array_len(&self) -> usize {
        self.array_len
    }
241
    /// Number of patches, i.e. the length of the indices array.
    #[inline]
    pub fn num_patches(&self) -> usize {
        self.indices.len()
    }
246
    /// Data type of the patch values.
    #[inline]
    pub fn dtype(&self) -> &DType {
        self.values.dtype()
    }
251
    /// Borrows the patch indices array.
    #[inline]
    pub fn indices(&self) -> &ArrayRef {
        &self.indices
    }
256
    /// Consumes the patches and returns the indices array.
    #[inline]
    pub fn into_indices(self) -> ArrayRef {
        self.indices
    }
261
    /// Mutably borrows the patch indices array.
    ///
    /// No validation is performed on a replacement; the caller is responsible for
    /// keeping it consistent with the values.
    #[inline]
    pub fn indices_mut(&mut self) -> &mut ArrayRef {
        &mut self.indices
    }
266
    /// Borrows the patch values array.
    #[inline]
    pub fn values(&self) -> &ArrayRef {
        &self.values
    }
271
    /// Consumes the patches and returns the values array.
    #[inline]
    pub fn into_values(self) -> ArrayRef {
        self.values
    }
276
    /// Mutably borrows the patch values array.
    ///
    /// No validation is performed on a replacement; the caller is responsible for
    /// keeping it consistent with the indices.
    #[inline]
    pub fn values_mut(&mut self) -> &mut ArrayRef {
        &mut self.values
    }
281
    /// Absolute offset: 0 if the array is unsliced.
    ///
    /// Lookups add this value to the requested position before searching the indices.
    #[inline]
    pub fn offset(&self) -> usize {
        self.offset
    }
287
    /// Borrows the per-chunk patch index offsets, if present.
    #[inline]
    pub fn chunk_offsets(&self) -> &Option<ArrayRef> {
        &self.chunk_offsets
    }
292
293    #[inline]
294    pub fn chunk_offset_at(&self, idx: usize) -> VortexResult<usize> {
295        let Some(chunk_offsets) = &self.chunk_offsets else {
296            vortex_bail!("chunk_offsets must be set to retrieve offset at index")
297        };
298
299        chunk_offsets
300            .scalar_at(idx)?
301            .as_primitive()
302            .as_::<usize>()
303            .ok_or_else(|| vortex_err!("chunk offset does not fit in usize"))
304    }
305
    /// Returns the number of patches sliced off from the current first chunk.
    ///
    /// When patches are sliced, the chunk offsets array is also sliced to only include
    /// chunks that overlap with the slice range. However, the slice boundary may fall
    /// in the middle of a chunk's patch range. This offset indicates how many patches
    /// at the start of the first chunk should be skipped.
    ///
    /// [`Self::new`] starts this at `Some(0)` when chunk offsets are supplied.
    ///
    /// Returns `None` if chunk offsets are not set.
    #[inline]
    pub fn offset_within_chunk(&self) -> Option<usize> {
        self.offset_within_chunk
    }
318
319    #[inline]
320    pub fn indices_ptype(&self) -> VortexResult<PType> {
321        PType::try_from(self.indices.dtype())
322            .map_err(|_| vortex_err!("indices dtype is not primitive"))
323    }
324
325    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
326        if self.indices.len() > len {
327            vortex_bail!(
328                "Patch indices {} are longer than the array length {}",
329                self.indices.len(),
330                len
331            );
332        }
333        if self.values.dtype() != dtype {
334            vortex_bail!(
335                "Patch values dtype {} does not match array dtype {}",
336                self.values.dtype(),
337                dtype
338            );
339        }
340        let chunk_offsets_len = self.chunk_offsets.as_ref().map(|co| co.len());
341        let chunk_offsets_ptype = self.chunk_offsets.as_ref().map(|co| co.dtype().as_ptype());
342
343        Ok(PatchesMetadata::new(
344            self.indices.len(),
345            self.offset,
346            self.indices.dtype().as_ptype(),
347            chunk_offsets_len,
348            chunk_offsets_ptype,
349            self.offset_within_chunk,
350        ))
351    }
352
353    pub fn cast_values(self, values_dtype: &DType) -> VortexResult<Self> {
354        // SAFETY: casting does not affect the relationship between the indices and values
355        unsafe {
356            Ok(Self::new_unchecked(
357                self.array_len,
358                self.offset,
359                self.indices,
360                self.values.cast(values_dtype.clone())?,
361                self.chunk_offsets,
362                self.offset_within_chunk,
363            ))
364        }
365    }
366
367    /// Get the patched value at a given index if it exists.
368    pub fn get_patched(&self, index: usize) -> VortexResult<Option<Scalar>> {
369        self.search_index(index)?
370            .to_found()
371            .map(|patch_idx| self.values().scalar_at(patch_idx))
372            .transpose()
373    }
374
375    /// Searches for `index` in the indices array.
376    ///
377    /// Chooses between chunked search when [`Self::chunk_offsets`] is
378    /// available, and binary search otherwise. The `index` parameter is
379    /// adjusted by [`Self::offset`] for both.
380    ///
381    /// # Arguments
382    /// * `index` - The index to search for
383    ///
384    /// # Returns
385    /// * [`SearchResult::Found(patch_idx)`] - If a patch exists at this index, returns the
386    ///   position in the patches array
387    /// * [`SearchResult::NotFound(insertion_point)`] - If no patch exists, returns where
388    ///   a patch at this index would be inserted to maintain sorted order
389    ///
390    /// [`SearchResult::Found(patch_idx)`]: SearchResult::Found
391    /// [`SearchResult::NotFound(insertion_point)`]: SearchResult::NotFound
392    pub fn search_index(&self, index: usize) -> VortexResult<SearchResult> {
393        if self.chunk_offsets.is_some() {
394            return self.search_index_chunked(index);
395        }
396
397        Self::search_index_binary_search(&self.indices, index + self.offset)
398    }
399
    /// Binary searches for `needle` in the indices array.
    ///
    /// `needle` is compared against the stored indices as-is; the callers in this
    /// type add [`Self::offset`] before calling. Canonical arrays are searched on
    /// their typed slice, every other array goes through its own `search_sorted`
    /// with the needle widened to `u64`. Both searches use the left side.
    ///
    /// # Returns
    /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`]
    /// with the insertion point if not found.
    fn search_index_binary_search(indices: &ArrayRef, needle: usize) -> VortexResult<SearchResult> {
        if indices.is_canonical() {
            let primitive = indices.to_primitive();
            match_each_integer_ptype!(primitive.ptype(), |T| {
                let Ok(needle) = T::try_from(needle) else {
                    // If the needle is not of type T, then it cannot possibly be in this array.
                    //
                    // The needle is a non-negative integer (a usize); therefore, it must be larger
                    // than all values in this array.
                    return Ok(SearchResult::NotFound(primitive.len()));
                };
                // Every arm returns from the function, so the fallback below is only
                // reached for non-canonical arrays.
                return primitive
                    .as_slice::<T>()
                    .search_sorted(&needle, SearchSortedSide::Left);
            });
        }
        indices
            .as_primitive_typed()
            .search_sorted(&PValue::U64(needle as u64), SearchSortedSide::Left)
    }
425
426    /// Constant time searches for `index` in the indices array.
427    ///
428    /// First determines which chunk the target index falls into, then performs
429    /// a binary search within that chunk's range.
430    ///
431    /// Returns a [`SearchResult`] indicating either the exact patch index if found,
432    /// or the insertion point if not found.
433    ///
434    /// Returns an error if `chunk_offsets` or `offset_within_chunk` are not set.
435    fn search_index_chunked(&self, index: usize) -> VortexResult<SearchResult> {
436        let Some(chunk_offsets) = &self.chunk_offsets else {
437            vortex_bail!("chunk_offsets is required to be set")
438        };
439
440        let Some(offset_within_chunk) = self.offset_within_chunk else {
441            vortex_bail!("offset_within_chunk is required to be set")
442        };
443
444        if index >= self.array_len() {
445            return Ok(SearchResult::NotFound(self.indices().len()));
446        }
447
448        let chunk_idx = (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE;
449
450        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
451        let base_offset = self.chunk_offset_at(0)?;
452
453        let patches_start_idx = (self.chunk_offset_at(chunk_idx)? - base_offset)
454            // Chunk offsets are only sliced off in case the slice is fully
455            // outside of the chunk range.
456            //
457            // Though the range for indices and values is sliced in terms of
458            // individual elements, not chunks. To account for that we do a
459            // saturating sub when adjusting the indices based on the chunk offset.
460            .saturating_sub(offset_within_chunk);
461
462        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
463            (self.chunk_offset_at(chunk_idx + 1)? - base_offset)
464                .saturating_sub(offset_within_chunk)
465                .min(self.indices.len())
466        } else {
467            self.indices.len()
468        };
469
470        let chunk_indices = self.indices.slice(patches_start_idx..patches_end_idx)?;
471        let result = Self::search_index_binary_search(&chunk_indices, index + self.offset)?;
472
473        Ok(match result {
474            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
475            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
476        })
477    }
478
    /// Batch version of `search_index`.
    ///
    /// In contrast to `search_index`, this function requires `indices` as
    /// well as `chunk_offsets` to be passed as slices. This is to avoid
    /// redundant canonicalization and `scalar_at` lookups across calls.
    ///
    /// # Arguments
    /// * `indices` - The patch indices as a typed slice
    /// * `chunk_offsets` - The chunk offsets as a typed slice; indexed directly, so it
    ///   has to cover the chunk that `index` falls into
    /// * `index` - The position to search for, before [`Self::offset`] is applied
    ///
    /// # Returns
    /// [`SearchResult::Found`] with the patch position, or [`SearchResult::NotFound`]
    /// with the insertion point. `NotFound` with the number of patches is returned when
    /// `index` or the offset cannot be represented, or `index` is outside the array.
    ///
    /// # Errors
    /// Fails if `offset_within_chunk` is not set or a chunk offset does not fit in `usize`.
    fn search_index_chunked_batch<T, O>(
        &self,
        indices: &[T],
        chunk_offsets: &[O],
        index: T,
    ) -> VortexResult<SearchResult>
    where
        T: UnsignedPType,
        O: UnsignedPType,
        usize: TryFrom<T>,
        usize: TryFrom<O>,
    {
        let Some(offset_within_chunk) = self.offset_within_chunk else {
            vortex_bail!("offset_within_chunk is required to be set")
        };

        let chunk_idx = {
            let Ok(index) = usize::try_from(index) else {
                // If the needle cannot be converted to usize, it's larger than all values in this array.
                return Ok(SearchResult::NotFound(indices.len()));
            };

            if index >= self.array_len() {
                return Ok(SearchResult::NotFound(self.indices().len()));
            }

            (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE
        };

        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
        let chunk_offset = usize::try_from(chunk_offsets[chunk_idx] - chunk_offsets[0])
            .map_err(|_| vortex_err!("chunk_offset failed to convert to usize"))?;

        let patches_start_idx = chunk_offset
            // Chunk offsets are only sliced off in case the slice is fully
            // outside of the chunk range.
            //
            // Though the range for indices and values is sliced in terms of
            // individual elements, not chunks. To account for that we do a
            // saturating sub when adjusting the indices based on the chunk offset.
            .saturating_sub(offset_within_chunk);

        // The last chunk has no successor entry, so its patches run to the end of the indices.
        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
            usize::try_from(chunk_offsets[chunk_idx + 1] - chunk_offsets[0])
                .map_err(|_| vortex_err!("patches_end_idx failed to convert to usize"))?
                .saturating_sub(offset_within_chunk)
                .min(indices.len())
        } else {
            self.indices.len()
        };

        let Some(offset) = T::from(self.offset) else {
            // If the offset cannot be converted to T, it's larger than all values in this array.
            return Ok(SearchResult::NotFound(indices.len()));
        };

        let chunk_indices = &indices[patches_start_idx..patches_end_idx];
        // NOTE(review): `index + offset` is computed in `T`; confirm the sum cannot
        // overflow for narrow index types.
        let result = chunk_indices.search_sorted(&(index + offset), SearchSortedSide::Left)?;

        // Shift the position found within the chunk back into the full patches array.
        Ok(match result {
            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
        })
    }
548
549    /// Returns the minimum patch index
550    pub fn min_index(&self) -> VortexResult<usize> {
551        let first = self
552            .indices
553            .scalar_at(0)?
554            .as_primitive()
555            .as_::<usize>()
556            .ok_or_else(|| vortex_err!("index does not fit in usize"))?;
557        Ok(first - self.offset)
558    }
559
560    /// Returns the maximum patch index
561    pub fn max_index(&self) -> VortexResult<usize> {
562        let last = self
563            .indices
564            .scalar_at(self.indices.len() - 1)?
565            .as_primitive()
566            .as_::<usize>()
567            .ok_or_else(|| vortex_err!("index does not fit in usize"))?;
568        Ok(last - self.offset)
569    }
570
571    /// Filter the patches by a mask, resulting in new patches for the filtered array.
572    pub fn filter(&self, mask: &Mask, ctx: &mut ExecutionCtx) -> VortexResult<Option<Self>> {
573        if mask.len() != self.array_len {
574            vortex_bail!(
575                "Filter mask length {} does not match array length {}",
576                mask.len(),
577                self.array_len
578            );
579        }
580
581        match mask.indices() {
582            AllOr::All => Ok(Some(self.clone())),
583            AllOr::None => Ok(None),
584            AllOr::Some(mask_indices) => {
585                let flat_indices = self.indices().clone().execute::<PrimitiveArray>(ctx)?;
586                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
587                    filter_patches_with_mask(
588                        flat_indices.as_slice::<I>(),
589                        self.offset(),
590                        self.values(),
591                        mask_indices,
592                    )
593                })
594            }
595        }
596    }
597
    /// Mask the patches, REMOVING the patches where the mask is true.
    /// Unlike filter, this preserves the patch indices.
    /// Unlike mask on a single array, this does not set masked values to null.
    ///
    /// Returns `None` when every patch is removed, and a clone of these patches when
    /// the mask is all false.
    ///
    /// # Errors
    ///
    /// Fails if the mask length differs from the array length.
    // TODO(joe): make this lazy and remove the ctx.
    pub fn mask(&self, mask: &Mask, ctx: &mut ExecutionCtx) -> VortexResult<Option<Self>> {
        // NOTE(review): the message says "Filter mask" although this is `mask`;
        // looks copied from `filter` - confirm before changing the text.
        if mask.len() != self.array_len {
            vortex_bail!(
                "Filter mask length {} does not match array length {}",
                mask.len(),
                self.array_len
            );
        }

        // Build a per-patch mask that is true for the patches to keep.
        let filter_mask = match mask.bit_buffer() {
            AllOr::All => return Ok(None),
            AllOr::None => return Ok(Some(self.clone())),
            AllOr::Some(masked) => {
                let patch_indices = self.indices().clone().execute::<PrimitiveArray>(ctx)?;
                match_each_unsigned_integer_ptype!(patch_indices.ptype(), |P| {
                    let patch_indices = patch_indices.as_slice::<P>();
                    Mask::from_buffer(BitBuffer::collect_bool(patch_indices.len(), |i| {
                        // Stored indices are absolute; subtract the offset to address the mask.
                        #[allow(clippy::cast_possible_truncation)]
                        let idx = (patch_indices[i] as usize) - self.offset;
                        !masked.value(idx)
                    }))
                })
            }
        };

        if filter_mask.all_false() {
            return Ok(None);
        }

        // Invariant: filtering indices/values with same mask maintains their 1:1 relationship
        let filtered_indices = self.indices.filter(filter_mask.clone())?;
        let filtered_values = self.values.filter(filter_mask)?;

        Ok(Some(Self {
            array_len: self.array_len,
            offset: self.offset,
            indices: filtered_indices,
            values: filtered_values,
            // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
            chunk_offsets: None,
            // NOTE(review): carried over although `chunk_offsets` is dropped above -
            // confirm whether this should be reset as well.
            offset_within_chunk: self.offset_within_chunk,
        }))
    }
645
646    /// Slice the patches by a range of the patched array.
647    pub fn slice(&self, range: Range<usize>) -> VortexResult<Option<Self>> {
648        let slice_start_idx = self.search_index(range.start)?.to_index();
649        let slice_end_idx = self.search_index(range.end)?.to_index();
650
651        if slice_start_idx == slice_end_idx {
652            return Ok(None);
653        }
654
655        let values = self.values().slice(slice_start_idx..slice_end_idx)?;
656        let indices = self.indices().slice(slice_start_idx..slice_end_idx)?;
657
658        let new_chunk_offsets = self
659            .chunk_offsets
660            .as_ref()
661            .map(|chunk_offsets| -> VortexResult<ArrayRef> {
662                let chunk_relative_offset = self.offset % PATCH_CHUNK_SIZE;
663                let chunk_start_idx = (chunk_relative_offset + range.start) / PATCH_CHUNK_SIZE;
664                let chunk_end_idx = (chunk_relative_offset + range.end).div_ceil(PATCH_CHUNK_SIZE);
665                chunk_offsets.slice(chunk_start_idx..chunk_end_idx)
666            })
667            .transpose()?;
668
669        let offset_within_chunk = new_chunk_offsets
670            .as_ref()
671            .map(|new_chunk_offsets| -> VortexResult<usize> {
672                let new_chunk_base = new_chunk_offsets
673                    .scalar_at(0)?
674                    .as_primitive()
675                    .as_::<usize>()
676                    .ok_or_else(|| vortex_err!("chunk offset does not fit in usize"))?;
677                let parent_chunk_base = self.chunk_offset_at(0)?;
678                let parent_within = self.offset_within_chunk.unwrap_or(0);
679                Ok(parent_chunk_base + parent_within + slice_start_idx - new_chunk_base)
680            })
681            .transpose()?;
682
683        Ok(Some(Self {
684            array_len: range.len(),
685            offset: range.start + self.offset(),
686            indices,
687            values,
688            chunk_offsets: new_chunk_offsets,
689            offset_within_chunk,
690        }))
691    }
692
    /// Threshold for the ratio of patches to take indices: below it the map-based
    /// take is used, otherwise the search-based take.
    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
695
696    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
697        (self.num_patches() as f64 / take_indices.len() as f64)
698            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
699    }
700
701    /// Take the indices from the patches
702    ///
703    /// Any nulls in take_indices are added to the resulting patches.
704    pub fn take_with_nulls(
705        &self,
706        take_indices: &ArrayRef,
707        ctx: &mut ExecutionCtx,
708    ) -> VortexResult<Option<Self>> {
709        if take_indices.is_empty() {
710            return Ok(None);
711        }
712
713        let take_indices = take_indices.clone().execute::<PrimitiveArray>(ctx)?;
714        if self.is_map_faster_than_search(&take_indices) {
715            self.take_map(take_indices, true, ctx)
716        } else {
717            self.take_search(take_indices, true, ctx)
718        }
719    }
720
721    /// Take the indices from the patches.
722    ///
723    /// Any nulls in take_indices are ignored.
724    pub fn take(
725        &self,
726        take_indices: &ArrayRef,
727        ctx: &mut ExecutionCtx,
728    ) -> VortexResult<Option<Self>> {
729        if take_indices.is_empty() {
730            return Ok(None);
731        }
732
733        let take_indices = take_indices.clone().execute::<PrimitiveArray>(ctx)?;
734        if self.is_map_faster_than_search(&take_indices) {
735            self.take_map(take_indices, false, ctx)
736        } else {
737            self.take_search(take_indices, false, ctx)
738        }
739    }
740
    /// Takes `take_indices` from the patches by searching the patch indices once per
    /// take index.
    ///
    /// The chunked search is used when chunk offsets are present, a plain sorted
    /// search over all patch indices otherwise. `include_nulls` is forwarded to
    /// `take_indices_with_search_fn`.
    ///
    /// The resulting patches have an offset of 0, an array length equal to the number
    /// of take indices, and no chunk offsets. Returns `None` when the search produced
    /// no indices.
    #[expect(
        clippy::cognitive_complexity,
        reason = "complexity is from nested match_each_* macros"
    )]
    pub fn take_search(
        &self,
        take_indices: PrimitiveArray,
        include_nulls: bool,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<Self>> {
        let take_indices_validity = take_indices.validity()?;
        // Canonicalize the indices and chunk offsets once so the searches run on slices.
        let patch_indices = self.indices.clone().execute::<PrimitiveArray>(ctx)?;
        let chunk_offsets = self
            .chunk_offsets()
            .as_ref()
            .map(|co| co.clone().execute::<PrimitiveArray>(ctx))
            .transpose()?;

        // `values_indices` is used below to take from the patch values and `new_indices`
        // becomes the indices of the result.
        let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) =
            match_each_unsigned_integer_ptype!(patch_indices.ptype(), |PatchT| {
                let patch_indices_slice = patch_indices.as_slice::<PatchT>();
                match_each_integer_ptype!(take_indices.ptype(), |TakeT| {
                    let take_slice = take_indices.as_slice::<TakeT>();

                    if let Some(chunk_offsets) = chunk_offsets {
                        match_each_unsigned_integer_ptype!(chunk_offsets.ptype(), |OffsetT| {
                            let chunk_offsets = chunk_offsets.as_slice::<OffsetT>();
                            take_indices_with_search_fn(
                                patch_indices_slice,
                                take_slice,
                                take_indices.validity_mask()?,
                                include_nulls,
                                |take_idx| {
                                    self.search_index_chunked_batch(
                                        patch_indices_slice,
                                        chunk_offsets,
                                        take_idx,
                                    )
                                },
                            )?
                        })
                    } else {
                        take_indices_with_search_fn(
                            patch_indices_slice,
                            take_slice,
                            take_indices.validity_mask()?,
                            include_nulls,
                            |take_idx| {
                                let Some(offset) = <PatchT as NumCast>::from(self.offset) else {
                                    // If the offset cannot be converted to T, it's larger than all values in this array.
                                    return Ok(SearchResult::NotFound(patch_indices_slice.len()));
                                };

                                patch_indices_slice
                                    .search_sorted(&(take_idx + offset), SearchSortedSide::Left)
                            },
                        )?
                    }
                })
            });

        if new_indices.is_empty() {
            return Ok(None);
        }

        let new_indices = new_indices.into_array();
        let new_array_len = take_indices.len();
        // Validity of the taken values follows the validity of the take indices that hit a patch.
        let values_validity = take_indices_validity.take(&new_indices)?;

        Ok(Some(Self {
            array_len: new_array_len,
            offset: 0,
            indices: new_indices,
            values: self
                .values()
                .take(PrimitiveArray::new(values_indices, values_validity).into_array())?,
            chunk_offsets: None,
            offset_within_chunk: Some(0), // Reset when creating new Patches.
        }))
    }
821
822    pub fn take_map(
823        &self,
824        take_indices: PrimitiveArray,
825        include_nulls: bool,
826        ctx: &mut ExecutionCtx,
827    ) -> VortexResult<Option<Self>> {
828        let indices = self.indices.clone().execute::<PrimitiveArray>(ctx)?;
829        let new_length = take_indices.len();
830
831        let min_index = self.min_index()?;
832        let max_index = self.max_index()?;
833
834        let Some((new_sparse_indices, value_indices)) =
835            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
836                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
837                    let take_validity = take_indices
838                        .validity()?
839                        .execute_mask(take_indices.len(), ctx)?;
840                    let take_nullability = take_indices.validity()?.nullability();
841                    let take_slice = take_indices.as_slice::<TakeIndices>();
842                    take_map::<_, TakeIndices>(
843                        indices.as_slice::<Indices>(),
844                        take_slice,
845                        take_validity,
846                        take_nullability,
847                        self.offset(),
848                        min_index,
849                        max_index,
850                        include_nulls,
851                    )?
852                })
853            })
854        else {
855            return Ok(None);
856        };
857
858        let taken_values = self.values().take(value_indices)?;
859
860        Ok(Some(Patches {
861            array_len: new_length,
862            offset: 0,
863            indices: new_sparse_indices,
864            values: taken_values,
865            // TODO(0ax1): Chunk offsets are invalid after take is applied.
866            chunk_offsets: None,
867            offset_within_chunk: self.offset_within_chunk,
868        }))
869    }
870
871    /// Apply patches to a mutable buffer and validity mask.
872    ///
873    /// This method applies the patch values to the given buffer at the positions specified by the
874    /// patch indices. For non-null patch values, it updates the buffer and marks the position as
875    /// valid. For null patch values, it marks the position as invalid.
876    ///
877    /// # Safety
878    ///
879    /// - All patch indices after offset adjustment must be valid indices into the buffer.
880    /// - The buffer and validity mask must have the same length.
881    pub unsafe fn apply_to_buffer<P: NativePType>(
882        &self,
883        buffer: &mut [P],
884        validity: &mut MaskMut,
885        ctx: &mut ExecutionCtx,
886    ) {
887        let patch_indices = self
888            .indices
889            .clone()
890            .execute::<PrimitiveArray>(ctx)
891            .vortex_expect("patch indices must be convertible to PrimitiveArray");
892        let patch_values = self
893            .values
894            .clone()
895            .execute::<PrimitiveArray>(ctx)
896            .vortex_expect("patch values must be convertible to PrimitiveArray");
897        let patches_validity = patch_values
898            .validity()
899            .vortex_expect("patch values validity should be derivable");
900
901        let patch_values_slice = patch_values.as_slice::<P>();
902        match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| {
903            let patch_indices_slice = patch_indices.as_slice::<I>();
904
905            // SAFETY:
906            // - `Patches` invariant guarantees indices are sorted and within array bounds.
907            // - `patch_indices` and `patch_values` have equal length (from `Patches` invariant).
908            // - `buffer` and `validity` have equal length (precondition).
909            // - All patch indices are valid after offset adjustment (precondition).
910            unsafe {
911                apply_patches_to_buffer_inner(
912                    buffer,
913                    validity,
914                    patch_indices_slice,
915                    self.offset,
916                    patch_values_slice,
917                    &patches_validity,
918                    ctx,
919                );
920            }
921        });
922    }
923
924    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
925    where
926        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
927    {
928        let values = f(self.values)?;
929        if self.indices.len() != values.len() {
930            vortex_bail!(
931                "map_values must preserve length: expected {} received {}",
932                self.indices.len(),
933                values.len()
934            )
935        }
936
937        Ok(Self {
938            array_len: self.array_len,
939            offset: self.offset,
940            indices: self.indices,
941            values,
942            chunk_offsets: self.chunk_offsets,
943            offset_within_chunk: self.offset_within_chunk,
944        })
945    }
946}
947
948/// Helper function to apply patches to a buffer.
949///
950/// # Safety
951///
952/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices
953///   into both `buffer` and `validity`.
954/// - `patch_indices` must be sorted in ascending order.
955/// - `patch_indices` and `patch_values` must have the same length.
956/// - `buffer` and `validity` must have the same length.
957unsafe fn apply_patches_to_buffer_inner<P, I>(
958    buffer: &mut [P],
959    validity: &mut MaskMut,
960    patch_indices: &[I],
961    patch_offset: usize,
962    patch_values: &[P],
963    patches_validity: &Validity,
964    ctx: &mut ExecutionCtx,
965) where
966    P: NativePType,
967    I: UnsignedPType,
968{
969    debug_assert!(!patch_indices.is_empty());
970    debug_assert_eq!(patch_indices.len(), patch_values.len());
971    debug_assert_eq!(buffer.len(), validity.len());
972
973    match patches_validity {
974        Validity::NonNullable | Validity::AllValid => {
975            // All patch values are valid, apply them all.
976            for (&i, &value) in patch_indices.iter().zip_eq(patch_values) {
977                let index = i.as_() - patch_offset;
978
979                // SAFETY: `index` is valid because caller guarantees all patch indices are within
980                // bounds after offset adjustment.
981                unsafe {
982                    validity.set_unchecked(index);
983                }
984                buffer[index] = value;
985            }
986        }
987        Validity::AllInvalid => {
988            // All patch values are null, just mark positions as invalid.
989            for &i in patch_indices {
990                let index = i.as_() - patch_offset;
991
992                // SAFETY: `index` is valid because caller guarantees all patch indices are within
993                // bounds after offset adjustment.
994                unsafe {
995                    validity.unset_unchecked(index);
996                }
997            }
998        }
999        Validity::Array(array) => {
1000            // Some patch values may be null, check each one.
1001            let bool_array = array
1002                .clone()
1003                .execute::<BoolArray>(ctx)
1004                .vortex_expect("validity array must be convertible to BoolArray");
1005            let mask = bool_array.to_bit_buffer();
1006            for (patch_idx, (&i, &value)) in patch_indices.iter().zip_eq(patch_values).enumerate() {
1007                let index = i.as_() - patch_offset;
1008
1009                // SAFETY: `index` and `patch_idx` are valid because caller guarantees all patch
1010                // indices are within bounds after offset adjustment.
1011                unsafe {
1012                    if mask.value_unchecked(patch_idx) {
1013                        buffer[index] = value;
1014                        validity.set_unchecked(index);
1015                    } else {
1016                        validity.unset_unchecked(index);
1017                    }
1018                }
1019            }
1020        }
1021    }
1022}
1023
1024#[allow(clippy::too_many_arguments)] // private function, can clean up one day
1025fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
1026    indices: &[I],
1027    take_indices: &[T],
1028    take_validity: Mask,
1029    take_nullability: Nullability,
1030    indices_offset: usize,
1031    min_index: usize,
1032    max_index: usize,
1033    include_nulls: bool,
1034) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
1035where
1036    usize: TryFrom<T>,
1037    VortexError: From<<I as TryFrom<usize>>::Error>,
1038{
1039    let offset_i = I::try_from(indices_offset)?;
1040
1041    let sparse_index_to_value_index: HashMap<I, usize> = indices
1042        .iter()
1043        .copied()
1044        .map(|idx| idx - offset_i)
1045        .enumerate()
1046        .map(|(value_index, sparse_index)| (sparse_index, value_index))
1047        .collect();
1048
1049    let mut new_sparse_indices = BufferMut::<u64>::with_capacity(take_indices.len());
1050    let mut value_indices = BufferMut::<u64>::with_capacity(take_indices.len());
1051
1052    for (idx_in_take, &take_idx) in take_indices.iter().enumerate() {
1053        let ti = usize::try_from(take_idx)
1054            .map_err(|_| vortex_err!("Failed to convert index to usize"))?;
1055
1056        // If we have to take nulls the take index doesn't matter, make it 0 for consistency
1057        let is_null = match take_validity.bit_buffer() {
1058            AllOr::All => false,
1059            AllOr::None => true,
1060            AllOr::Some(buf) => !buf.value(idx_in_take),
1061        };
1062        if is_null {
1063            if include_nulls {
1064                new_sparse_indices.push(idx_in_take as u64);
1065                value_indices.push(0);
1066            }
1067        } else if ti >= min_index && ti <= max_index {
1068            let ti_as_i = I::try_from(ti)
1069                .map_err(|_| vortex_err!("take index does not fit in index type"))?;
1070            if let Some(&value_index) = sparse_index_to_value_index.get(&ti_as_i) {
1071                new_sparse_indices.push(idx_in_take as u64);
1072                value_indices.push(value_index as u64);
1073            }
1074        }
1075    }
1076
1077    if new_sparse_indices.is_empty() {
1078        return Ok(None);
1079    }
1080
1081    let new_sparse_indices = new_sparse_indices.into_array();
1082    let values_validity =
1083        Validity::from_mask(take_validity, take_nullability).take(&new_sparse_indices)?;
1084    Ok(Some((
1085        new_sparse_indices,
1086        PrimitiveArray::new(value_indices, values_validity).into_array(),
1087    )))
1088}
1089
1090/// Filter patches with the provided mask (in flattened space).
1091///
1092/// The filter mask may contain indices that are non-patched. The return value of this function
1093/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
1094/// patch values.
1095fn filter_patches_with_mask<T: IntegerPType>(
1096    patch_indices: &[T],
1097    offset: usize,
1098    patch_values: &ArrayRef,
1099    mask_indices: &[usize],
1100) -> VortexResult<Option<Patches>> {
1101    let true_count = mask_indices.len();
1102    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
1103    let mut new_mask_indices = Vec::with_capacity(true_count);
1104
1105    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
1106    // the patches are relatively sparse compared to the overall mask, and so many indices in the
1107    // mask will end up being skipped.
1108    const STRIDE: usize = 4;
1109
1110    let mut mask_idx = 0usize;
1111    let mut true_idx = 0usize;
1112
1113    while mask_idx < patch_indices.len() && true_idx < true_count {
1114        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
1115        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
1116        //  the mask (which covers both patch and non-patch values of the parent array), and so to
1117        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
1118        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
1119        //  fallback to performing a two-way iterator merge.
1120        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
1121            // Load a vector of each into our registers.
1122            let left_min = patch_indices[mask_idx]
1123                .to_usize()
1124                .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1125                - offset;
1126            let left_max = patch_indices[mask_idx + STRIDE]
1127                .to_usize()
1128                .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1129                - offset;
1130            let right_min = mask_indices[true_idx];
1131            let right_max = mask_indices[true_idx + STRIDE];
1132
1133            if left_min > right_max {
1134                // Advance right side
1135                true_idx += STRIDE;
1136                continue;
1137            } else if right_min > left_max {
1138                mask_idx += STRIDE;
1139                continue;
1140            } else {
1141                // Fallthrough to direct comparison path.
1142            }
1143        }
1144
1145        // Two-way sorted iterator merge:
1146
1147        let left = patch_indices[mask_idx]
1148            .to_usize()
1149            .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1150            - offset;
1151        let right = mask_indices[true_idx];
1152
1153        match left.cmp(&right) {
1154            Ordering::Less => {
1155                mask_idx += 1;
1156            }
1157            Ordering::Greater => {
1158                true_idx += 1;
1159            }
1160            Ordering::Equal => {
1161                // Save the mask index as well as the positional index.
1162                new_mask_indices.push(mask_idx);
1163                new_patch_indices.push(true_idx as u64);
1164
1165                mask_idx += 1;
1166                true_idx += 1;
1167            }
1168        }
1169    }
1170
1171    if new_mask_indices.is_empty() {
1172        return Ok(None);
1173    }
1174
1175    let new_patch_indices = new_patch_indices.into_array();
1176    let new_patch_values =
1177        patch_values.filter(Mask::from_indices(patch_values.len(), new_mask_indices))?;
1178
1179    Ok(Some(Patches::new(
1180        true_count,
1181        0,
1182        new_patch_indices,
1183        new_patch_values,
1184        // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
1185        None,
1186    )?))
1187}
1188
1189fn take_indices_with_search_fn<
1190    I: UnsignedPType,
1191    T: IntegerPType,
1192    F: Fn(I) -> VortexResult<SearchResult>,
1193>(
1194    indices: &[I],
1195    take_indices: &[T],
1196    take_validity: Mask,
1197    include_nulls: bool,
1198    search_fn: F,
1199) -> VortexResult<(BufferMut<u64>, BufferMut<u64>)> {
1200    let mut values_indices = BufferMut::with_capacity(take_indices.len());
1201    let mut new_indices = BufferMut::with_capacity(take_indices.len());
1202
1203    for (new_patch_idx, &take_idx) in take_indices.iter().enumerate() {
1204        if !take_validity.value(new_patch_idx) {
1205            if include_nulls {
1206                // For nulls, patch index doesn't matter - use 0 for consistency
1207                values_indices.push(0u64);
1208                new_indices.push(new_patch_idx as u64);
1209            }
1210            continue;
1211        } else {
1212            let search_result = match I::from(take_idx) {
1213                Some(idx) => search_fn(idx)?,
1214                None => SearchResult::NotFound(indices.len()),
1215            };
1216
1217            if let Some(patch_idx) = search_result.to_found() {
1218                values_indices.push(patch_idx as u64);
1219                new_indices.push(new_patch_idx as u64);
1220            }
1221        }
1222    }
1223
1224    Ok((values_indices, new_indices))
1225}
1226
1227#[cfg(test)]
1228mod test {
1229    use vortex_buffer::BufferMut;
1230    use vortex_buffer::buffer;
1231    use vortex_mask::Mask;
1232
1233    use crate::IntoArray;
1234    use crate::LEGACY_SESSION;
1235    use crate::ToCanonical;
1236    use crate::VortexSessionExecute;
1237    use crate::assert_arrays_eq;
1238    use crate::patches::Patches;
1239    use crate::patches::PrimitiveArray;
1240    use crate::search_sorted::SearchResult;
1241    use crate::validity::Validity;
1242
1243    #[test]
1244    fn test_filter() {
1245        let patches = Patches::new(
1246            100,
1247            0,
1248            buffer![10u32, 11, 20].into_array(),
1249            buffer![100, 110, 200].into_array(),
1250            None,
1251        )
1252        .unwrap();
1253
1254        let filtered = patches
1255            .filter(
1256                &Mask::from_indices(100, vec![10, 20, 30]),
1257                &mut LEGACY_SESSION.create_execution_ctx(),
1258            )
1259            .unwrap()
1260            .unwrap();
1261
1262        assert_arrays_eq!(filtered.indices(), PrimitiveArray::from_iter([0u64, 1]));
1263        assert_arrays_eq!(filtered.values(), PrimitiveArray::from_iter([100i32, 200]));
1264    }
1265
1266    #[test]
1267    fn take_with_nulls() {
1268        let patches = Patches::new(
1269            20,
1270            0,
1271            buffer![2u64, 9, 15].into_array(),
1272            PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array(),
1273            None,
1274        )
1275        .unwrap();
1276
1277        let taken = patches
1278            .take(
1279                &PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false]))
1280                    .into_array(),
1281                &mut LEGACY_SESSION.create_execution_ctx(),
1282            )
1283            .unwrap()
1284            .unwrap();
1285        let primitive_values = taken.values().to_primitive();
1286        let primitive_indices = taken.indices().to_primitive();
1287        assert_eq!(taken.array_len(), 2);
1288        assert_arrays_eq!(
1289            primitive_values,
1290            PrimitiveArray::from_option_iter([Some(44i32)])
1291        );
1292        assert_arrays_eq!(primitive_indices, PrimitiveArray::from_iter([0u64]));
1293        assert_eq!(
1294            primitive_values.validity_mask().unwrap(),
1295            Mask::from_iter(vec![true])
1296        );
1297    }
1298
1299    #[test]
1300    fn take_search_with_nulls_chunked() {
1301        let patches = Patches::new(
1302            20,
1303            0,
1304            buffer![2u64, 9, 15].into_array(),
1305            buffer![33_i32, 44, 55].into_array(),
1306            Some(buffer![0u64].into_array()),
1307        )
1308        .unwrap();
1309
1310        let taken = patches
1311            .take_search(
1312                PrimitiveArray::new(buffer![9, 0], Validity::from_iter([true, false])),
1313                true,
1314                &mut LEGACY_SESSION.create_execution_ctx(),
1315            )
1316            .unwrap()
1317            .unwrap();
1318
1319        let primitive_values = taken.values().to_primitive();
1320        assert_eq!(taken.array_len(), 2);
1321        assert_arrays_eq!(
1322            primitive_values,
1323            PrimitiveArray::from_option_iter([Some(44i32), None])
1324        );
1325        assert_arrays_eq!(taken.indices(), PrimitiveArray::from_iter([0u64, 1]));
1326
1327        assert_eq!(
1328            primitive_values.validity_mask().unwrap(),
1329            Mask::from_iter([true, false])
1330        );
1331    }
1332
1333    #[test]
1334    fn take_search_chunked_multiple_chunks() {
1335        let patches = Patches::new(
1336            2048,
1337            0,
1338            buffer![100u64, 500, 1200, 1800].into_array(),
1339            buffer![10_i32, 20, 30, 40].into_array(),
1340            Some(buffer![0u64, 2].into_array()),
1341        )
1342        .unwrap();
1343
1344        let taken = patches
1345            .take_search(
1346                PrimitiveArray::new(buffer![500, 1200, 999], Validity::AllValid),
1347                true,
1348                &mut LEGACY_SESSION.create_execution_ctx(),
1349            )
1350            .unwrap()
1351            .unwrap();
1352
1353        assert_eq!(taken.array_len(), 3);
1354        assert_arrays_eq!(
1355            taken.values(),
1356            PrimitiveArray::from_option_iter([Some(20i32), Some(30)])
1357        );
1358    }
1359
1360    #[test]
1361    fn take_search_chunked_indices_with_no_patches() {
1362        let patches = Patches::new(
1363            20,
1364            0,
1365            buffer![2u64, 9, 15].into_array(),
1366            buffer![33_i32, 44, 55].into_array(),
1367            Some(buffer![0u64].into_array()),
1368        )
1369        .unwrap();
1370
1371        let taken = patches
1372            .take_search(
1373                PrimitiveArray::new(buffer![3, 4, 5], Validity::AllValid),
1374                true,
1375                &mut LEGACY_SESSION.create_execution_ctx(),
1376            )
1377            .unwrap();
1378
1379        assert!(taken.is_none());
1380    }
1381
1382    #[test]
1383    fn take_search_chunked_interleaved() {
1384        let patches = Patches::new(
1385            30,
1386            0,
1387            buffer![5u64, 10, 20, 25].into_array(),
1388            buffer![100_i32, 200, 300, 400].into_array(),
1389            Some(buffer![0u64].into_array()),
1390        )
1391        .unwrap();
1392
1393        let taken = patches
1394            .take_search(
1395                PrimitiveArray::new(buffer![10, 15, 20, 99], Validity::AllValid),
1396                true,
1397                &mut LEGACY_SESSION.create_execution_ctx(),
1398            )
1399            .unwrap()
1400            .unwrap();
1401
1402        assert_eq!(taken.array_len(), 4);
1403        assert_arrays_eq!(
1404            taken.values(),
1405            PrimitiveArray::from_option_iter([Some(200i32), Some(300)])
1406        );
1407    }
1408
1409    #[test]
1410    fn test_take_search_multiple_chunk_offsets() {
1411        let patches = Patches::new(
1412            1500,
1413            0,
1414            BufferMut::from_iter(0..1500u64).into_array(),
1415            BufferMut::from_iter(0..1500i32).into_array(),
1416            Some(buffer![0u64, 1024u64].into_array()),
1417        )
1418        .unwrap();
1419
1420        let taken = patches
1421            .take_search(
1422                PrimitiveArray::new(BufferMut::from_iter(0..1500u64), Validity::AllValid),
1423                false,
1424                &mut LEGACY_SESSION.create_execution_ctx(),
1425            )
1426            .unwrap()
1427            .unwrap();
1428
1429        assert_eq!(taken.array_len(), 1500);
1430    }
1431
1432    #[test]
1433    fn test_slice() {
1434        let values = buffer![15_u32, 135, 13531, 42].into_array();
1435        let indices = buffer![10_u64, 11, 50, 100].into_array();
1436
1437        let patches = Patches::new(101, 0, indices, values, None).unwrap();
1438
1439        let sliced = patches.slice(15..100).unwrap().unwrap();
1440        assert_eq!(sliced.array_len(), 100 - 15);
1441        assert_arrays_eq!(sliced.values(), PrimitiveArray::from_iter([13531u32]));
1442    }
1443
1444    #[test]
1445    fn doubly_sliced() {
1446        let values = buffer![15_u32, 135, 13531, 42].into_array();
1447        let indices = buffer![10_u64, 11, 50, 100].into_array();
1448
1449        let patches = Patches::new(101, 0, indices, values, None).unwrap();
1450
1451        let sliced = patches.slice(15..100).unwrap().unwrap();
1452        assert_eq!(sliced.array_len(), 100 - 15);
1453        assert_arrays_eq!(sliced.values(), PrimitiveArray::from_iter([13531u32]));
1454
1455        let doubly_sliced = sliced.slice(35..36).unwrap().unwrap();
1456        assert_arrays_eq!(
1457            doubly_sliced.values(),
1458            PrimitiveArray::from_iter([13531u32])
1459        );
1460    }
1461
1462    #[test]
1463    fn test_mask_all_true() {
1464        let patches = Patches::new(
1465            10,
1466            0,
1467            buffer![2u64, 5, 8].into_array(),
1468            buffer![100i32, 200, 300].into_array(),
1469            None,
1470        )
1471        .unwrap();
1472
1473        let mask = Mask::new_true(10);
1474        let masked = patches
1475            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1476            .unwrap();
1477        assert!(masked.is_none());
1478    }
1479
1480    #[test]
1481    fn test_mask_all_false() {
1482        let patches = Patches::new(
1483            10,
1484            0,
1485            buffer![2u64, 5, 8].into_array(),
1486            buffer![100i32, 200, 300].into_array(),
1487            None,
1488        )
1489        .unwrap();
1490
1491        let mask = Mask::new_false(10);
1492        let masked = patches
1493            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1494            .unwrap()
1495            .unwrap();
1496
1497        // No patch values should be masked
1498        assert_arrays_eq!(
1499            masked.values(),
1500            PrimitiveArray::from_iter([100i32, 200, 300])
1501        );
1502        assert!(masked.values().is_valid(0).unwrap());
1503        assert!(masked.values().is_valid(1).unwrap());
1504        assert!(masked.values().is_valid(2).unwrap());
1505
1506        // Indices should remain unchanged
1507        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
1508    }
1509
1510    #[test]
1511    fn test_mask_partial() {
1512        let patches = Patches::new(
1513            10,
1514            0,
1515            buffer![2u64, 5, 8].into_array(),
1516            buffer![100i32, 200, 300].into_array(),
1517            None,
1518        )
1519        .unwrap();
1520
1521        // Mask that removes patches at indices 2 and 8 (but not 5)
1522        let mask = Mask::from_iter([
1523            false, false, true, false, false, false, false, false, true, false,
1524        ]);
1525        let masked = patches
1526            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1527            .unwrap()
1528            .unwrap();
1529
1530        // Only the patch at index 5 should remain
1531        assert_eq!(masked.values().len(), 1);
1532        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([200i32]));
1533
1534        // Only index 5 should remain
1535        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([5u64]));
1536    }
1537
1538    #[test]
1539    fn test_mask_with_offset() {
1540        let patches = Patches::new(
1541            10,
1542            5,                                  // offset
1543            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1544            buffer![100i32, 200, 300].into_array(),
1545            None,
1546        )
1547        .unwrap();
1548
1549        // Mask that sets actual index 2 to null
1550        let mask = Mask::from_iter([
1551            false, false, true, false, false, false, false, false, false, false,
1552        ]);
1553
1554        let masked = patches
1555            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1556            .unwrap()
1557            .unwrap();
1558        assert_eq!(masked.array_len(), 10);
1559        assert_eq!(masked.offset(), 5);
1560        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([10u64, 13]));
1561        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([200i32, 300]));
1562    }
1563
1564    #[test]
1565    fn test_mask_nullable_values() {
1566        let patches = Patches::new(
1567            10,
1568            0,
1569            buffer![2u64, 5, 8].into_array(),
1570            PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array(),
1571            None,
1572        )
1573        .unwrap();
1574
1575        // Test masking removes patch at index 2
1576        let mask = Mask::from_iter([
1577            false, false, true, false, false, false, false, false, false, false,
1578        ]);
1579        let masked = patches
1580            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1581            .unwrap()
1582            .unwrap();
1583
1584        // Patches at indices 5 and 8 should remain
1585        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([5u64, 8]));
1586
1587        // Values should be the null and 300
1588        let masked_values = masked.values().to_primitive();
1589        assert_eq!(masked_values.len(), 2);
1590        assert!(!masked_values.is_valid(0).unwrap()); // the null value at index 5
1591        assert!(masked_values.is_valid(1).unwrap()); // the 300 value at index 8
1592        assert_eq!(
1593            i32::try_from(&masked_values.scalar_at(1).unwrap()).unwrap(),
1594            300i32
1595        );
1596    }
1597
1598    #[test]
1599    fn test_filter_keep_all() {
1600        let patches = Patches::new(
1601            10,
1602            0,
1603            buffer![2u64, 5, 8].into_array(),
1604            buffer![100i32, 200, 300].into_array(),
1605            None,
1606        )
1607        .unwrap();
1608
1609        // Keep all indices (mask with indices 0-9)
1610        let mask = Mask::from_indices(10, (0..10).collect());
1611        let filtered = patches
1612            .filter(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1613            .unwrap()
1614            .unwrap();
1615
1616        assert_arrays_eq!(filtered.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
1617        assert_arrays_eq!(
1618            filtered.values(),
1619            PrimitiveArray::from_iter([100i32, 200, 300])
1620        );
1621    }
1622
1623    #[test]
1624    fn test_filter_none() {
1625        let patches = Patches::new(
1626            10,
1627            0,
1628            buffer![2u64, 5, 8].into_array(),
1629            buffer![100i32, 200, 300].into_array(),
1630            None,
1631        )
1632        .unwrap();
1633
1634        // Filter out all (empty mask means keep nothing)
1635        let mask = Mask::from_indices(10, vec![]);
1636        let filtered = patches
1637            .filter(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1638            .unwrap();
1639        assert!(filtered.is_none());
1640    }
1641
1642    #[test]
1643    fn test_filter_with_indices() {
1644        let patches = Patches::new(
1645            10,
1646            0,
1647            buffer![2u64, 5, 8].into_array(),
1648            buffer![100i32, 200, 300].into_array(),
1649            None,
1650        )
1651        .unwrap();
1652
1653        // Keep indices 2, 5, 9 (so patches at 2 and 5 remain)
1654        let mask = Mask::from_indices(10, vec![2, 5, 9]);
1655        let filtered = patches
1656            .filter(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1657            .unwrap()
1658            .unwrap();
1659
1660        assert_arrays_eq!(filtered.indices(), PrimitiveArray::from_iter([0u64, 1])); // Adjusted indices
1661        assert_arrays_eq!(filtered.values(), PrimitiveArray::from_iter([100i32, 200]));
1662    }
1663
1664    #[test]
1665    fn test_slice_full_range() {
1666        let patches = Patches::new(
1667            10,
1668            0,
1669            buffer![2u64, 5, 8].into_array(),
1670            buffer![100i32, 200, 300].into_array(),
1671            None,
1672        )
1673        .unwrap();
1674
1675        let sliced = patches.slice(0..10).unwrap().unwrap();
1676
1677        assert_arrays_eq!(sliced.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
1678        assert_arrays_eq!(
1679            sliced.values(),
1680            PrimitiveArray::from_iter([100i32, 200, 300])
1681        );
1682    }
1683
1684    #[test]
1685    fn test_slice_partial() {
1686        let patches = Patches::new(
1687            10,
1688            0,
1689            buffer![2u64, 5, 8].into_array(),
1690            buffer![100i32, 200, 300].into_array(),
1691            None,
1692        )
1693        .unwrap();
1694
1695        // Slice from 3 to 8 (includes patch at 5)
1696        let sliced = patches.slice(3..8).unwrap().unwrap();
1697
1698        assert_arrays_eq!(sliced.indices(), PrimitiveArray::from_iter([5u64])); // Index stays the same
1699        assert_arrays_eq!(sliced.values(), PrimitiveArray::from_iter([200i32]));
1700        assert_eq!(sliced.array_len(), 5); // 8 - 3 = 5
1701        assert_eq!(sliced.offset(), 3); // New offset
1702    }
1703
1704    #[test]
1705    fn test_slice_no_patches() {
1706        let patches = Patches::new(
1707            10,
1708            0,
1709            buffer![2u64, 5, 8].into_array(),
1710            buffer![100i32, 200, 300].into_array(),
1711            None,
1712        )
1713        .unwrap();
1714
1715        // Slice from 6 to 7 (no patches in this range)
1716        let sliced = patches.slice(6..7).unwrap();
1717        assert!(sliced.is_none());
1718    }
1719
1720    #[test]
1721    fn test_slice_with_offset() {
1722        let patches = Patches::new(
1723            10,
1724            5,                                  // offset
1725            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1726            buffer![100i32, 200, 300].into_array(),
1727            None,
1728        )
1729        .unwrap();
1730
1731        // Slice from 3 to 8 (includes patch at actual index 5)
1732        let sliced = patches.slice(3..8).unwrap().unwrap();
1733
1734        assert_arrays_eq!(sliced.indices(), PrimitiveArray::from_iter([10u64])); // Index stays the same (offset + 5 = 10)
1735        assert_arrays_eq!(sliced.values(), PrimitiveArray::from_iter([200i32]));
1736        assert_eq!(sliced.offset(), 8); // New offset = 5 + 3
1737    }
1738
1739    #[test]
1740    fn test_patch_values() {
1741        let patches = Patches::new(
1742            10,
1743            0,
1744            buffer![2u64, 5, 8].into_array(),
1745            buffer![100i32, 200, 300].into_array(),
1746            None,
1747        )
1748        .unwrap();
1749
1750        let values = patches.values().to_primitive();
1751        assert_eq!(
1752            i32::try_from(&values.scalar_at(0).unwrap()).unwrap(),
1753            100i32
1754        );
1755        assert_eq!(
1756            i32::try_from(&values.scalar_at(1).unwrap()).unwrap(),
1757            200i32
1758        );
1759        assert_eq!(
1760            i32::try_from(&values.scalar_at(2).unwrap()).unwrap(),
1761            300i32
1762        );
1763    }
1764
1765    #[test]
1766    fn test_indices_range() {
1767        let patches = Patches::new(
1768            10,
1769            0,
1770            buffer![2u64, 5, 8].into_array(),
1771            buffer![100i32, 200, 300].into_array(),
1772            None,
1773        )
1774        .unwrap();
1775
1776        assert_eq!(patches.min_index().unwrap(), 2);
1777        assert_eq!(patches.max_index().unwrap(), 8);
1778    }
1779
1780    #[test]
1781    fn test_search_index() {
1782        let patches = Patches::new(
1783            10,
1784            0,
1785            buffer![2u64, 5, 8].into_array(),
1786            buffer![100i32, 200, 300].into_array(),
1787            None,
1788        )
1789        .unwrap();
1790
1791        // Search for exact indices
1792        assert_eq!(patches.search_index(2).unwrap(), SearchResult::Found(0));
1793        assert_eq!(patches.search_index(5).unwrap(), SearchResult::Found(1));
1794        assert_eq!(patches.search_index(8).unwrap(), SearchResult::Found(2));
1795
1796        // Search for non-patch indices
1797        assert_eq!(patches.search_index(0).unwrap(), SearchResult::NotFound(0));
1798        assert_eq!(patches.search_index(3).unwrap(), SearchResult::NotFound(1));
1799        assert_eq!(patches.search_index(6).unwrap(), SearchResult::NotFound(2));
1800        assert_eq!(patches.search_index(9).unwrap(), SearchResult::NotFound(3));
1801    }
1802
1803    #[test]
1804    fn test_mask_boundary_patches() {
1805        // Test masking patches at array boundaries
1806        let patches = Patches::new(
1807            10,
1808            0,
1809            buffer![0u64, 9].into_array(),
1810            buffer![100i32, 200].into_array(),
1811            None,
1812        )
1813        .unwrap();
1814
1815        let mask = Mask::from_iter([
1816            true, false, false, false, false, false, false, false, false, false,
1817        ]);
1818        let masked = patches
1819            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1820            .unwrap();
1821        assert!(masked.is_some());
1822        let masked = masked.unwrap();
1823        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([9u64]));
1824        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([200i32]));
1825    }
1826
1827    #[test]
1828    fn test_mask_all_patches_removed() {
1829        // Test when all patches are masked out
1830        let patches = Patches::new(
1831            10,
1832            0,
1833            buffer![2u64, 5, 8].into_array(),
1834            buffer![100i32, 200, 300].into_array(),
1835            None,
1836        )
1837        .unwrap();
1838
1839        // Mask that removes all patches
1840        let mask = Mask::from_iter([
1841            false, false, true, false, false, true, false, false, true, false,
1842        ]);
1843        let masked = patches
1844            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1845            .unwrap();
1846        assert!(masked.is_none());
1847    }
1848
1849    #[test]
1850    fn test_mask_no_patches_removed() {
1851        // Test when no patches are masked
1852        let patches = Patches::new(
1853            10,
1854            0,
1855            buffer![2u64, 5, 8].into_array(),
1856            buffer![100i32, 200, 300].into_array(),
1857            None,
1858        )
1859        .unwrap();
1860
1861        // Mask that doesn't affect any patches
1862        let mask = Mask::from_iter([
1863            true, false, false, true, false, false, true, false, false, true,
1864        ]);
1865        let masked = patches
1866            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1867            .unwrap()
1868            .unwrap();
1869
1870        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
1871        assert_arrays_eq!(
1872            masked.values(),
1873            PrimitiveArray::from_iter([100i32, 200, 300])
1874        );
1875    }
1876
1877    #[test]
1878    fn test_mask_single_patch() {
1879        // Test with a single patch
1880        let patches = Patches::new(
1881            5,
1882            0,
1883            buffer![2u64].into_array(),
1884            buffer![42i32].into_array(),
1885            None,
1886        )
1887        .unwrap();
1888
1889        // Mask that removes the single patch
1890        let mask = Mask::from_iter([false, false, true, false, false]);
1891        let masked = patches
1892            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1893            .unwrap();
1894        assert!(masked.is_none());
1895
1896        // Mask that keeps the single patch
1897        let mask = Mask::from_iter([true, false, false, true, false]);
1898        let masked = patches
1899            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1900            .unwrap()
1901            .unwrap();
1902        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([2u64]));
1903    }
1904
1905    #[test]
1906    fn test_mask_contiguous_patches() {
1907        // Test with contiguous patches
1908        let patches = Patches::new(
1909            10,
1910            0,
1911            buffer![3u64, 4, 5, 6].into_array(),
1912            buffer![100i32, 200, 300, 400].into_array(),
1913            None,
1914        )
1915        .unwrap();
1916
1917        // Mask that removes middle patches
1918        let mask = Mask::from_iter([
1919            false, false, false, false, true, true, false, false, false, false,
1920        ]);
1921        let masked = patches
1922            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1923            .unwrap()
1924            .unwrap();
1925
1926        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([3u64, 6]));
1927        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([100i32, 400]));
1928    }
1929
1930    #[test]
1931    fn test_mask_with_large_offset() {
1932        // Test with a large offset that shifts all indices
1933        let patches = Patches::new(
1934            20,
1935            15,
1936            buffer![16u64, 17, 19].into_array(), // actual indices are 1, 2, 4
1937            buffer![100i32, 200, 300].into_array(),
1938            None,
1939        )
1940        .unwrap();
1941
1942        // Mask that removes the patch at actual index 2
1943        let mask = Mask::from_iter([
1944            false, false, true, false, false, false, false, false, false, false, false, false,
1945            false, false, false, false, false, false, false, false,
1946        ]);
1947        let masked = patches
1948            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1949            .unwrap()
1950            .unwrap();
1951
1952        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([16u64, 19]));
1953        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([100i32, 300]));
1954    }
1955
1956    #[test]
1957    #[should_panic(expected = "Filter mask length 5 does not match array length 10")]
1958    fn test_mask_wrong_length() {
1959        let patches = Patches::new(
1960            10,
1961            0,
1962            buffer![2u64, 5, 8].into_array(),
1963            buffer![100i32, 200, 300].into_array(),
1964            None,
1965        )
1966        .unwrap();
1967
1968        // Mask with wrong length
1969        let mask = Mask::from_iter([false, false, true, false, false]);
1970        patches
1971            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1972            .unwrap();
1973    }
1974
1975    #[test]
1976    fn test_chunk_offsets_search() {
1977        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1978        let values = buffer![10i32, 20, 30, 40].into_array();
1979        let chunk_offsets = buffer![0u64, 2, 2, 3].into_array();
1980        let patches = Patches::new(4096, 0, indices, values, Some(chunk_offsets)).unwrap();
1981
1982        assert!(patches.chunk_offsets.is_some());
1983
1984        // chunk 0: patches at 100, 200
1985        assert_eq!(patches.search_index(100).unwrap(), SearchResult::Found(0));
1986        assert_eq!(patches.search_index(200).unwrap(), SearchResult::Found(1));
1987
1988        // chunks 1, 2: no patches
1989        assert_eq!(
1990            patches.search_index(1500).unwrap(),
1991            SearchResult::NotFound(2)
1992        );
1993        assert_eq!(
1994            patches.search_index(2000).unwrap(),
1995            SearchResult::NotFound(2)
1996        );
1997
1998        // chunk 3: patches at 3000, 3100
1999        assert_eq!(patches.search_index(3000).unwrap(), SearchResult::Found(2));
2000        assert_eq!(patches.search_index(3100).unwrap(), SearchResult::Found(3));
2001
2002        assert_eq!(
2003            patches.search_index(1024).unwrap(),
2004            SearchResult::NotFound(2)
2005        );
2006    }
2007
2008    #[test]
2009    fn test_chunk_offsets_with_slice() {
2010        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
2011        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
2012        let chunk_offsets = buffer![0u64, 2, 6].into_array();
2013        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2014
2015        let sliced = patches.slice(1000..2200).unwrap().unwrap();
2016
2017        assert!(sliced.chunk_offsets.is_some());
2018        assert_eq!(sliced.offset(), 1000);
2019
2020        assert_eq!(sliced.search_index(200).unwrap(), SearchResult::Found(0));
2021        assert_eq!(sliced.search_index(500).unwrap(), SearchResult::Found(2));
2022        assert_eq!(sliced.search_index(1100).unwrap(), SearchResult::Found(4));
2023
2024        assert_eq!(sliced.search_index(250).unwrap(), SearchResult::NotFound(1));
2025    }
2026
2027    #[test]
2028    fn test_chunk_offsets_with_slice_after_first_chunk() {
2029        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
2030        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
2031        let chunk_offsets = buffer![0u64, 2, 6].into_array();
2032        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2033
2034        let sliced = patches.slice(1300..2200).unwrap().unwrap();
2035
2036        assert!(sliced.chunk_offsets.is_some());
2037        assert_eq!(sliced.offset(), 1300);
2038
2039        assert_eq!(sliced.search_index(0).unwrap(), SearchResult::Found(0));
2040        assert_eq!(sliced.search_index(200).unwrap(), SearchResult::Found(1));
2041        assert_eq!(sliced.search_index(500).unwrap(), SearchResult::Found(2));
2042        assert_eq!(sliced.search_index(250).unwrap(), SearchResult::NotFound(2));
2043        assert_eq!(sliced.search_index(900).unwrap(), SearchResult::NotFound(4));
2044    }
2045
2046    #[test]
2047    fn test_chunk_offsets_slice_empty_result() {
2048        let indices = buffer![100u64, 200, 3000, 3100].into_array();
2049        let values = buffer![10i32, 20, 30, 40].into_array();
2050        let chunk_offsets = buffer![0u64, 2].into_array();
2051        let patches = Patches::new(4000, 0, indices, values, Some(chunk_offsets)).unwrap();
2052
2053        let sliced = patches.slice(1000..2000).unwrap();
2054        assert!(sliced.is_none());
2055    }
2056
2057    #[test]
2058    fn test_chunk_offsets_slice_single_patch() {
2059        let indices = buffer![100u64, 1200, 1300, 2500].into_array();
2060        let values = buffer![10i32, 20, 30, 40].into_array();
2061        let chunk_offsets = buffer![0u64, 1, 3].into_array();
2062        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2063
2064        let sliced = patches.slice(1100..1250).unwrap().unwrap();
2065
2066        assert_eq!(sliced.num_patches(), 1);
2067        assert_eq!(sliced.offset(), 1100);
2068        assert_eq!(sliced.search_index(100).unwrap(), SearchResult::Found(0)); // 1200 - 1100 = 100
2069        assert_eq!(sliced.search_index(50).unwrap(), SearchResult::NotFound(0));
2070        assert_eq!(sliced.search_index(150).unwrap(), SearchResult::NotFound(1));
2071    }
2072
2073    #[test]
2074    fn test_chunk_offsets_slice_across_chunks() {
2075        let indices = buffer![100u64, 200, 1100, 1200, 2100, 2200].into_array();
2076        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2077        let chunk_offsets = buffer![0u64, 2, 4].into_array();
2078        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2079
2080        let sliced = patches.slice(150..2150).unwrap().unwrap();
2081
2082        assert_eq!(sliced.num_patches(), 4);
2083        assert_eq!(sliced.offset(), 150);
2084
2085        assert_eq!(sliced.search_index(50).unwrap(), SearchResult::Found(0)); // 200 - 150 = 50
2086        assert_eq!(sliced.search_index(950).unwrap(), SearchResult::Found(1)); // 1100 - 150 = 950
2087        assert_eq!(sliced.search_index(1050).unwrap(), SearchResult::Found(2)); // 1200 - 150 = 1050
2088        assert_eq!(sliced.search_index(1950).unwrap(), SearchResult::Found(3)); // 2100 - 150 = 1950
2089    }
2090
2091    #[test]
2092    fn test_chunk_offsets_boundary_searches() {
2093        let indices = buffer![1023u64, 1024, 1025, 2047, 2048].into_array();
2094        let values = buffer![10i32, 20, 30, 40, 50].into_array();
2095        let chunk_offsets = buffer![0u64, 1, 4].into_array();
2096        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2097
2098        assert_eq!(patches.search_index(1023).unwrap(), SearchResult::Found(0));
2099        assert_eq!(patches.search_index(1024).unwrap(), SearchResult::Found(1));
2100        assert_eq!(patches.search_index(1025).unwrap(), SearchResult::Found(2));
2101        assert_eq!(patches.search_index(2047).unwrap(), SearchResult::Found(3));
2102        assert_eq!(patches.search_index(2048).unwrap(), SearchResult::Found(4));
2103
2104        assert_eq!(
2105            patches.search_index(1022).unwrap(),
2106            SearchResult::NotFound(0)
2107        );
2108        assert_eq!(
2109            patches.search_index(2046).unwrap(),
2110            SearchResult::NotFound(3)
2111        );
2112    }
2113
2114    #[test]
2115    fn test_chunk_offsets_slice_edge_cases() {
2116        let indices = buffer![0u64, 1, 1023, 1024, 2047, 2048].into_array();
2117        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2118        let chunk_offsets = buffer![0u64, 3, 5].into_array();
2119        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2120
2121        // Slice at the very beginning
2122        let sliced = patches.slice(0..10).unwrap().unwrap();
2123        assert_eq!(sliced.num_patches(), 2);
2124        assert_eq!(sliced.search_index(0).unwrap(), SearchResult::Found(0));
2125        assert_eq!(sliced.search_index(1).unwrap(), SearchResult::Found(1));
2126
2127        // Slice at the very end
2128        let sliced = patches.slice(2040..3000).unwrap().unwrap();
2129        assert_eq!(sliced.num_patches(), 2); // patches at 2047 and 2048
2130        assert_eq!(sliced.search_index(7).unwrap(), SearchResult::Found(0)); // 2047 - 2040
2131        assert_eq!(sliced.search_index(8).unwrap(), SearchResult::Found(1)); // 2048 - 2040
2132    }
2133
2134    #[test]
2135    fn test_chunk_offsets_slice_nested() {
2136        let indices = buffer![100u64, 200, 300, 400, 500, 600].into_array();
2137        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2138        let chunk_offsets = buffer![0u64].into_array();
2139        let patches = Patches::new(1000, 0, indices, values, Some(chunk_offsets)).unwrap();
2140
2141        let sliced1 = patches.slice(150..550).unwrap().unwrap();
2142        assert_eq!(sliced1.num_patches(), 4); // 200, 300, 400, 500
2143
2144        let sliced2 = sliced1.slice(100..250).unwrap().unwrap();
2145        assert_eq!(sliced2.num_patches(), 1); // 300
2146        assert_eq!(sliced2.offset(), 250);
2147
2148        assert_eq!(sliced2.search_index(50).unwrap(), SearchResult::Found(0)); // 300 - 250
2149        assert_eq!(
2150            sliced2.search_index(150).unwrap(),
2151            SearchResult::NotFound(1)
2152        );
2153    }
2154
2155    #[test]
2156    fn test_nested_slice_with_dropped_first_chunk() {
2157        // PATCH_CHUNK_SIZE = 1024, so the two patches land in different chunks.
2158        let indices = buffer![0u64, 1024].into_array();
2159        let values = buffer![1i32, 2].into_array();
2160        let chunk_offsets = buffer![0u64, 1].into_array();
2161        let patches = Patches::new(2048, 0, indices, values, Some(chunk_offsets)).unwrap();
2162
2163        // Drop chunk 0, then re-slice the result.
2164        let dropped_first = patches.slice(1024..2048).unwrap().unwrap();
2165        let resliced = dropped_first.slice(0..1024).unwrap().unwrap();
2166        assert_eq!(resliced.num_patches(), 1);
2167    }
2168
2169    #[test]
2170    fn test_index_larger_than_length() {
2171        let chunk_offsets = buffer![0u64].into_array();
2172        let indices = buffer![1023u64].into_array();
2173        let values = buffer![42i32].into_array();
2174        let patches = Patches::new(1024, 0, indices, values, Some(chunk_offsets)).unwrap();
2175        assert_eq!(
2176            patches.search_index(2048).unwrap(),
2177            SearchResult::NotFound(1)
2178        );
2179    }
2180}