Skip to main content

vortex_array/
patches.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::ops::Range;
8
9use itertools::Itertools;
10use num_traits::NumCast;
11use vortex_buffer::BitBuffer;
12use vortex_buffer::BufferMut;
13use vortex_error::VortexError;
14use vortex_error::VortexExpect;
15use vortex_error::VortexResult;
16use vortex_error::vortex_bail;
17use vortex_error::vortex_ensure;
18use vortex_error::vortex_err;
19use vortex_mask::AllOr;
20use vortex_mask::Mask;
21use vortex_mask::MaskMut;
22use vortex_utils::aliases::hash_map::HashMap;
23
24use crate::ArrayRef;
25use crate::ArrayVisitor;
26use crate::DynArray;
27use crate::ExecutionCtx;
28use crate::IntoArray;
29use crate::ToCanonical;
30use crate::arrays::BoolArray;
31use crate::arrays::PrimitiveArray;
32use crate::builtins::ArrayBuiltins;
33use crate::dtype::DType;
34use crate::dtype::IntegerPType;
35use crate::dtype::NativePType;
36use crate::dtype::Nullability;
37use crate::dtype::Nullability::NonNullable;
38use crate::dtype::PType;
39use crate::dtype::UnsignedPType;
40use crate::match_each_integer_ptype;
41use crate::match_each_unsigned_integer_ptype;
42use crate::scalar::PValue;
43use crate::scalar::Scalar;
44use crate::search_sorted::SearchResult;
45use crate::search_sorted::SearchSorted;
46use crate::search_sorted::SearchSortedSide;
47use crate::validity::Validity;
48use crate::vtable::ValidityHelper;
49
/// Number of array positions covered by one patch chunk.
///
/// One patch index offset is stored for each chunk.
/// This allows for constant time patch index lookups.
pub const PATCH_CHUNK_SIZE: usize = 1024;
53
/// Serializable description of a [`Patches`] instance.
///
/// Derives `prost::Message`; every field carries an explicit tag.
#[derive(Copy, Clone, prost::Message)]
pub struct PatchesMetadata {
    /// Number of patches (length of the indices array).
    #[prost(uint64, tag = "1")]
    len: u64,
    /// Offset of the patches, see [`Patches::offset`].
    #[prost(uint64, tag = "2")]
    offset: u64,
    /// `PType` of the patch indices, stored as its `i32` enumeration value.
    #[prost(enumeration = "PType", tag = "3")]
    indices_ptype: i32,
    /// Length of the chunk offsets array, if chunk offsets are present.
    #[prost(uint64, optional, tag = "4")]
    chunk_offsets_len: Option<u64>,
    /// `PType` of the chunk offsets array, if chunk offsets are present.
    #[prost(enumeration = "PType", optional, tag = "5")]
    chunk_offsets_ptype: Option<i32>,
    /// See [`Patches::offset_within_chunk`].
    #[prost(uint64, optional, tag = "6")]
    offset_within_chunk: Option<u64>,
}
69
70impl PatchesMetadata {
71    #[inline]
72    pub fn new(
73        len: usize,
74        offset: usize,
75        indices_ptype: PType,
76        chunk_offsets_len: Option<usize>,
77        chunk_offsets_ptype: Option<PType>,
78        offset_within_chunk: Option<usize>,
79    ) -> Self {
80        Self {
81            len: len as u64,
82            offset: offset as u64,
83            indices_ptype: indices_ptype as i32,
84            chunk_offsets_len: chunk_offsets_len.map(|len| len as u64),
85            chunk_offsets_ptype: chunk_offsets_ptype.map(|pt| pt as i32),
86            offset_within_chunk: offset_within_chunk.map(|len| len as u64),
87        }
88    }
89
90    #[inline]
91    pub fn len(&self) -> VortexResult<usize> {
92        usize::try_from(self.len).map_err(|_| vortex_err!("len does not fit in usize"))
93    }
94
95    #[inline]
96    pub fn is_empty(&self) -> bool {
97        self.len == 0
98    }
99
100    #[inline]
101    pub fn offset(&self) -> VortexResult<usize> {
102        usize::try_from(self.offset).map_err(|_| vortex_err!("offset does not fit in usize"))
103    }
104
105    #[inline]
106    pub fn chunk_offsets_dtype(&self) -> VortexResult<Option<DType>> {
107        self.chunk_offsets_ptype
108            .map(|t| {
109                PType::try_from(t)
110                    .map_err(|e| vortex_err!("invalid i32 value {t} for PType: {}", e))
111                    .map(|ptype| DType::Primitive(ptype, NonNullable))
112            })
113            .transpose()
114    }
115
116    #[inline]
117    pub fn indices_dtype(&self) -> VortexResult<DType> {
118        let ptype = PType::try_from(self.indices_ptype).map_err(|e| {
119            vortex_err!("invalid i32 value {} for PType: {}", self.indices_ptype, e)
120        })?;
121        vortex_ensure!(
122            ptype.is_unsigned_int(),
123            "Patch indices must be unsigned integers"
124        );
125        Ok(DType::Primitive(ptype, NonNullable))
126    }
127}
128
/// A helper for working with patched arrays.
///
/// Holds a sorted list of `indices` and the `values` that replace the array's contents
/// at those positions.
#[derive(Debug, Clone)]
pub struct Patches {
    /// Length of the array the patches apply to.
    array_len: usize,
    /// Absolute offset: 0 if unsliced. Lookups add it to the requested index before
    /// searching `indices`.
    offset: usize,
    /// Patch positions; `Patches::new` requires non-nullable unsigned integers.
    indices: ArrayRef,
    /// Patch values; `Patches::new` requires the same length as `indices`.
    values: ArrayRef,
    /// Stores the patch index offset for each chunk.
    ///
    /// This allows us to lookup the patches for a given chunk in constant time via
    /// `patch_indices[chunk_offsets[i]..chunk_offsets[i+1]]`.
    ///
    /// This is optional for compatibility reasons.
    chunk_offsets: Option<ArrayRef>,
    /// Chunk offsets are only sliced off in case the slice is fully
    /// outside of the chunk range.
    ///
    /// Though the range for indices and values is sliced in terms of
    /// individual elements, not chunks. To account for that we do a
    /// saturating sub when adjusting the indices based on the chunk offset.
    ///
    /// `offset_within_chunk` is necessary in order to keep track of how many
    /// elements were sliced off within the chunk.
    offset_within_chunk: Option<usize>,
}
154
155impl Patches {
156    pub fn new(
157        array_len: usize,
158        offset: usize,
159        indices: ArrayRef,
160        values: ArrayRef,
161        chunk_offsets: Option<ArrayRef>,
162    ) -> VortexResult<Self> {
163        vortex_ensure!(
164            indices.len() == values.len(),
165            "Patch indices and values must have the same length"
166        );
167        vortex_ensure!(
168            indices.dtype().is_unsigned_int() && !indices.dtype().is_nullable(),
169            "Patch indices must be non-nullable unsigned integers, got {:?}",
170            indices.dtype()
171        );
172
173        vortex_ensure!(
174            indices.len() <= array_len,
175            "Patch indices must be shorter than the array length"
176        );
177        vortex_ensure!(!indices.is_empty(), "Patch indices must not be empty");
178
179        // Perform validation of components when they are host-resident.
180        // This is not possible to do eagerly when the data is on GPU memory.
181        if indices.is_host() && values.is_host() {
182            let max = usize::try_from(&indices.scalar_at(indices.len() - 1)?)
183                .map_err(|_| vortex_err!("indices must be a number"))?;
184            vortex_ensure!(
185                max - offset < array_len,
186                "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
187            );
188
189            #[cfg(debug_assertions)]
190            {
191                use crate::VortexSessionExecute;
192                let mut ctx = crate::LEGACY_SESSION.create_execution_ctx();
193                assert!(
194                    crate::aggregate_fn::fns::is_sorted::is_sorted(&indices, &mut ctx)
195                        .unwrap_or(false),
196                    "Patch indices must be sorted"
197                );
198            }
199        }
200
201        Ok(Self {
202            array_len,
203            offset,
204            indices,
205            values,
206            chunk_offsets: chunk_offsets.clone(),
207            // Initialize with `Some(0)` only if `chunk_offsets` are set.
208            offset_within_chunk: chunk_offsets.map(|_| 0),
209        })
210    }
211
    /// Construct new patches without validating any of the arguments.
    ///
    /// Every argument is stored as given, including `offset_within_chunk`.
    ///
    /// # Safety
    ///
    /// Users have to assert that
    /// * Indices and values have the same length
    /// * Indices is an unsigned integer type
    /// * Indices must be sorted
    /// * Last value in indices is smaller than array_len
    ///   (NOTE(review): `Patches::new` checks `last - offset < array_len`; presumably the
    ///   same offset-adjusted bound is meant here — confirm.)
    pub unsafe fn new_unchecked(
        array_len: usize,
        offset: usize,
        indices: ArrayRef,
        values: ArrayRef,
        chunk_offsets: Option<ArrayRef>,
        offset_within_chunk: Option<usize>,
    ) -> Self {
        Self {
            array_len,
            offset,
            indices,
            values,
            chunk_offsets,
            offset_within_chunk,
        }
    }
238
    /// Length of the array these patches apply to.
    #[inline]
    pub fn array_len(&self) -> usize {
        self.array_len
    }
243
    /// Number of patches, i.e. the length of the indices array.
    #[inline]
    pub fn num_patches(&self) -> usize {
        self.indices.len()
    }
248
    /// Data type of the patch values.
    #[inline]
    pub fn dtype(&self) -> &DType {
        self.values.dtype()
    }
253
    /// Borrows the patch indices array (stored values include [`Self::offset`]).
    #[inline]
    pub fn indices(&self) -> &ArrayRef {
        &self.indices
    }
258
    /// Consumes the patches and returns the indices array.
    #[inline]
    pub fn into_indices(self) -> ArrayRef {
        self.indices
    }
263
    /// Mutably borrows the patch indices array.
    ///
    /// No validation is performed on whatever the caller writes through this reference.
    #[inline]
    pub fn indices_mut(&mut self) -> &mut ArrayRef {
        &mut self.indices
    }
268
    /// Borrows the patch values array.
    #[inline]
    pub fn values(&self) -> &ArrayRef {
        &self.values
    }
273
    /// Consumes the patches and returns the values array.
    #[inline]
    pub fn into_values(self) -> ArrayRef {
        self.values
    }
278
    /// Mutably borrows the patch values array.
    ///
    /// No validation is performed on whatever the caller writes through this reference.
    #[inline]
    pub fn values_mut(&mut self) -> &mut ArrayRef {
        &mut self.values
    }
283
    /// Absolute offset: 0 if the array is unsliced.
    ///
    /// Lookups add this offset to the requested index before searching the indices.
    #[inline]
    pub fn offset(&self) -> usize {
        self.offset
    }
289
    /// Borrows the optional per-chunk patch index offsets.
    #[inline]
    pub fn chunk_offsets(&self) -> &Option<ArrayRef> {
        &self.chunk_offsets
    }
294
295    #[inline]
296    pub fn chunk_offset_at(&self, idx: usize) -> VortexResult<usize> {
297        let Some(chunk_offsets) = &self.chunk_offsets else {
298            vortex_bail!("chunk_offsets must be set to retrieve offset at index")
299        };
300
301        chunk_offsets
302            .scalar_at(idx)?
303            .as_primitive()
304            .as_::<usize>()
305            .ok_or_else(|| vortex_err!("chunk offset does not fit in usize"))
306    }
307
    /// Returns the number of patches sliced off from the current first chunk.
    ///
    /// When patches are sliced, the chunk offsets array is also sliced to only include
    /// chunks that overlap with the slice range. However, the slice boundary may fall
    /// in the middle of a chunk's patch range. This offset indicates how many patches
    /// at the start of the first chunk should be skipped.
    ///
    /// Returns `None` if chunk offsets are not set.
    ///
    /// NOTE(review): `take_search` builds patches with `chunk_offsets: None` and
    /// `Some(0)` here, so `None` is not guaranteed whenever chunk offsets are absent.
    #[inline]
    pub fn offset_within_chunk(&self) -> Option<usize> {
        self.offset_within_chunk
    }
320
321    #[inline]
322    pub fn indices_ptype(&self) -> VortexResult<PType> {
323        PType::try_from(self.indices.dtype())
324            .map_err(|_| vortex_err!("indices dtype is not primitive"))
325    }
326
327    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
328        if self.indices.len() > len {
329            vortex_bail!(
330                "Patch indices {} are longer than the array length {}",
331                self.indices.len(),
332                len
333            );
334        }
335        if self.values.dtype() != dtype {
336            vortex_bail!(
337                "Patch values dtype {} does not match array dtype {}",
338                self.values.dtype(),
339                dtype
340            );
341        }
342        let chunk_offsets_len = self.chunk_offsets.as_ref().map(|co| co.len());
343        let chunk_offsets_ptype = self.chunk_offsets.as_ref().map(|co| co.dtype().as_ptype());
344
345        Ok(PatchesMetadata::new(
346            self.indices.len(),
347            self.offset,
348            self.indices.dtype().as_ptype(),
349            chunk_offsets_len,
350            chunk_offsets_ptype,
351            self.offset_within_chunk,
352        ))
353    }
354
355    pub fn cast_values(self, values_dtype: &DType) -> VortexResult<Self> {
356        // SAFETY: casting does not affect the relationship between the indices and values
357        unsafe {
358            Ok(Self::new_unchecked(
359                self.array_len,
360                self.offset,
361                self.indices,
362                self.values.cast(values_dtype.clone())?,
363                self.chunk_offsets,
364                self.offset_within_chunk,
365            ))
366        }
367    }
368
369    /// Get the patched value at a given index if it exists.
370    pub fn get_patched(&self, index: usize) -> VortexResult<Option<Scalar>> {
371        self.search_index(index)?
372            .to_found()
373            .map(|patch_idx| self.values().scalar_at(patch_idx))
374            .transpose()
375    }
376
377    /// Searches for `index` in the indices array.
378    ///
379    /// Chooses between chunked search when [`Self::chunk_offsets`] is
380    /// available, and binary search otherwise. The `index` parameter is
381    /// adjusted by [`Self::offset`] for both.
382    ///
383    /// # Arguments
384    /// * `index` - The index to search for
385    ///
386    /// # Returns
387    /// * [`SearchResult::Found(patch_idx)`] - If a patch exists at this index, returns the
388    ///   position in the patches array
389    /// * [`SearchResult::NotFound(insertion_point)`] - If no patch exists, returns where
390    ///   a patch at this index would be inserted to maintain sorted order
391    ///
392    /// [`SearchResult::Found(patch_idx)`]: SearchResult::Found
393    /// [`SearchResult::NotFound(insertion_point)`]: SearchResult::NotFound
394    pub fn search_index(&self, index: usize) -> VortexResult<SearchResult> {
395        if self.chunk_offsets.is_some() {
396            return self.search_index_chunked(index);
397        }
398
399        Self::search_index_binary_search(&self.indices, index + self.offset)
400    }
401
    /// Binary searches for `needle` in the indices array.
    ///
    /// `needle` is compared as-is; callers add the patch offset beforehand.
    ///
    /// # Returns
    /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`]
    /// with the insertion point if not found.
    fn search_index_binary_search(indices: &ArrayRef, needle: usize) -> VortexResult<SearchResult> {
        // Canonical indices are searched directly on the typed slice.
        if indices.is_canonical() {
            let primitive = indices.to_primitive();
            match_each_integer_ptype!(primitive.ptype(), |T| {
                let Ok(needle) = T::try_from(needle) else {
                    // If the needle is not of type T, then it cannot possibly be in this array.
                    //
                    // The needle is a non-negative integer (a usize); therefore, it must be larger
                    // than all values in this array.
                    return Ok(SearchResult::NotFound(primitive.len()));
                };
                // Every macro arm returns, so the code below only runs for non-canonical input.
                return primitive
                    .as_slice::<T>()
                    .search_sorted(&needle, SearchSortedSide::Left);
            });
        }
        // Non-canonical indices: search through the typed primitive view with a `u64` needle.
        indices
            .as_primitive_typed()
            .search_sorted(&PValue::U64(needle as u64), SearchSortedSide::Left)
    }
427
428    /// Constant time searches for `index` in the indices array.
429    ///
430    /// First determines which chunk the target index falls into, then performs
431    /// a binary search within that chunk's range.
432    ///
433    /// Returns a [`SearchResult`] indicating either the exact patch index if found,
434    /// or the insertion point if not found.
435    ///
436    /// Returns an error if `chunk_offsets` or `offset_within_chunk` are not set.
437    fn search_index_chunked(&self, index: usize) -> VortexResult<SearchResult> {
438        let Some(chunk_offsets) = &self.chunk_offsets else {
439            vortex_bail!("chunk_offsets is required to be set")
440        };
441
442        let Some(offset_within_chunk) = self.offset_within_chunk else {
443            vortex_bail!("offset_within_chunk is required to be set")
444        };
445
446        if index >= self.array_len() {
447            return Ok(SearchResult::NotFound(self.indices().len()));
448        }
449
450        let chunk_idx = (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE;
451
452        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
453        let base_offset = self.chunk_offset_at(0)?;
454
455        let patches_start_idx = (self.chunk_offset_at(chunk_idx)? - base_offset)
456            // Chunk offsets are only sliced off in case the slice is fully
457            // outside of the chunk range.
458            //
459            // Though the range for indices and values is sliced in terms of
460            // individual elements, not chunks. To account for that we do a
461            // saturating sub when adjusting the indices based on the chunk offset.
462            .saturating_sub(offset_within_chunk);
463
464        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
465            self.chunk_offset_at(chunk_idx + 1)? - base_offset - offset_within_chunk
466        } else {
467            self.indices.len()
468        };
469
470        let chunk_indices = self.indices.slice(patches_start_idx..patches_end_idx)?;
471        let result = Self::search_index_binary_search(&chunk_indices, index + self.offset)?;
472
473        Ok(match result {
474            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
475            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
476        })
477    }
478
    /// Batch version of `search_index`.
    ///
    /// In contrast to `search_index`, this function requires `indices` as
    /// well as `chunk_offsets` to be passed as slices. This is to avoid
    /// redundant canonicalization and `scalar_at` lookups across calls.
    ///
    /// `index` is given in the patch index type `T` and is shifted by the patch offset
    /// before the search. Returns an error if `offset_within_chunk` is not set.
    fn search_index_chunked_batch<T, O>(
        &self,
        indices: &[T],
        chunk_offsets: &[O],
        index: T,
    ) -> VortexResult<SearchResult>
    where
        T: UnsignedPType,
        O: UnsignedPType,
        usize: TryFrom<T>,
        usize: TryFrom<O>,
    {
        let Some(offset_within_chunk) = self.offset_within_chunk else {
            vortex_bail!("offset_within_chunk is required to be set")
        };

        // Chunk that contains `index`, taking the chunk-relative part of the offset into account.
        let chunk_idx = {
            let Ok(index) = usize::try_from(index) else {
                // If the needle cannot be converted to usize, it's larger than all values in this array.
                return Ok(SearchResult::NotFound(indices.len()));
            };

            if index >= self.array_len() {
                return Ok(SearchResult::NotFound(self.indices().len()));
            }

            (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE
        };

        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
        let chunk_offset = usize::try_from(chunk_offsets[chunk_idx] - chunk_offsets[0])
            .map_err(|_| vortex_err!("chunk_offset failed to convert to usize"))?;

        let patches_start_idx = chunk_offset
            // Chunk offsets are only sliced off in case the slice is fully
            // outside of the chunk range.
            //
            // Though the range for indices and values is sliced in terms of
            // individual elements, not chunks. To account for that we do a
            // saturating sub when adjusting the indices based on the chunk offset.
            .saturating_sub(offset_within_chunk);

        // For the last chunk the range runs to the end of the patch indices.
        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
            let base_offset_end = chunk_offsets[chunk_idx + 1];

            let offset_within_chunk = O::from(offset_within_chunk)
                .ok_or_else(|| vortex_err!("offset_within_chunk failed to convert to O"))?;

            usize::try_from(base_offset_end - chunk_offsets[0] - offset_within_chunk)
                .map_err(|_| vortex_err!("patches_end_idx failed to convert to usize"))?
        } else {
            self.indices.len()
        };

        let Some(offset) = T::from(self.offset) else {
            // If the offset cannot be converted to T, it's larger than all values in this array.
            return Ok(SearchResult::NotFound(indices.len()));
        };

        // NOTE(review): `index + offset` is evaluated in `T`; presumably it cannot overflow
        // for valid patches — confirm.
        let chunk_indices = &indices[patches_start_idx..patches_end_idx];
        let result = chunk_indices.search_sorted(&(index + offset), SearchSortedSide::Left)?;

        // Positions from the search are relative to the chunk; shift them back.
        Ok(match result {
            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
        })
    }
551
552    /// Returns the minimum patch index
553    pub fn min_index(&self) -> VortexResult<usize> {
554        let first = self
555            .indices
556            .scalar_at(0)?
557            .as_primitive()
558            .as_::<usize>()
559            .ok_or_else(|| vortex_err!("index does not fit in usize"))?;
560        Ok(first - self.offset)
561    }
562
563    /// Returns the maximum patch index
564    pub fn max_index(&self) -> VortexResult<usize> {
565        let last = self
566            .indices
567            .scalar_at(self.indices.len() - 1)?
568            .as_primitive()
569            .as_::<usize>()
570            .ok_or_else(|| vortex_err!("index does not fit in usize"))?;
571        Ok(last - self.offset)
572    }
573
574    /// Filter the patches by a mask, resulting in new patches for the filtered array.
575    pub fn filter(&self, mask: &Mask, ctx: &mut ExecutionCtx) -> VortexResult<Option<Self>> {
576        if mask.len() != self.array_len {
577            vortex_bail!(
578                "Filter mask length {} does not match array length {}",
579                mask.len(),
580                self.array_len
581            );
582        }
583
584        match mask.indices() {
585            AllOr::All => Ok(Some(self.clone())),
586            AllOr::None => Ok(None),
587            AllOr::Some(mask_indices) => {
588                let flat_indices = self.indices().clone().execute::<PrimitiveArray>(ctx)?;
589                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
590                    filter_patches_with_mask(
591                        flat_indices.as_slice::<I>(),
592                        self.offset(),
593                        self.values(),
594                        mask_indices,
595                    )
596                })
597            }
598        }
599    }
600
601    /// Mask the patches, REMOVING the patches where the mask is true.
602    /// Unlike filter, this preserves the patch indices.
603    /// Unlike mask on a single array, this does not set masked values to null.
604    // TODO(joe): make this lazy and remove the ctx.
605    pub fn mask(&self, mask: &Mask, ctx: &mut ExecutionCtx) -> VortexResult<Option<Self>> {
606        if mask.len() != self.array_len {
607            vortex_bail!(
608                "Filter mask length {} does not match array length {}",
609                mask.len(),
610                self.array_len
611            );
612        }
613
614        let filter_mask = match mask.bit_buffer() {
615            AllOr::All => return Ok(None),
616            AllOr::None => return Ok(Some(self.clone())),
617            AllOr::Some(masked) => {
618                let patch_indices = self.indices().clone().execute::<PrimitiveArray>(ctx)?;
619                match_each_unsigned_integer_ptype!(patch_indices.ptype(), |P| {
620                    let patch_indices = patch_indices.as_slice::<P>();
621                    Mask::from_buffer(BitBuffer::collect_bool(patch_indices.len(), |i| {
622                        #[allow(clippy::cast_possible_truncation)]
623                        let idx = (patch_indices[i] as usize) - self.offset;
624                        !masked.value(idx)
625                    }))
626                })
627            }
628        };
629
630        if filter_mask.all_false() {
631            return Ok(None);
632        }
633
634        // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship
635        let filtered_indices = self.indices.filter(filter_mask.clone())?;
636        let filtered_values = self.values.filter(filter_mask)?;
637
638        Ok(Some(Self {
639            array_len: self.array_len,
640            offset: self.offset,
641            indices: filtered_indices,
642            values: filtered_values,
643            // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
644            chunk_offsets: None,
645            offset_within_chunk: self.offset_within_chunk,
646        }))
647    }
648
649    /// Slice the patches by a range of the patched array.
650    pub fn slice(&self, range: Range<usize>) -> VortexResult<Option<Self>> {
651        let slice_start_idx = self.search_index(range.start)?.to_index();
652        let slice_end_idx = self.search_index(range.end)?.to_index();
653
654        if slice_start_idx == slice_end_idx {
655            return Ok(None);
656        }
657
658        let values = self.values().slice(slice_start_idx..slice_end_idx)?;
659        let indices = self.indices().slice(slice_start_idx..slice_end_idx)?;
660
661        let chunk_offsets = self
662            .chunk_offsets
663            .as_ref()
664            .map(|chunk_offsets| -> VortexResult<ArrayRef> {
665                let chunk_relative_offset = self.offset % PATCH_CHUNK_SIZE;
666                let chunk_start_idx = (chunk_relative_offset + range.start) / PATCH_CHUNK_SIZE;
667                let chunk_end_idx = (chunk_relative_offset + range.end).div_ceil(PATCH_CHUNK_SIZE);
668                chunk_offsets.slice(chunk_start_idx..chunk_end_idx)
669            })
670            .transpose()?;
671
672        let offset_within_chunk = chunk_offsets
673            .as_ref()
674            .map(|chunk_offsets| -> VortexResult<usize> {
675                let base_offset = chunk_offsets
676                    .scalar_at(0)?
677                    .as_primitive()
678                    .as_::<usize>()
679                    .ok_or_else(|| vortex_err!("chunk offset does not fit in usize"))?;
680                Ok(slice_start_idx - base_offset)
681            })
682            .transpose()?;
683
684        Ok(Some(Self {
685            array_len: range.len(),
686            offset: range.start + self.offset(),
687            indices,
688            values,
689            chunk_offsets,
690            offset_within_chunk,
691        }))
692    }
693
    /// Threshold on `num_patches / take_indices.len()` below which the map-based take
    /// is chosen over the search-based take.
    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
696
697    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
698        (self.num_patches() as f64 / take_indices.len() as f64)
699            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
700    }
701
702    /// Take the indices from the patches
703    ///
704    /// Any nulls in take_indices are added to the resulting patches.
705    pub fn take_with_nulls(
706        &self,
707        take_indices: &ArrayRef,
708        ctx: &mut ExecutionCtx,
709    ) -> VortexResult<Option<Self>> {
710        if take_indices.is_empty() {
711            return Ok(None);
712        }
713
714        let take_indices = take_indices.to_array().execute::<PrimitiveArray>(ctx)?;
715        if self.is_map_faster_than_search(&take_indices) {
716            self.take_map(take_indices, true, ctx)
717        } else {
718            self.take_search(take_indices, true, ctx)
719        }
720    }
721
722    /// Take the indices from the patches.
723    ///
724    /// Any nulls in take_indices are ignored.
725    pub fn take(
726        &self,
727        take_indices: &ArrayRef,
728        ctx: &mut ExecutionCtx,
729    ) -> VortexResult<Option<Self>> {
730        if take_indices.is_empty() {
731            return Ok(None);
732        }
733
734        let take_indices = take_indices.to_array().execute::<PrimitiveArray>(ctx)?;
735        if self.is_map_faster_than_search(&take_indices) {
736            self.take_map(take_indices, false, ctx)
737        } else {
738            self.take_search(take_indices, false, ctx)
739        }
740    }
741
    /// Take from the patches by searching the patch indices for every take index.
    ///
    /// Uses the chunked batch search when chunk offsets are set and a binary search over
    /// all patch indices otherwise. `include_nulls` is forwarded to
    /// `take_indices_with_search_fn`.
    ///
    /// Returns `None` when the search yields no indices. The resulting patches have
    /// offset 0, no chunk offsets, and an array length equal to `take_indices.len()`.
    #[expect(
        clippy::cognitive_complexity,
        reason = "complexity is from nested match_each_* macros"
    )]
    pub fn take_search(
        &self,
        take_indices: PrimitiveArray,
        include_nulls: bool,
        ctx: &mut ExecutionCtx,
    ) -> VortexResult<Option<Self>> {
        let take_indices_validity = take_indices.validity();
        // Execute indices and chunk offsets once so the per-index searches work on slices.
        let patch_indices = self.indices.clone().execute::<PrimitiveArray>(ctx)?;
        let chunk_offsets = self
            .chunk_offsets()
            .as_ref()
            .map(|co| co.clone().execute::<PrimitiveArray>(ctx))
            .transpose()?;

        // `values_indices` is later used to take from the patch values; `new_indices`
        // becomes the indices of the resulting patches.
        let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) =
            match_each_unsigned_integer_ptype!(patch_indices.ptype(), |PatchT| {
                let patch_indices_slice = patch_indices.as_slice::<PatchT>();
                match_each_integer_ptype!(take_indices.ptype(), |TakeT| {
                    let take_slice = take_indices.as_slice::<TakeT>();

                    if let Some(chunk_offsets) = chunk_offsets {
                        match_each_unsigned_integer_ptype!(chunk_offsets.ptype(), |OffsetT| {
                            let chunk_offsets = chunk_offsets.as_slice::<OffsetT>();
                            take_indices_with_search_fn(
                                patch_indices_slice,
                                take_slice,
                                take_indices.validity_mask()?,
                                include_nulls,
                                |take_idx| {
                                    self.search_index_chunked_batch(
                                        patch_indices_slice,
                                        chunk_offsets,
                                        take_idx,
                                    )
                                },
                            )?
                        })
                    } else {
                        take_indices_with_search_fn(
                            patch_indices_slice,
                            take_slice,
                            take_indices.validity_mask()?,
                            include_nulls,
                            |take_idx| {
                                let Some(offset) = <PatchT as NumCast>::from(self.offset) else {
                                    // If the offset cannot be converted to T, it's larger than all values in this array.
                                    return Ok(SearchResult::NotFound(patch_indices_slice.len()));
                                };

                                patch_indices_slice
                                    .search_sorted(&(take_idx + offset), SearchSortedSide::Left)
                            },
                        )?
                    }
                })
            });

        if new_indices.is_empty() {
            return Ok(None);
        }

        let new_indices = new_indices.into_array();
        let new_array_len = take_indices.len();
        // Validity of the taken values follows the validity of the matching take indices.
        let values_validity = take_indices_validity.take(&new_indices)?;

        Ok(Some(Self {
            array_len: new_array_len,
            offset: 0,
            indices: new_indices.clone(),
            values: self
                .values()
                .take(PrimitiveArray::new(values_indices, values_validity).into_array())?,
            chunk_offsets: None,
            // NOTE(review): `Some(0)` is set although `chunk_offsets` is `None`; `Patches::new`
            // would use `None` in that situation — confirm which is intended.
            offset_within_chunk: Some(0), // Reset when creating new Patches.
        }))
    }
822
823    pub fn take_map(
824        &self,
825        take_indices: PrimitiveArray,
826        include_nulls: bool,
827        ctx: &mut ExecutionCtx,
828    ) -> VortexResult<Option<Self>> {
829        let indices = self.indices.clone().execute::<PrimitiveArray>(ctx)?;
830        let new_length = take_indices.len();
831
832        let min_index = self.min_index()?;
833        let max_index = self.max_index()?;
834
835        let Some((new_sparse_indices, value_indices)) =
836            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
837                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
838                    let take_validity = take_indices
839                        .validity()
840                        .execute_mask(take_indices.len(), ctx)?;
841                    let take_nullability = take_indices.validity().nullability();
842                    let take_slice = take_indices.as_slice::<TakeIndices>();
843                    take_map::<_, TakeIndices>(
844                        indices.as_slice::<Indices>(),
845                        take_slice,
846                        take_validity,
847                        take_nullability,
848                        self.offset(),
849                        min_index,
850                        max_index,
851                        include_nulls,
852                    )?
853                })
854            })
855        else {
856            return Ok(None);
857        };
858
859        let taken_values = self.values().take(value_indices)?;
860
861        Ok(Some(Patches {
862            array_len: new_length,
863            offset: 0,
864            indices: new_sparse_indices,
865            values: taken_values,
866            // TODO(0ax1): Chunk offsets are invalid after take is applied.
867            chunk_offsets: None,
868            offset_within_chunk: self.offset_within_chunk,
869        }))
870    }
871
872    /// Apply patches to a mutable buffer and validity mask.
873    ///
874    /// This method applies the patch values to the given buffer at the positions specified by the
875    /// patch indices. For non-null patch values, it updates the buffer and marks the position as
876    /// valid. For null patch values, it marks the position as invalid.
877    ///
878    /// # Safety
879    ///
880    /// - All patch indices after offset adjustment must be valid indices into the buffer.
881    /// - The buffer and validity mask must have the same length.
882    pub unsafe fn apply_to_buffer<P: NativePType>(
883        &self,
884        buffer: &mut [P],
885        validity: &mut MaskMut,
886        ctx: &mut ExecutionCtx,
887    ) {
888        let patch_indices = self
889            .indices
890            .clone()
891            .execute::<PrimitiveArray>(ctx)
892            .vortex_expect("patch indices must be convertible to PrimitiveArray");
893        let patch_values = self
894            .values
895            .clone()
896            .execute::<PrimitiveArray>(ctx)
897            .vortex_expect("patch values must be convertible to PrimitiveArray");
898        let patches_validity = patch_values.validity();
899
900        let patch_values_slice = patch_values.as_slice::<P>();
901        match_each_unsigned_integer_ptype!(patch_indices.ptype(), |I| {
902            let patch_indices_slice = patch_indices.as_slice::<I>();
903
904            // SAFETY:
905            // - `Patches` invariant guarantees indices are sorted and within array bounds.
906            // - `patch_indices` and `patch_values` have equal length (from `Patches` invariant).
907            // - `buffer` and `validity` have equal length (precondition).
908            // - All patch indices are valid after offset adjustment (precondition).
909            unsafe {
910                apply_patches_to_buffer_inner(
911                    buffer,
912                    validity,
913                    patch_indices_slice,
914                    self.offset,
915                    patch_values_slice,
916                    patches_validity,
917                    ctx,
918                );
919            }
920        });
921    }
922
923    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
924    where
925        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
926    {
927        let values = f(self.values)?;
928        if self.indices.len() != values.len() {
929            vortex_bail!(
930                "map_values must preserve length: expected {} received {}",
931                self.indices.len(),
932                values.len()
933            )
934        }
935
936        Ok(Self {
937            array_len: self.array_len,
938            offset: self.offset,
939            indices: self.indices,
940            values,
941            chunk_offsets: self.chunk_offsets,
942            offset_within_chunk: self.offset_within_chunk,
943        })
944    }
945}
946
947/// Helper function to apply patches to a buffer.
948///
949/// # Safety
950///
951/// - All indices in `patch_indices` after subtracting `patch_offset` must be valid indices
952///   into both `buffer` and `validity`.
953/// - `patch_indices` must be sorted in ascending order.
954/// - `patch_indices` and `patch_values` must have the same length.
955/// - `buffer` and `validity` must have the same length.
956unsafe fn apply_patches_to_buffer_inner<P, I>(
957    buffer: &mut [P],
958    validity: &mut MaskMut,
959    patch_indices: &[I],
960    patch_offset: usize,
961    patch_values: &[P],
962    patches_validity: &Validity,
963    ctx: &mut ExecutionCtx,
964) where
965    P: NativePType,
966    I: UnsignedPType,
967{
968    debug_assert!(!patch_indices.is_empty());
969    debug_assert_eq!(patch_indices.len(), patch_values.len());
970    debug_assert_eq!(buffer.len(), validity.len());
971
972    match patches_validity {
973        Validity::NonNullable | Validity::AllValid => {
974            // All patch values are valid, apply them all.
975            for (&i, &value) in patch_indices.iter().zip_eq(patch_values) {
976                let index = i.as_() - patch_offset;
977
978                // SAFETY: `index` is valid because caller guarantees all patch indices are within
979                // bounds after offset adjustment.
980                unsafe {
981                    validity.set_unchecked(index);
982                }
983                buffer[index] = value;
984            }
985        }
986        Validity::AllInvalid => {
987            // All patch values are null, just mark positions as invalid.
988            for &i in patch_indices {
989                let index = i.as_() - patch_offset;
990
991                // SAFETY: `index` is valid because caller guarantees all patch indices are within
992                // bounds after offset adjustment.
993                unsafe {
994                    validity.unset_unchecked(index);
995                }
996            }
997        }
998        Validity::Array(array) => {
999            // Some patch values may be null, check each one.
1000            let bool_array = array
1001                .clone()
1002                .execute::<BoolArray>(ctx)
1003                .vortex_expect("validity array must be convertible to BoolArray");
1004            let mask = bool_array.to_bit_buffer();
1005            for (patch_idx, (&i, &value)) in patch_indices.iter().zip_eq(patch_values).enumerate() {
1006                let index = i.as_() - patch_offset;
1007
1008                // SAFETY: `index` and `patch_idx` are valid because caller guarantees all patch
1009                // indices are within bounds after offset adjustment.
1010                unsafe {
1011                    if mask.value_unchecked(patch_idx) {
1012                        buffer[index] = value;
1013                        validity.set_unchecked(index);
1014                    } else {
1015                        validity.unset_unchecked(index);
1016                    }
1017                }
1018            }
1019        }
1020    }
1021}
1022
1023#[allow(clippy::too_many_arguments)] // private function, can clean up one day
1024fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
1025    indices: &[I],
1026    take_indices: &[T],
1027    take_validity: Mask,
1028    take_nullability: Nullability,
1029    indices_offset: usize,
1030    min_index: usize,
1031    max_index: usize,
1032    include_nulls: bool,
1033) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
1034where
1035    usize: TryFrom<T>,
1036    VortexError: From<<I as TryFrom<usize>>::Error>,
1037{
1038    let offset_i = I::try_from(indices_offset)?;
1039
1040    let sparse_index_to_value_index: HashMap<I, usize> = indices
1041        .iter()
1042        .copied()
1043        .map(|idx| idx - offset_i)
1044        .enumerate()
1045        .map(|(value_index, sparse_index)| (sparse_index, value_index))
1046        .collect();
1047
1048    let mut new_sparse_indices = BufferMut::<u64>::with_capacity(take_indices.len());
1049    let mut value_indices = BufferMut::<u64>::with_capacity(take_indices.len());
1050
1051    for (idx_in_take, &take_idx) in take_indices.iter().enumerate() {
1052        let ti = usize::try_from(take_idx)
1053            .map_err(|_| vortex_err!("Failed to convert index to usize"))?;
1054
1055        // If we have to take nulls the take index doesn't matter, make it 0 for consistency
1056        let is_null = match take_validity.bit_buffer() {
1057            AllOr::All => false,
1058            AllOr::None => true,
1059            AllOr::Some(buf) => !buf.value(idx_in_take),
1060        };
1061        if is_null {
1062            if include_nulls {
1063                new_sparse_indices.push(idx_in_take as u64);
1064                value_indices.push(0);
1065            }
1066        } else if ti >= min_index && ti <= max_index {
1067            let ti_as_i = I::try_from(ti)
1068                .map_err(|_| vortex_err!("take index does not fit in index type"))?;
1069            if let Some(&value_index) = sparse_index_to_value_index.get(&ti_as_i) {
1070                new_sparse_indices.push(idx_in_take as u64);
1071                value_indices.push(value_index as u64);
1072            }
1073        }
1074    }
1075
1076    if new_sparse_indices.is_empty() {
1077        return Ok(None);
1078    }
1079
1080    let new_sparse_indices = new_sparse_indices.into_array();
1081    let values_validity =
1082        Validity::from_mask(take_validity, take_nullability).take(&new_sparse_indices)?;
1083    Ok(Some((
1084        new_sparse_indices,
1085        PrimitiveArray::new(value_indices, values_validity).into_array(),
1086    )))
1087}
1088
1089/// Filter patches with the provided mask (in flattened space).
1090///
1091/// The filter mask may contain indices that are non-patched. The return value of this function
1092/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
1093/// patch values.
1094fn filter_patches_with_mask<T: IntegerPType>(
1095    patch_indices: &[T],
1096    offset: usize,
1097    patch_values: &ArrayRef,
1098    mask_indices: &[usize],
1099) -> VortexResult<Option<Patches>> {
1100    let true_count = mask_indices.len();
1101    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
1102    let mut new_mask_indices = Vec::with_capacity(true_count);
1103
1104    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
1105    // the patches are relatively sparse compared to the overall mask, and so many indices in the
1106    // mask will end up being skipped.
1107    const STRIDE: usize = 4;
1108
1109    let mut mask_idx = 0usize;
1110    let mut true_idx = 0usize;
1111
1112    while mask_idx < patch_indices.len() && true_idx < true_count {
1113        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
1114        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
1115        //  the mask (which covers both patch and non-patch values of the parent array), and so to
1116        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
1117        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
1118        //  fallback to performing a two-way iterator merge.
1119        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
1120            // Load a vector of each into our registers.
1121            let left_min = patch_indices[mask_idx]
1122                .to_usize()
1123                .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1124                - offset;
1125            let left_max = patch_indices[mask_idx + STRIDE]
1126                .to_usize()
1127                .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1128                - offset;
1129            let right_min = mask_indices[true_idx];
1130            let right_max = mask_indices[true_idx + STRIDE];
1131
1132            if left_min > right_max {
1133                // Advance right side
1134                true_idx += STRIDE;
1135                continue;
1136            } else if right_min > left_max {
1137                mask_idx += STRIDE;
1138                continue;
1139            } else {
1140                // Fallthrough to direct comparison path.
1141            }
1142        }
1143
1144        // Two-way sorted iterator merge:
1145
1146        let left = patch_indices[mask_idx]
1147            .to_usize()
1148            .ok_or_else(|| vortex_err!("patch index does not fit in usize"))?
1149            - offset;
1150        let right = mask_indices[true_idx];
1151
1152        match left.cmp(&right) {
1153            Ordering::Less => {
1154                mask_idx += 1;
1155            }
1156            Ordering::Greater => {
1157                true_idx += 1;
1158            }
1159            Ordering::Equal => {
1160                // Save the mask index as well as the positional index.
1161                new_mask_indices.push(mask_idx);
1162                new_patch_indices.push(true_idx as u64);
1163
1164                mask_idx += 1;
1165                true_idx += 1;
1166            }
1167        }
1168    }
1169
1170    if new_mask_indices.is_empty() {
1171        return Ok(None);
1172    }
1173
1174    let new_patch_indices = new_patch_indices.into_array();
1175    let new_patch_values =
1176        patch_values.filter(Mask::from_indices(patch_values.len(), new_mask_indices))?;
1177
1178    Ok(Some(Patches::new(
1179        true_count,
1180        0,
1181        new_patch_indices,
1182        new_patch_values,
1183        // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
1184        None,
1185    )?))
1186}
1187
1188fn take_indices_with_search_fn<
1189    I: UnsignedPType,
1190    T: IntegerPType,
1191    F: Fn(I) -> VortexResult<SearchResult>,
1192>(
1193    indices: &[I],
1194    take_indices: &[T],
1195    take_validity: Mask,
1196    include_nulls: bool,
1197    search_fn: F,
1198) -> VortexResult<(BufferMut<u64>, BufferMut<u64>)> {
1199    let mut values_indices = BufferMut::with_capacity(take_indices.len());
1200    let mut new_indices = BufferMut::with_capacity(take_indices.len());
1201
1202    for (new_patch_idx, &take_idx) in take_indices.iter().enumerate() {
1203        if !take_validity.value(new_patch_idx) {
1204            if include_nulls {
1205                // For nulls, patch index doesn't matter - use 0 for consistency
1206                values_indices.push(0u64);
1207                new_indices.push(new_patch_idx as u64);
1208            }
1209            continue;
1210        } else {
1211            let search_result = match I::from(take_idx) {
1212                Some(idx) => search_fn(idx)?,
1213                None => SearchResult::NotFound(indices.len()),
1214            };
1215
1216            if let Some(patch_idx) = search_result.to_found() {
1217                values_indices.push(patch_idx as u64);
1218                new_indices.push(new_patch_idx as u64);
1219            }
1220        }
1221    }
1222
1223    Ok((values_indices, new_indices))
1224}
1225
1226#[cfg(test)]
1227mod test {
1228    use vortex_buffer::BufferMut;
1229    use vortex_buffer::buffer;
1230    use vortex_mask::Mask;
1231
1232    use crate::IntoArray;
1233    use crate::LEGACY_SESSION;
1234    use crate::ToCanonical;
1235    use crate::VortexSessionExecute;
1236    use crate::assert_arrays_eq;
1237    use crate::patches::Patches;
1238    use crate::patches::PrimitiveArray;
1239    use crate::search_sorted::SearchResult;
1240    use crate::validity::Validity;
1241
#[test]
fn test_filter() {
    let patch_indices = buffer![10u32, 11, 20].into_array();
    let patch_values = buffer![100, 110, 200].into_array();
    let patches = Patches::new(100, 0, patch_indices, patch_values, None).unwrap();

    // Keep positions 10 and 20 (patched) and 30 (not patched); the patch at 11 is dropped.
    let keep = Mask::from_indices(100, vec![10, 20, 30]);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let filtered = patches.filter(&keep, &mut ctx).unwrap().unwrap();

    // Surviving patches are re-indexed by their rank within the mask.
    assert_arrays_eq!(filtered.indices(), PrimitiveArray::from_iter([0u64, 1]));
    assert_arrays_eq!(filtered.values(), PrimitiveArray::from_iter([100i32, 200]));
}
1264
#[test]
fn take_with_nulls() {
    let patch_indices = buffer![2u64, 9, 15].into_array();
    let patch_values =
        PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array();
    let patches = Patches::new(20, 0, patch_indices, patch_values, None).unwrap();

    // First take index hits the patch at 9, the second one is null.
    let take_indices =
        PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false])).into_array();
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let taken = patches.take(&take_indices, &mut ctx).unwrap().unwrap();

    let primitive_values = taken.values().to_primitive();
    let primitive_indices = taken.indices().to_primitive();

    assert_eq!(taken.array_len(), 2);
    // Only the non-null hit is expected in the result.
    assert_arrays_eq!(
        primitive_values,
        PrimitiveArray::from_option_iter([Some(44i32)])
    );
    assert_arrays_eq!(primitive_indices, PrimitiveArray::from_iter([0u64]));
    assert_eq!(
        primitive_values.validity_mask().unwrap(),
        Mask::from_iter(vec![true])
    );
}
1297
#[test]
fn take_search_with_nulls_chunked() {
    let patch_indices = buffer![2u64, 9, 15].into_array();
    let patch_values = buffer![33_i32, 44, 55].into_array();
    let chunk_offsets = buffer![0u64].into_array();
    let patches =
        Patches::new(20, 0, patch_indices, patch_values, Some(chunk_offsets)).unwrap();

    // Second take index is null; with `include_nulls = true` it must show up as a null patch.
    let take_indices = PrimitiveArray::new(buffer![9, 0], Validity::from_iter([true, false]));
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let taken = patches
        .take_search(take_indices, true, &mut ctx)
        .unwrap()
        .unwrap();

    let primitive_values = taken.values().to_primitive();

    assert_eq!(taken.array_len(), 2);
    assert_arrays_eq!(
        primitive_values,
        PrimitiveArray::from_option_iter([Some(44i32), None])
    );
    assert_arrays_eq!(taken.indices(), PrimitiveArray::from_iter([0u64, 1]));
    assert_eq!(
        primitive_values.validity_mask().unwrap(),
        Mask::from_iter([true, false])
    );
}
1331
#[test]
fn take_search_chunked_multiple_chunks() {
    // Two patches per 1024-element chunk.
    let patch_indices = buffer![100u64, 500, 1200, 1800].into_array();
    let patch_values = buffer![10_i32, 20, 30, 40].into_array();
    let chunk_offsets = buffer![0u64, 2].into_array();
    let patches =
        Patches::new(2048, 0, patch_indices, patch_values, Some(chunk_offsets)).unwrap();

    // 500 and 1200 are patched (one in each chunk); 999 is not.
    let take_indices = PrimitiveArray::new(buffer![500, 1200, 999], Validity::AllValid);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let taken = patches
        .take_search(take_indices, true, &mut ctx)
        .unwrap()
        .unwrap();

    assert_eq!(taken.array_len(), 3);
    assert_arrays_eq!(
        taken.values(),
        PrimitiveArray::from_option_iter([Some(20i32), Some(30)])
    );
}
1358
#[test]
fn take_search_chunked_indices_with_no_patches() {
    let patch_indices = buffer![2u64, 9, 15].into_array();
    let patch_values = buffer![33_i32, 44, 55].into_array();
    let chunk_offsets = buffer![0u64].into_array();
    let patches =
        Patches::new(20, 0, patch_indices, patch_values, Some(chunk_offsets)).unwrap();

    // None of the taken positions carries a patch.
    let take_indices = PrimitiveArray::new(buffer![3, 4, 5], Validity::AllValid);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let taken = patches.take_search(take_indices, true, &mut ctx).unwrap();

    assert!(taken.is_none());
}
1380
#[test]
fn take_search_chunked_interleaved() {
    let patch_indices = buffer![5u64, 10, 20, 25].into_array();
    let patch_values = buffer![100_i32, 200, 300, 400].into_array();
    let chunk_offsets = buffer![0u64].into_array();
    let patches =
        Patches::new(30, 0, patch_indices, patch_values, Some(chunk_offsets)).unwrap();

    // Patched (10, 20) and unpatched (15, 99) take indices alternate.
    let take_indices = PrimitiveArray::new(buffer![10, 15, 20, 99], Validity::AllValid);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let taken = patches
        .take_search(take_indices, true, &mut ctx)
        .unwrap()
        .unwrap();

    assert_eq!(taken.array_len(), 4);
    assert_arrays_eq!(
        taken.values(),
        PrimitiveArray::from_option_iter([Some(200i32), Some(300)])
    );
}
1407
#[test]
fn test_take_search_multiple_chunk_offsets() {
    // Every position is patched, so the patches span two chunks.
    let patch_indices = BufferMut::from_iter(0..1500u64).into_array();
    let patch_values = BufferMut::from_iter(0..1500i32).into_array();
    let chunk_offsets = buffer![0u64, 1024u64].into_array();
    let patches =
        Patches::new(1500, 0, patch_indices, patch_values, Some(chunk_offsets)).unwrap();

    let take_indices =
        PrimitiveArray::new(BufferMut::from_iter(0..1500u64), Validity::AllValid);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let taken = patches
        .take_search(take_indices, false, &mut ctx)
        .unwrap()
        .unwrap();

    assert_eq!(taken.array_len(), 1500);
}
1430
#[test]
fn test_slice() {
    let patch_values = buffer![15_u32, 135, 13531, 42].into_array();
    let patch_indices = buffer![10_u64, 11, 50, 100].into_array();
    let patches = Patches::new(101, 0, patch_indices, patch_values, None).unwrap();

    // Only the patch at index 50 lies inside 15..100.
    let window = patches.slice(15..100).unwrap().unwrap();

    assert_eq!(window.array_len(), 100 - 15);
    assert_arrays_eq!(window.values(), PrimitiveArray::from_iter([13531u32]));
}
1442
#[test]
fn doubly_sliced() {
    let patch_values = buffer![15_u32, 135, 13531, 42].into_array();
    let patch_indices = buffer![10_u64, 11, 50, 100].into_array();
    let patches = Patches::new(101, 0, patch_indices, patch_values, None).unwrap();

    let outer = patches.slice(15..100).unwrap().unwrap();
    assert_eq!(outer.array_len(), 100 - 15);
    assert_arrays_eq!(outer.values(), PrimitiveArray::from_iter([13531u32]));

    // Slicing the slice: position 35 of the outer slice is original index 50.
    let inner = outer.slice(35..36).unwrap().unwrap();
    assert_arrays_eq!(inner.values(), PrimitiveArray::from_iter([13531u32]));
}
1460
#[test]
fn test_mask_all_true() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    // An all-true mask is expected to leave no patches behind.
    let all_true = Mask::new_true(10);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let masked = patches.mask(&all_true, &mut ctx).unwrap();

    assert!(masked.is_none());
}
1478
#[test]
fn test_mask_all_false() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    let all_false = Mask::new_false(10);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let masked = patches.mask(&all_false, &mut ctx).unwrap().unwrap();

    // An all-false mask must leave every patch value in place and valid.
    assert_arrays_eq!(
        masked.values(),
        PrimitiveArray::from_iter([100i32, 200, 300])
    );
    assert!(masked.values().is_valid(0).unwrap());
    assert!(masked.values().is_valid(1).unwrap());
    assert!(masked.values().is_valid(2).unwrap());

    // The patch positions are untouched as well.
    assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
}
1508
#[test]
fn test_mask_partial() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    // Set bits at positions 2 and 8 remove those patches; the patch at 5 is unaffected.
    let removal = Mask::from_iter([
        false, false, true, false, false, false, false, false, true, false,
    ]);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let masked = patches.mask(&removal, &mut ctx).unwrap().unwrap();

    // Exactly one patch value survives ...
    assert_eq!(masked.values().len(), 1);
    assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([200i32]));

    // ... and it is the one at index 5.
    assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([5u64]));
}
1536
#[test]
fn test_mask_with_offset() {
    // With an offset of 5, raw indices 7, 10, 13 address positions 2, 5, 8.
    let patch_indices = buffer![7u64, 10, 13].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 5, patch_indices, patch_values, None).unwrap();

    // Mask out (offset-adjusted) position 2 only.
    let removal = Mask::from_iter([
        false, false, true, false, false, false, false, false, false, false,
    ]);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let masked = patches.mask(&removal, &mut ctx).unwrap().unwrap();

    assert_eq!(masked.array_len(), 10);
    assert_eq!(masked.offset(), 5);
    // The remaining raw indices still include the offset.
    assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([10u64, 13]));
    assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([200i32, 300]));
}
1562
#[test]
fn test_mask_nullable_values() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values =
        PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    // Remove the patch at position 2.
    let removal = Mask::from_iter([
        false, false, true, false, false, false, false, false, false, false,
    ]);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let masked = patches.mask(&removal, &mut ctx).unwrap().unwrap();

    // The patches at 5 and 8 are left.
    assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([5u64, 8]));

    // Their values are the original null and 300, in that order.
    let remaining = masked.values().to_primitive();
    assert_eq!(remaining.len(), 2);
    assert!(!remaining.is_valid(0).unwrap()); // null patch at index 5
    assert!(remaining.is_valid(1).unwrap()); // 300 at index 8

    let last = remaining.scalar_at(1).unwrap();
    assert_eq!(i32::try_from(&last).unwrap(), 300i32);
}
1596
#[test]
fn test_filter_keep_all() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    // A mask selecting every position 0..10 keeps all patches where they are.
    let keep_everything = Mask::from_indices(10, (0..10).collect());
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let filtered = patches
        .filter(&keep_everything, &mut ctx)
        .unwrap()
        .unwrap();

    assert_arrays_eq!(filtered.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
    assert_arrays_eq!(
        filtered.values(),
        PrimitiveArray::from_iter([100i32, 200, 300])
    );
}
1621
#[test]
fn test_filter_none() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    // A mask without any selected position keeps nothing.
    let keep_nothing = Mask::from_indices(10, vec![]);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let filtered = patches.filter(&keep_nothing, &mut ctx).unwrap();

    assert!(filtered.is_none());
}
1640
#[test]
fn test_filter_with_indices() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    // Positions 2, 5 and 9 are kept, so the patches at 2 and 5 survive and the one at 8 goes.
    let keep = Mask::from_indices(10, vec![2, 5, 9]);
    let mut ctx = LEGACY_SESSION.create_execution_ctx();
    let filtered = patches.filter(&keep, &mut ctx).unwrap().unwrap();

    // Indices are rewritten to positions within the filtered output.
    assert_arrays_eq!(filtered.indices(), PrimitiveArray::from_iter([0u64, 1]));
    assert_arrays_eq!(filtered.values(), PrimitiveArray::from_iter([100i32, 200]));
}
1662
#[test]
fn test_slice_full_range() {
    let patch_indices = buffer![2u64, 5, 8].into_array();
    let patch_values = buffer![100i32, 200, 300].into_array();
    let patches = Patches::new(10, 0, patch_indices, patch_values, None).unwrap();

    // Slicing the whole range must not drop or move any patch.
    let whole = patches.slice(0..10).unwrap().unwrap();

    assert_arrays_eq!(whole.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
    assert_arrays_eq!(
        whole.values(),
        PrimitiveArray::from_iter([100i32, 200, 300])
    );
}
1682
1683    #[test]
1684    fn test_slice_partial() {
1685        let patches = Patches::new(
1686            10,
1687            0,
1688            buffer![2u64, 5, 8].into_array(),
1689            buffer![100i32, 200, 300].into_array(),
1690            None,
1691        )
1692        .unwrap();
1693
1694        // Slice from 3 to 8 (includes patch at 5)
1695        let sliced = patches.slice(3..8).unwrap().unwrap();
1696
1697        assert_arrays_eq!(sliced.indices(), PrimitiveArray::from_iter([5u64])); // Index stays the same
1698        assert_arrays_eq!(sliced.values(), PrimitiveArray::from_iter([200i32]));
1699        assert_eq!(sliced.array_len(), 5); // 8 - 3 = 5
1700        assert_eq!(sliced.offset(), 3); // New offset
1701    }
1702
1703    #[test]
1704    fn test_slice_no_patches() {
1705        let patches = Patches::new(
1706            10,
1707            0,
1708            buffer![2u64, 5, 8].into_array(),
1709            buffer![100i32, 200, 300].into_array(),
1710            None,
1711        )
1712        .unwrap();
1713
1714        // Slice from 6 to 7 (no patches in this range)
1715        let sliced = patches.slice(6..7).unwrap();
1716        assert!(sliced.is_none());
1717    }
1718
1719    #[test]
1720    fn test_slice_with_offset() {
1721        let patches = Patches::new(
1722            10,
1723            5,                                  // offset
1724            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1725            buffer![100i32, 200, 300].into_array(),
1726            None,
1727        )
1728        .unwrap();
1729
1730        // Slice from 3 to 8 (includes patch at actual index 5)
1731        let sliced = patches.slice(3..8).unwrap().unwrap();
1732
1733        assert_arrays_eq!(sliced.indices(), PrimitiveArray::from_iter([10u64])); // Index stays the same (offset + 5 = 10)
1734        assert_arrays_eq!(sliced.values(), PrimitiveArray::from_iter([200i32]));
1735        assert_eq!(sliced.offset(), 8); // New offset = 5 + 3
1736    }
1737
1738    #[test]
1739    fn test_patch_values() {
1740        let patches = Patches::new(
1741            10,
1742            0,
1743            buffer![2u64, 5, 8].into_array(),
1744            buffer![100i32, 200, 300].into_array(),
1745            None,
1746        )
1747        .unwrap();
1748
1749        let values = patches.values().to_primitive();
1750        assert_eq!(
1751            i32::try_from(&values.scalar_at(0).unwrap()).unwrap(),
1752            100i32
1753        );
1754        assert_eq!(
1755            i32::try_from(&values.scalar_at(1).unwrap()).unwrap(),
1756            200i32
1757        );
1758        assert_eq!(
1759            i32::try_from(&values.scalar_at(2).unwrap()).unwrap(),
1760            300i32
1761        );
1762    }
1763
1764    #[test]
1765    fn test_indices_range() {
1766        let patches = Patches::new(
1767            10,
1768            0,
1769            buffer![2u64, 5, 8].into_array(),
1770            buffer![100i32, 200, 300].into_array(),
1771            None,
1772        )
1773        .unwrap();
1774
1775        assert_eq!(patches.min_index().unwrap(), 2);
1776        assert_eq!(patches.max_index().unwrap(), 8);
1777    }
1778
1779    #[test]
1780    fn test_search_index() {
1781        let patches = Patches::new(
1782            10,
1783            0,
1784            buffer![2u64, 5, 8].into_array(),
1785            buffer![100i32, 200, 300].into_array(),
1786            None,
1787        )
1788        .unwrap();
1789
1790        // Search for exact indices
1791        assert_eq!(patches.search_index(2).unwrap(), SearchResult::Found(0));
1792        assert_eq!(patches.search_index(5).unwrap(), SearchResult::Found(1));
1793        assert_eq!(patches.search_index(8).unwrap(), SearchResult::Found(2));
1794
1795        // Search for non-patch indices
1796        assert_eq!(patches.search_index(0).unwrap(), SearchResult::NotFound(0));
1797        assert_eq!(patches.search_index(3).unwrap(), SearchResult::NotFound(1));
1798        assert_eq!(patches.search_index(6).unwrap(), SearchResult::NotFound(2));
1799        assert_eq!(patches.search_index(9).unwrap(), SearchResult::NotFound(3));
1800    }
1801
1802    #[test]
1803    fn test_mask_boundary_patches() {
1804        // Test masking patches at array boundaries
1805        let patches = Patches::new(
1806            10,
1807            0,
1808            buffer![0u64, 9].into_array(),
1809            buffer![100i32, 200].into_array(),
1810            None,
1811        )
1812        .unwrap();
1813
1814        let mask = Mask::from_iter([
1815            true, false, false, false, false, false, false, false, false, false,
1816        ]);
1817        let masked = patches
1818            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1819            .unwrap();
1820        assert!(masked.is_some());
1821        let masked = masked.unwrap();
1822        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([9u64]));
1823        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([200i32]));
1824    }
1825
1826    #[test]
1827    fn test_mask_all_patches_removed() {
1828        // Test when all patches are masked out
1829        let patches = Patches::new(
1830            10,
1831            0,
1832            buffer![2u64, 5, 8].into_array(),
1833            buffer![100i32, 200, 300].into_array(),
1834            None,
1835        )
1836        .unwrap();
1837
1838        // Mask that removes all patches
1839        let mask = Mask::from_iter([
1840            false, false, true, false, false, true, false, false, true, false,
1841        ]);
1842        let masked = patches
1843            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1844            .unwrap();
1845        assert!(masked.is_none());
1846    }
1847
1848    #[test]
1849    fn test_mask_no_patches_removed() {
1850        // Test when no patches are masked
1851        let patches = Patches::new(
1852            10,
1853            0,
1854            buffer![2u64, 5, 8].into_array(),
1855            buffer![100i32, 200, 300].into_array(),
1856            None,
1857        )
1858        .unwrap();
1859
1860        // Mask that doesn't affect any patches
1861        let mask = Mask::from_iter([
1862            true, false, false, true, false, false, true, false, false, true,
1863        ]);
1864        let masked = patches
1865            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1866            .unwrap()
1867            .unwrap();
1868
1869        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([2u64, 5, 8]));
1870        assert_arrays_eq!(
1871            masked.values(),
1872            PrimitiveArray::from_iter([100i32, 200, 300])
1873        );
1874    }
1875
1876    #[test]
1877    fn test_mask_single_patch() {
1878        // Test with a single patch
1879        let patches = Patches::new(
1880            5,
1881            0,
1882            buffer![2u64].into_array(),
1883            buffer![42i32].into_array(),
1884            None,
1885        )
1886        .unwrap();
1887
1888        // Mask that removes the single patch
1889        let mask = Mask::from_iter([false, false, true, false, false]);
1890        let masked = patches
1891            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1892            .unwrap();
1893        assert!(masked.is_none());
1894
1895        // Mask that keeps the single patch
1896        let mask = Mask::from_iter([true, false, false, true, false]);
1897        let masked = patches
1898            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1899            .unwrap()
1900            .unwrap();
1901        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([2u64]));
1902    }
1903
1904    #[test]
1905    fn test_mask_contiguous_patches() {
1906        // Test with contiguous patches
1907        let patches = Patches::new(
1908            10,
1909            0,
1910            buffer![3u64, 4, 5, 6].into_array(),
1911            buffer![100i32, 200, 300, 400].into_array(),
1912            None,
1913        )
1914        .unwrap();
1915
1916        // Mask that removes middle patches
1917        let mask = Mask::from_iter([
1918            false, false, false, false, true, true, false, false, false, false,
1919        ]);
1920        let masked = patches
1921            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1922            .unwrap()
1923            .unwrap();
1924
1925        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([3u64, 6]));
1926        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([100i32, 400]));
1927    }
1928
1929    #[test]
1930    fn test_mask_with_large_offset() {
1931        // Test with a large offset that shifts all indices
1932        let patches = Patches::new(
1933            20,
1934            15,
1935            buffer![16u64, 17, 19].into_array(), // actual indices are 1, 2, 4
1936            buffer![100i32, 200, 300].into_array(),
1937            None,
1938        )
1939        .unwrap();
1940
1941        // Mask that removes the patch at actual index 2
1942        let mask = Mask::from_iter([
1943            false, false, true, false, false, false, false, false, false, false, false, false,
1944            false, false, false, false, false, false, false, false,
1945        ]);
1946        let masked = patches
1947            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1948            .unwrap()
1949            .unwrap();
1950
1951        assert_arrays_eq!(masked.indices(), PrimitiveArray::from_iter([16u64, 19]));
1952        assert_arrays_eq!(masked.values(), PrimitiveArray::from_iter([100i32, 300]));
1953    }
1954
1955    #[test]
1956    #[should_panic(expected = "Filter mask length 5 does not match array length 10")]
1957    fn test_mask_wrong_length() {
1958        let patches = Patches::new(
1959            10,
1960            0,
1961            buffer![2u64, 5, 8].into_array(),
1962            buffer![100i32, 200, 300].into_array(),
1963            None,
1964        )
1965        .unwrap();
1966
1967        // Mask with wrong length
1968        let mask = Mask::from_iter([false, false, true, false, false]);
1969        patches
1970            .mask(&mask, &mut LEGACY_SESSION.create_execution_ctx())
1971            .unwrap();
1972    }
1973
1974    #[test]
1975    fn test_chunk_offsets_search() {
1976        let indices = buffer![100u64, 200, 3000, 3100].into_array();
1977        let values = buffer![10i32, 20, 30, 40].into_array();
1978        let chunk_offsets = buffer![0u64, 2, 2, 3].into_array();
1979        let patches = Patches::new(4096, 0, indices, values, Some(chunk_offsets)).unwrap();
1980
1981        assert!(patches.chunk_offsets.is_some());
1982
1983        // chunk 0: patches at 100, 200
1984        assert_eq!(patches.search_index(100).unwrap(), SearchResult::Found(0));
1985        assert_eq!(patches.search_index(200).unwrap(), SearchResult::Found(1));
1986
1987        // chunks 1, 2: no patches
1988        assert_eq!(
1989            patches.search_index(1500).unwrap(),
1990            SearchResult::NotFound(2)
1991        );
1992        assert_eq!(
1993            patches.search_index(2000).unwrap(),
1994            SearchResult::NotFound(2)
1995        );
1996
1997        // chunk 3: patches at 3000, 3100
1998        assert_eq!(patches.search_index(3000).unwrap(), SearchResult::Found(2));
1999        assert_eq!(patches.search_index(3100).unwrap(), SearchResult::Found(3));
2000
2001        assert_eq!(
2002            patches.search_index(1024).unwrap(),
2003            SearchResult::NotFound(2)
2004        );
2005    }
2006
2007    #[test]
2008    fn test_chunk_offsets_with_slice() {
2009        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
2010        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
2011        let chunk_offsets = buffer![0u64, 2, 6].into_array();
2012        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2013
2014        let sliced = patches.slice(1000..2200).unwrap().unwrap();
2015
2016        assert!(sliced.chunk_offsets.is_some());
2017        assert_eq!(sliced.offset(), 1000);
2018
2019        assert_eq!(sliced.search_index(200).unwrap(), SearchResult::Found(0));
2020        assert_eq!(sliced.search_index(500).unwrap(), SearchResult::Found(2));
2021        assert_eq!(sliced.search_index(1100).unwrap(), SearchResult::Found(4));
2022
2023        assert_eq!(sliced.search_index(250).unwrap(), SearchResult::NotFound(1));
2024    }
2025
2026    #[test]
2027    fn test_chunk_offsets_with_slice_after_first_chunk() {
2028        let indices = buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array();
2029        let values = buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array();
2030        let chunk_offsets = buffer![0u64, 2, 6].into_array();
2031        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2032
2033        let sliced = patches.slice(1300..2200).unwrap().unwrap();
2034
2035        assert!(sliced.chunk_offsets.is_some());
2036        assert_eq!(sliced.offset(), 1300);
2037
2038        assert_eq!(sliced.search_index(0).unwrap(), SearchResult::Found(0));
2039        assert_eq!(sliced.search_index(200).unwrap(), SearchResult::Found(1));
2040        assert_eq!(sliced.search_index(500).unwrap(), SearchResult::Found(2));
2041        assert_eq!(sliced.search_index(250).unwrap(), SearchResult::NotFound(2));
2042        assert_eq!(sliced.search_index(900).unwrap(), SearchResult::NotFound(4));
2043    }
2044
2045    #[test]
2046    fn test_chunk_offsets_slice_empty_result() {
2047        let indices = buffer![100u64, 200, 3000, 3100].into_array();
2048        let values = buffer![10i32, 20, 30, 40].into_array();
2049        let chunk_offsets = buffer![0u64, 2].into_array();
2050        let patches = Patches::new(4000, 0, indices, values, Some(chunk_offsets)).unwrap();
2051
2052        let sliced = patches.slice(1000..2000).unwrap();
2053        assert!(sliced.is_none());
2054    }
2055
2056    #[test]
2057    fn test_chunk_offsets_slice_single_patch() {
2058        let indices = buffer![100u64, 1200, 1300, 2500].into_array();
2059        let values = buffer![10i32, 20, 30, 40].into_array();
2060        let chunk_offsets = buffer![0u64, 1, 3].into_array();
2061        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2062
2063        let sliced = patches.slice(1100..1250).unwrap().unwrap();
2064
2065        assert_eq!(sliced.num_patches(), 1);
2066        assert_eq!(sliced.offset(), 1100);
2067        assert_eq!(sliced.search_index(100).unwrap(), SearchResult::Found(0)); // 1200 - 1100 = 100
2068        assert_eq!(sliced.search_index(50).unwrap(), SearchResult::NotFound(0));
2069        assert_eq!(sliced.search_index(150).unwrap(), SearchResult::NotFound(1));
2070    }
2071
2072    #[test]
2073    fn test_chunk_offsets_slice_across_chunks() {
2074        let indices = buffer![100u64, 200, 1100, 1200, 2100, 2200].into_array();
2075        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2076        let chunk_offsets = buffer![0u64, 2, 4].into_array();
2077        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2078
2079        let sliced = patches.slice(150..2150).unwrap().unwrap();
2080
2081        assert_eq!(sliced.num_patches(), 4);
2082        assert_eq!(sliced.offset(), 150);
2083
2084        assert_eq!(sliced.search_index(50).unwrap(), SearchResult::Found(0)); // 200 - 150 = 50
2085        assert_eq!(sliced.search_index(950).unwrap(), SearchResult::Found(1)); // 1100 - 150 = 950
2086        assert_eq!(sliced.search_index(1050).unwrap(), SearchResult::Found(2)); // 1200 - 150 = 1050
2087        assert_eq!(sliced.search_index(1950).unwrap(), SearchResult::Found(3)); // 2100 - 150 = 1950
2088    }
2089
2090    #[test]
2091    fn test_chunk_offsets_boundary_searches() {
2092        let indices = buffer![1023u64, 1024, 1025, 2047, 2048].into_array();
2093        let values = buffer![10i32, 20, 30, 40, 50].into_array();
2094        let chunk_offsets = buffer![0u64, 1, 4].into_array();
2095        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2096
2097        assert_eq!(patches.search_index(1023).unwrap(), SearchResult::Found(0));
2098        assert_eq!(patches.search_index(1024).unwrap(), SearchResult::Found(1));
2099        assert_eq!(patches.search_index(1025).unwrap(), SearchResult::Found(2));
2100        assert_eq!(patches.search_index(2047).unwrap(), SearchResult::Found(3));
2101        assert_eq!(patches.search_index(2048).unwrap(), SearchResult::Found(4));
2102
2103        assert_eq!(
2104            patches.search_index(1022).unwrap(),
2105            SearchResult::NotFound(0)
2106        );
2107        assert_eq!(
2108            patches.search_index(2046).unwrap(),
2109            SearchResult::NotFound(3)
2110        );
2111    }
2112
2113    #[test]
2114    fn test_chunk_offsets_slice_edge_cases() {
2115        let indices = buffer![0u64, 1, 1023, 1024, 2047, 2048].into_array();
2116        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2117        let chunk_offsets = buffer![0u64, 3, 5].into_array();
2118        let patches = Patches::new(3000, 0, indices, values, Some(chunk_offsets)).unwrap();
2119
2120        // Slice at the very beginning
2121        let sliced = patches.slice(0..10).unwrap().unwrap();
2122        assert_eq!(sliced.num_patches(), 2);
2123        assert_eq!(sliced.search_index(0).unwrap(), SearchResult::Found(0));
2124        assert_eq!(sliced.search_index(1).unwrap(), SearchResult::Found(1));
2125
2126        // Slice at the very end
2127        let sliced = patches.slice(2040..3000).unwrap().unwrap();
2128        assert_eq!(sliced.num_patches(), 2); // patches at 2047 and 2048
2129        assert_eq!(sliced.search_index(7).unwrap(), SearchResult::Found(0)); // 2047 - 2040
2130        assert_eq!(sliced.search_index(8).unwrap(), SearchResult::Found(1)); // 2048 - 2040
2131    }
2132
2133    #[test]
2134    fn test_chunk_offsets_slice_nested() {
2135        let indices = buffer![100u64, 200, 300, 400, 500, 600].into_array();
2136        let values = buffer![10i32, 20, 30, 40, 50, 60].into_array();
2137        let chunk_offsets = buffer![0u64].into_array();
2138        let patches = Patches::new(1000, 0, indices, values, Some(chunk_offsets)).unwrap();
2139
2140        let sliced1 = patches.slice(150..550).unwrap().unwrap();
2141        assert_eq!(sliced1.num_patches(), 4); // 200, 300, 400, 500
2142
2143        let sliced2 = sliced1.slice(100..250).unwrap().unwrap();
2144        assert_eq!(sliced2.num_patches(), 1); // 300
2145        assert_eq!(sliced2.offset(), 250);
2146
2147        assert_eq!(sliced2.search_index(50).unwrap(), SearchResult::Found(0)); // 300 - 250
2148        assert_eq!(
2149            sliced2.search_index(150).unwrap(),
2150            SearchResult::NotFound(1)
2151        );
2152    }
2153
2154    #[test]
2155    fn test_index_larger_than_length() {
2156        let chunk_offsets = buffer![0u64].into_array();
2157        let indices = buffer![1023u64].into_array();
2158        let values = buffer![42i32].into_array();
2159        let patches = Patches::new(1024, 0, indices, values, Some(chunk_offsets)).unwrap();
2160        assert_eq!(
2161            patches.search_index(2048).unwrap(),
2162            SearchResult::NotFound(1)
2163        );
2164    }
2165}