vortex_array/
patches.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::cmp::Ordering;
5use std::fmt::Debug;
6use std::hash::Hash;
7use std::ops::Range;
8
9use arrow_buffer::BooleanBuffer;
10use itertools::Itertools;
11use num_traits::NumCast;
12use vortex_buffer::BufferMut;
13use vortex_dtype::Nullability::NonNullable;
14use vortex_dtype::{
15    DType, IntegerPType, NativePType, PType, UnsignedPType, match_each_integer_ptype,
16    match_each_unsigned_integer_ptype,
17};
18use vortex_error::{
19    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
20};
21use vortex_mask::{AllOr, Mask};
22use vortex_scalar::{PValue, Scalar};
23use vortex_utils::aliases::hash_map::HashMap;
24
25use crate::arrays::PrimitiveArray;
26use crate::compute::{cast, filter, is_sorted, take};
27use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
28use crate::vtable::ValidityHelper;
29use crate::{Array, ArrayRef, IntoArray, ToCanonical};
30
/// Number of array positions covered by a single chunk.
///
/// One patch index offset is stored for each chunk. A lookup first computes the
/// chunk a position falls into and then only searches the patches belonging to
/// that chunk instead of the whole indices array.
const PATCH_CHUNK_SIZE: usize = 1024;
34
/// `prost` message describing a [`Patches`] instance, as produced by
/// [`Patches::to_metadata`].
#[derive(Copy, Clone, prost::Message)]
pub struct PatchesMetadata {
    /// Number of patches, i.e. the length of the indices array.
    #[prost(uint64, tag = "1")]
    len: u64,
    /// The `offset` of the patches (see [`Patches::offset`]).
    #[prost(uint64, tag = "2")]
    offset: u64,
    /// Primitive type of the indices array, stored as its `i32` enum value.
    #[prost(enumeration = "PType", tag = "3")]
    indices_ptype: i32,
    /// Length of the chunk offsets array, if chunk offsets are present.
    #[prost(uint64, optional, tag = "4")]
    chunk_offsets_len: Option<u64>,
    /// Primitive type of the chunk offsets array, if chunk offsets are present.
    #[prost(enumeration = "PType", optional, tag = "5")]
    chunk_offsets_ptype: Option<i32>,
    /// The `offset_within_chunk` of the patches, if set.
    #[prost(uint64, optional, tag = "6")]
    offset_within_chunk: Option<u64>,
}
50
51impl PatchesMetadata {
52    #[inline]
53    pub fn new(
54        len: usize,
55        offset: usize,
56        indices_ptype: PType,
57        chunk_offsets_len: Option<usize>,
58        chunk_offsets_ptype: Option<PType>,
59        offset_within_chunk: Option<usize>,
60    ) -> Self {
61        Self {
62            len: len as u64,
63            offset: offset as u64,
64            indices_ptype: indices_ptype as i32,
65            chunk_offsets_len: chunk_offsets_len.map(|len| len as u64),
66            chunk_offsets_ptype: chunk_offsets_ptype.map(|pt| pt as i32),
67            offset_within_chunk: offset_within_chunk.map(|len| len as u64),
68        }
69    }
70
71    #[inline]
72    pub fn len(&self) -> usize {
73        usize::try_from(self.len).vortex_expect("len is a valid usize")
74    }
75
76    #[inline]
77    pub fn is_empty(&self) -> bool {
78        self.len == 0
79    }
80
81    #[inline]
82    pub fn offset(&self) -> usize {
83        usize::try_from(self.offset).vortex_expect("offset is a valid usize")
84    }
85
86    #[inline]
87    pub fn chunk_offsets_dtype(&self) -> Option<DType> {
88        self.chunk_offsets_ptype
89            .map(|t| PType::try_from(t).vortex_expect(&format!("invalid i32 value {t} for PType")))
90            .map(|ptype| DType::Primitive(ptype, NonNullable))
91    }
92
93    #[inline]
94    pub fn indices_dtype(&self) -> DType {
95        assert!(
96            self.indices_ptype().is_unsigned_int(),
97            "Patch indices must be unsigned integers"
98        );
99        DType::Primitive(self.indices_ptype(), NonNullable)
100    }
101}
102
/// A helper for working with patched arrays.
#[derive(Debug, Clone)]
pub struct Patches {
    /// Length of the array the patches apply to.
    array_len: usize,
    /// Amount subtracted from a stored index to obtain its position in the
    /// patched array (lookups add it to the requested position instead).
    offset: usize,
    /// Positions of the patches, before subtracting `offset`. `Patches::new`
    /// requires a sorted, non-empty, non-nullable unsigned integer array.
    indices: ArrayRef,
    /// Patch values; `Patches::new` requires the same length as `indices`.
    values: ArrayRef,
    /// Stores the patch index offset for each chunk.
    chunk_offsets: Option<ArrayRef>,
    /// Chunk offsets are only sliced off in case the slice is fully
    /// outside of the chunk range.
    ///
    /// Though the range for indices and values is sliced in terms of
    /// individual elements, not chunks. To account for that we do a
    /// saturating sub when adjusting the indices based on the chunk offset.
    ///
    /// `offset_within_chunk` is necessary in order to keep track of how many
    /// elements were sliced off within the chunk.
    offset_within_chunk: Option<usize>,
}
123
124impl Patches {
125    pub fn new(
126        array_len: usize,
127        offset: usize,
128        indices: ArrayRef,
129        values: ArrayRef,
130        chunk_offsets: Option<ArrayRef>,
131    ) -> Self {
132        assert_eq!(
133            indices.len(),
134            values.len(),
135            "Patch indices and values must have the same length"
136        );
137        assert!(
138            indices.dtype().is_unsigned_int() && !indices.dtype().is_nullable(),
139            "Patch indices must be non-nullable unsigned integers, got {:?}",
140            indices.dtype()
141        );
142        assert!(
143            indices.len() <= array_len,
144            "Patch indices must be shorter than the array length"
145        );
146        assert!(!indices.is_empty(), "Patch indices must not be empty");
147        let max = usize::try_from(&indices.scalar_at(indices.len() - 1))
148            .vortex_expect("indices must be a number");
149        assert!(
150            max - offset < array_len,
151            "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
152        );
153
154        debug_assert!(
155            is_sorted(indices.as_ref())
156                .unwrap_or(Some(false))
157                .unwrap_or(false),
158            "Patch indices must be sorted"
159        );
160
161        Self {
162            array_len,
163            offset,
164            indices,
165            values,
166            chunk_offsets: chunk_offsets.clone(),
167            // Initialize with `Some(0)` only if `chunk_offsets` are set.
168            offset_within_chunk: chunk_offsets.map(|_| 0),
169        }
170    }
171
    /// Construct new patches without validating any of the arguments.
    ///
    /// # Safety
    ///
    /// Callers must uphold the invariants that [`Self::new`] checks:
    /// * `indices` and `values` have the same length
    /// * `indices` is a non-nullable unsigned integer array and is not empty
    /// * `indices` is sorted
    /// * The last value in `indices` minus `offset` is smaller than `array_len`
    ///
    /// When `chunk_offsets` is `Some`, `offset_within_chunk` must be `Some` as
    /// well: the chunked search of this type panics otherwise.
    pub unsafe fn new_unchecked(
        array_len: usize,
        offset: usize,
        indices: ArrayRef,
        values: ArrayRef,
        chunk_offsets: Option<ArrayRef>,
        offset_within_chunk: Option<usize>,
    ) -> Self {
        Self {
            array_len,
            offset,
            indices,
            values,
            chunk_offsets,
            offset_within_chunk,
        }
    }
198
    /// Length of the array the patches apply to.
    #[inline]
    pub fn array_len(&self) -> usize {
        self.array_len
    }

    /// Number of patches, i.e. the length of the indices array.
    #[inline]
    pub fn num_patches(&self) -> usize {
        self.indices.len()
    }

    /// The [`DType`] of the patch values.
    #[inline]
    pub fn dtype(&self) -> &DType {
        self.values.dtype()
    }

    /// Borrows the indices array.
    #[inline]
    pub fn indices(&self) -> &ArrayRef {
        &self.indices
    }

    /// Consumes the patches and returns the indices array.
    #[inline]
    pub fn into_indices(self) -> ArrayRef {
        self.indices
    }

    /// Mutable access to the indices array. Nothing is re-validated after a
    /// modification made through this reference.
    #[inline]
    pub fn indices_mut(&mut self) -> &mut ArrayRef {
        &mut self.indices
    }

    /// Borrows the values array.
    #[inline]
    pub fn values(&self) -> &ArrayRef {
        &self.values
    }

    /// Consumes the patches and returns the values array.
    #[inline]
    pub fn into_values(self) -> ArrayRef {
        self.values
    }

    /// Mutable access to the values array. Nothing is re-validated after a
    /// modification made through this reference.
    #[inline]
    pub fn values_mut(&mut self) -> &mut ArrayRef {
        &mut self.values
    }

    /// The offset that is subtracted from stored indices to obtain positions in
    /// the patched array.
    #[inline]
    pub fn offset(&self) -> usize {
        self.offset
    }

    /// The per-chunk patch index offsets, if present.
    #[inline]
    pub fn chunk_offsets(&self) -> &Option<ArrayRef> {
        &self.chunk_offsets
    }
253
254    #[inline]
255    pub fn chunk_offset_at(&self, idx: usize) -> usize {
256        let Some(chunk_offsets) = &self.chunk_offsets else {
257            vortex_panic!("chunk_offsets must be set to retrieve offset at index")
258        };
259
260        chunk_offsets
261            .scalar_at(idx)
262            .as_primitive()
263            .as_::<usize>()
264            .vortex_expect("chunk offset must be usize")
265    }
266
267    #[inline]
268    pub fn indices_ptype(&self) -> PType {
269        PType::try_from(self.indices.dtype()).vortex_expect("primitive indices")
270    }
271
272    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
273        if self.indices.len() > len {
274            vortex_bail!(
275                "Patch indices {} are longer than the array length {}",
276                self.indices.len(),
277                len
278            );
279        }
280        if self.values.dtype() != dtype {
281            vortex_bail!(
282                "Patch values dtype {} does not match array dtype {}",
283                self.values.dtype(),
284                dtype
285            );
286        }
287        let chunk_offsets_len = self.chunk_offsets.as_ref().map(|co| co.len());
288        let chunk_offsets_ptype = self.chunk_offsets.as_ref().map(|co| co.dtype().as_ptype());
289
290        Ok(PatchesMetadata::new(
291            self.indices.len(),
292            self.offset,
293            self.indices.dtype().as_ptype(),
294            chunk_offsets_len,
295            chunk_offsets_ptype,
296            self.offset_within_chunk,
297        ))
298    }
299
300    pub fn cast_values(self, values_dtype: &DType) -> VortexResult<Self> {
301        // SAFETY: casting does not affect the relationship between the indices and values
302        unsafe {
303            Ok(Self::new_unchecked(
304                self.array_len,
305                self.offset,
306                self.indices,
307                cast(&self.values, values_dtype)?,
308                self.chunk_offsets,
309                self.offset_within_chunk,
310            ))
311        }
312    }
313
314    /// Get the patched value at a given index if it exists.
315    pub fn get_patched(&self, index: usize) -> Option<Scalar> {
316        self.search_index(index)
317            .to_found()
318            .map(|patch_idx| self.values().scalar_at(patch_idx))
319    }
320
321    /// Searches for `index` in the indices array.
322    ///
323    /// Chooses between chunked search when [`Self::chunk_offsets`] is
324    /// available, and binary search otherwise. The `index` parameter is
325    /// adjusted by [`Self::offset`] for both.
326    ///
327    /// # Arguments
328    /// * `index` - The index to search for
329    ///
330    /// # Returns
331    /// * [`SearchResult::Found(patch_idx)`] - If a patch exists at this index, returns the
332    ///   position in the patches array
333    /// * [`SearchResult::NotFound(insertion_point)`] - If no patch exists, returns where
334    ///   a patch at this index would be inserted to maintain sorted order
335    pub fn search_index(&self, index: usize) -> SearchResult {
336        if self.chunk_offsets.is_some() {
337            return self.search_index_chunked(index);
338        }
339
340        Self::search_index_binary_search(&self.indices, index + self.offset)
341    }
342
343    /// Binary searches for `needle` in the indices array.
344    ///
345    /// # Returns
346    /// [`SearchResult::Found`] with the position if needle exists, or [`SearchResult::NotFound`]
347    /// with the insertion point if not found.
348    fn search_index_binary_search(indices: &dyn Array, needle: usize) -> SearchResult {
349        if indices.is_canonical() {
350            let primitive = indices.to_primitive();
351            match_each_integer_ptype!(primitive.ptype(), |T| {
352                let Ok(needle) = T::try_from(needle) else {
353                    // If the needle is not of type T, then it cannot possibly be in this array.
354                    //
355                    // The needle is a non-negative integer (a usize); therefore, it must be larger
356                    // than all values in this array.
357                    return SearchResult::NotFound(primitive.len());
358                };
359                return primitive
360                    .as_slice::<T>()
361                    .search_sorted(&needle, SearchSortedSide::Left);
362            });
363        }
364        indices
365            .as_primitive_typed()
366            .search_sorted(&PValue::U64(needle as u64), SearchSortedSide::Left)
367    }
368
    /// Searches for `index` in the indices array using the chunk offsets.
    ///
    /// First determines which chunk the target index falls into, then performs
    /// a binary search within that chunk's range of patches.
    ///
    /// Returns a [`SearchResult`] indicating either the exact patch index if found,
    /// or the insertion point if not found. Both are positions in the whole
    /// indices array, not within the searched chunk.
    ///
    /// # Panics
    /// Panics if `chunk_offsets` or `offset_within_chunk` are not set.
    fn search_index_chunked(&self, index: usize) -> SearchResult {
        let Some(chunk_offsets) = &self.chunk_offsets else {
            vortex_panic!("chunk_offsets is required to be set")
        };

        let Some(offset_within_chunk) = self.offset_within_chunk else {
            vortex_panic!("offset_within_chunk is required to be set")
        };

        // Positions at or past the end of the array are reported as "insert after
        // the last patch".
        if index >= self.array_len() {
            return SearchResult::NotFound(self.indices().len());
        }

        // `self.offset % PATCH_CHUNK_SIZE` is added so that chunk boundaries are
        // computed relative to the start of the first chunk rather than `self.offset`.
        let chunk_idx = (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE;

        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
        let base_offset = self.chunk_offset_at(0);

        let patches_start_idx = (self.chunk_offset_at(chunk_idx) - base_offset)
            // Chunk offsets are only sliced off in case the slice is fully
            // outside of the chunk range.
            //
            // Though the range for indices and values is sliced in terms of
            // individual elements, not chunks. To account for that we do a
            // saturating sub when adjusting the indices based on the chunk offset.
            .saturating_sub(offset_within_chunk);

        // The last chunk has no successor entry; its patches run to the end of the
        // indices array.
        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
            self.chunk_offset_at(chunk_idx + 1) - base_offset - offset_within_chunk
        } else {
            self.indices.len()
        };

        let chunk_indices = self.indices.slice(patches_start_idx..patches_end_idx);
        let result = Self::search_index_binary_search(&chunk_indices, index + self.offset);

        // Translate the chunk-local position back into a position in `self.indices`.
        match result {
            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
        }
    }
420
    /// Batch version of `search_index`.
    ///
    /// In contrast to `search_index`, this function requires `indices` as
    /// well as `chunk_offsets` to be passed as slices. This is to avoid
    /// redundant canonicalization and `scalar_at` lookups across calls.
    ///
    /// `index` is a position in the patched array (before `self.offset` is
    /// added), expressed in the indices type `T`.
    ///
    /// # Panics
    /// Panics if `offset_within_chunk` is not set, or if a chunk offset
    /// computation cannot be converted between `O` and `usize`.
    ///
    /// NOTE(review): `index + offset` is computed in `T` without an overflow
    /// check; for narrow index types this looks like it could overflow — confirm.
    fn search_index_chunked_batch<T, O>(
        &self,
        indices: &[T],
        chunk_offsets: &[O],
        index: T,
    ) -> SearchResult
    where
        T: UnsignedPType,
        O: UnsignedPType,
        usize: TryFrom<T>,
        usize: TryFrom<O>,
    {
        let Some(offset_within_chunk) = self.offset_within_chunk else {
            vortex_panic!("offset_within_chunk is required to be set")
        };

        let chunk_idx = {
            let Ok(index) = usize::try_from(index) else {
                // If the needle cannot be converted to usize, it's larger than all values in this array.
                return SearchResult::NotFound(indices.len());
            };

            // Positions at or past the end of the array are reported as "insert
            // after the last patch".
            if index >= self.array_len() {
                return SearchResult::NotFound(self.indices().len());
            }

            (index + self.offset % PATCH_CHUNK_SIZE) / PATCH_CHUNK_SIZE
        };

        // Patch index offsets are absolute and need to be offset by the first chunk of the current slice.
        let Ok(chunk_offset) = usize::try_from(chunk_offsets[chunk_idx] - chunk_offsets[0]) else {
            vortex_panic!("chunk_offset failed to convert to usize")
        };

        let patches_start_idx = chunk_offset
            // Chunk offsets are only sliced off in case the slice is fully
            // outside of the chunk range.
            //
            // Though the range for indices and values is sliced in terms of
            // individual elements, not chunks. To account for that we do a
            // saturating sub when adjusting the indices based on the chunk offset.
            .saturating_sub(offset_within_chunk);

        // The last chunk has no successor entry; its patches run to the end of the
        // indices array.
        let patches_end_idx = if chunk_idx < chunk_offsets.len() - 1 {
            let base_offset_end = chunk_offsets[chunk_idx + 1];

            let Some(offset_within_chunk) = O::from(offset_within_chunk) else {
                vortex_panic!("offset_within_chunk failed to convert to O");
            };

            let Ok(patches_end_idx) =
                usize::try_from(base_offset_end - chunk_offsets[0] - offset_within_chunk)
            else {
                vortex_panic!("patches_end_idx failed to convert to usize")
            };

            patches_end_idx
        } else {
            self.indices.len()
        };

        let Some(offset) = T::from(self.offset) else {
            // If the offset cannot be converted to T, it's larger than all values in this array.
            return SearchResult::NotFound(indices.len());
        };

        let chunk_indices = &indices[patches_start_idx..patches_end_idx];
        let result = chunk_indices.search_sorted(&(index + offset), SearchSortedSide::Left);

        // Translate the chunk-local position back into a position in `indices`.
        match result {
            SearchResult::Found(idx) => SearchResult::Found(patches_start_idx + idx),
            SearchResult::NotFound(idx) => SearchResult::NotFound(patches_start_idx + idx),
        }
    }
500
501    /// Returns the minimum patch index
502    pub fn min_index(&self) -> usize {
503        let first = self
504            .indices
505            .scalar_at(0)
506            .as_primitive()
507            .as_::<usize>()
508            .vortex_expect("non-null");
509        first - self.offset
510    }
511
512    /// Returns the maximum patch index
513    pub fn max_index(&self) -> usize {
514        let last = self
515            .indices
516            .scalar_at(self.indices.len() - 1)
517            .as_primitive()
518            .as_::<usize>()
519            .vortex_expect("non-null");
520        last - self.offset
521    }
522
523    /// Filter the patches by a mask, resulting in new patches for the filtered array.
524    pub fn filter(&self, mask: &Mask) -> VortexResult<Option<Self>> {
525        if mask.len() != self.array_len {
526            vortex_bail!(
527                "Filter mask length {} does not match array length {}",
528                mask.len(),
529                self.array_len
530            );
531        }
532
533        match mask.indices() {
534            AllOr::All => Ok(Some(self.clone())),
535            AllOr::None => Ok(None),
536            AllOr::Some(mask_indices) => {
537                let flat_indices = self.indices().to_primitive();
538                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
539                    filter_patches_with_mask(
540                        flat_indices.as_slice::<I>(),
541                        self.offset(),
542                        self.values(),
543                        mask_indices,
544                    )
545                })
546            }
547        }
548    }
549
550    /// Mask the patches, REMOVING the patches where the mask is true.
551    /// Unlike filter, this preserves the patch indices.
552    /// Unlike mask on a single array, this does not set masked values to null.
553    pub fn mask(&self, mask: &Mask) -> VortexResult<Option<Self>> {
554        if mask.len() != self.array_len {
555            vortex_bail!(
556                "Filter mask length {} does not match array length {}",
557                mask.len(),
558                self.array_len
559            );
560        }
561
562        let filter_mask = match mask.boolean_buffer() {
563            AllOr::All => return Ok(None),
564            AllOr::None => return Ok(Some(self.clone())),
565            AllOr::Some(masked) => {
566                let patch_indices = self.indices().to_primitive();
567                match_each_unsigned_integer_ptype!(patch_indices.ptype(), |P| {
568                    let patch_indices = patch_indices.as_slice::<P>();
569                    Mask::from_buffer(BooleanBuffer::collect_bool(patch_indices.len(), |i| {
570                        #[allow(clippy::cast_possible_truncation)]
571                        let idx = (patch_indices[i] as usize) - self.offset;
572                        !masked.value(idx)
573                    }))
574                })
575            }
576        };
577
578        if filter_mask.all_false() {
579            return Ok(None);
580        }
581
582        // SAFETY: filtering indices/values with same mask maintains their 1:1 relationship
583        let filtered_indices = filter(&self.indices, &filter_mask)?;
584        let filtered_values = filter(&self.values, &filter_mask)?;
585
586        Ok(Some(Self {
587            array_len: self.array_len,
588            offset: self.offset,
589            indices: filtered_indices,
590            values: filtered_values,
591            // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
592            chunk_offsets: None,
593            offset_within_chunk: self.offset_within_chunk,
594        }))
595    }
596
597    /// Slice the patches by a range of the patched array.
598    pub fn slice(&self, range: Range<usize>) -> Option<Self> {
599        let slice_start_idx = self.search_index(range.start).to_index();
600        let slice_end_idx = self.search_index(range.end).to_index();
601
602        if slice_start_idx == slice_end_idx {
603            return None;
604        }
605
606        let values = self.values().slice(slice_start_idx..slice_end_idx);
607        let indices = self.indices().slice(slice_start_idx..slice_end_idx);
608
609        let chunk_offsets = self.chunk_offsets.as_ref().map(|chunk_offsets| {
610            let chunk_relative_offset = self.offset % PATCH_CHUNK_SIZE;
611            let chunk_start_idx = (chunk_relative_offset + range.start) / PATCH_CHUNK_SIZE;
612            let chunk_end_idx = (chunk_relative_offset + range.end).div_ceil(PATCH_CHUNK_SIZE);
613            chunk_offsets.slice(chunk_start_idx..chunk_end_idx)
614        });
615
616        let offset_within_chunk = chunk_offsets.as_ref().map(|chunk_offsets| {
617            let base_offset = chunk_offsets
618                .scalar_at(0)
619                .as_primitive()
620                .as_::<usize>()
621                .vortex_expect("chunk offset must be usize");
622            slice_start_idx - base_offset
623        });
624
625        Some(Self {
626            array_len: range.len(),
627            offset: range.start + self.offset(),
628            indices,
629            values,
630            chunk_offsets,
631            offset_within_chunk,
632        })
633    }
634
635    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
636    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
637
638    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
639        (self.num_patches() as f64 / take_indices.len() as f64)
640            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
641    }
642
643    /// Take the indices from the patches
644    ///
645    /// Any nulls in take_indices are added to the resulting patches.
646    pub fn take_with_nulls(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
647        if take_indices.is_empty() {
648            return Ok(None);
649        }
650
651        let take_indices = take_indices.to_primitive();
652        if self.is_map_faster_than_search(&take_indices) {
653            self.take_map(take_indices, true)
654        } else {
655            self.take_search(take_indices, true)
656        }
657    }
658
659    /// Take the indices from the patches.
660    ///
661    /// Any nulls in take_indices are ignored.
662    pub fn take(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
663        if take_indices.is_empty() {
664            return Ok(None);
665        }
666
667        let take_indices = take_indices.to_primitive();
668        if self.is_map_faster_than_search(&take_indices) {
669            self.take_map(take_indices, false)
670        } else {
671            self.take_search(take_indices, false)
672        }
673    }
674
675    #[allow(clippy::cognitive_complexity)]
676    pub fn take_search(
677        &self,
678        take_indices: PrimitiveArray,
679        include_nulls: bool,
680    ) -> VortexResult<Option<Self>> {
681        let take_indices_validity = take_indices.validity();
682        let patch_indices = self.indices.to_primitive();
683        let chunk_offsets = self.chunk_offsets().as_ref().map(|co| co.to_primitive());
684
685        let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) =
686            match_each_unsigned_integer_ptype!(patch_indices.ptype(), |PatchT| {
687                let patch_indices_slice = patch_indices.as_slice::<PatchT>();
688                match_each_integer_ptype!(take_indices.ptype(), |TakeT| {
689                    let take_slice = take_indices.as_slice::<TakeT>();
690
691                    if let Some(chunk_offsets) = chunk_offsets {
692                        match_each_unsigned_integer_ptype!(chunk_offsets.ptype(), |OffsetT| {
693                            let chunk_offsets = chunk_offsets.as_slice::<OffsetT>();
694                            take_indices_with_search_fn(
695                                patch_indices_slice,
696                                take_slice,
697                                take_indices.validity_mask(),
698                                include_nulls,
699                                |take_idx| {
700                                    self.search_index_chunked_batch(
701                                        patch_indices_slice,
702                                        chunk_offsets,
703                                        take_idx,
704                                    )
705                                },
706                            )
707                        })
708                    } else {
709                        take_indices_with_search_fn(
710                            patch_indices_slice,
711                            take_slice,
712                            take_indices.validity_mask(),
713                            include_nulls,
714                            |take_idx| {
715                                let Some(offset) = <PatchT as NumCast>::from(self.offset) else {
716                                    // If the offset cannot be converted to T, it's larger than all values in this array.
717                                    return SearchResult::NotFound(patch_indices_slice.len());
718                                };
719
720                                patch_indices_slice
721                                    .search_sorted(&(take_idx + offset), SearchSortedSide::Left)
722                            },
723                        )
724                    }
725                })
726            });
727
728        if new_indices.is_empty() {
729            return Ok(None);
730        }
731
732        let new_indices = new_indices.into_array();
733        let new_array_len = take_indices.len();
734        let num_chunks = new_array_len.div_ceil(PATCH_CHUNK_SIZE);
735        let values_validity = take_indices_validity.take(&new_indices)?;
736
737        // Indices are rewritten during take such that they are strictly
738        // monotonically increasing, starting from 0. Therefore, we know
739        // that there's 1024 entries per chunk, except for the last.
740        let mut chunk_offsets_buf = BufferMut::<u64>::with_capacity(num_chunks);
741        for chunk_idx in 0..num_chunks {
742            chunk_offsets_buf.push((chunk_idx * PATCH_CHUNK_SIZE) as u64);
743        }
744
745        Ok(Some(Self {
746            array_len: new_array_len,
747            offset: 0,
748            indices: new_indices.clone(),
749            values: take(
750                self.values(),
751                &PrimitiveArray::new(values_indices, values_validity).into_array(),
752            )?,
753            chunk_offsets: Some(chunk_offsets_buf.into_array()),
754            offset_within_chunk: Some(0), // Reset when creating new Patches.
755        }))
756    }
757
758    pub fn take_map(
759        &self,
760        take_indices: PrimitiveArray,
761        include_nulls: bool,
762    ) -> VortexResult<Option<Self>> {
763        let indices = self.indices.to_primitive();
764        let new_length = take_indices.len();
765
766        let Some((new_sparse_indices, value_indices)) =
767            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
768                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
769                    take_map::<_, TakeIndices>(
770                        indices.as_slice::<Indices>(),
771                        take_indices,
772                        self.offset(),
773                        self.min_index(),
774                        self.max_index(),
775                        include_nulls,
776                    )?
777                })
778            })
779        else {
780            return Ok(None);
781        };
782
783        let taken_values = take(self.values(), &value_indices)?;
784
785        Ok(Some(Patches {
786            array_len: new_length,
787            offset: 0,
788            indices: new_sparse_indices,
789            values: taken_values,
790            // TODO(0ax1): Chunk offsets are invalid after take is applied.
791            chunk_offsets: None,
792            offset_within_chunk: self.offset_within_chunk,
793        }))
794    }
795
796    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
797    where
798        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
799    {
800        let values = f(self.values)?;
801        if self.indices.len() != values.len() {
802            vortex_bail!(
803                "map_values must preserve length: expected {} received {}",
804                self.indices.len(),
805                values.len()
806            )
807        }
808
809        Ok(Self {
810            array_len: self.array_len,
811            offset: self.offset,
812            indices: self.indices,
813            values,
814            chunk_offsets: self.chunk_offsets,
815            offset_within_chunk: self.offset_within_chunk,
816        })
817    }
818}
819
/// Hash-map based take over patch indices.
///
/// Builds a map from each patched position (stored index minus `indices_offset`)
/// to its position in the values array, then probes the map once per take index.
///
/// Returns `Ok(None)` when no take index produces an output entry. Otherwise
/// returns `(new_sparse_indices, value_indices)`: the positions within
/// `take_indices` that end up patched, and the positions into the patch values
/// to take for them. The validity of `value_indices` is the validity of
/// `take_indices` at the selected positions.
///
/// # Errors
/// Fails if `indices_offset` does not fit in `I`, or if a take index cannot be
/// converted to `usize`.
fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
    indices: &[I],
    take_indices: PrimitiveArray,
    indices_offset: usize,
    min_index: usize,
    max_index: usize,
    include_nulls: bool,
) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
where
    usize: TryFrom<T>,
    VortexError: From<<I as TryFrom<usize>>::Error>,
{
    let take_indices_validity = take_indices.validity();
    let take_indices = take_indices.as_slice::<T>();
    let offset_i = I::try_from(indices_offset)?;

    // Maps a patched position (offset already removed) to the position of its
    // value in the patch values array.
    let sparse_index_to_value_index: HashMap<I, usize> = indices
        .iter()
        .copied()
        .map(|idx| idx - offset_i)
        .enumerate()
        .map(|(value_index, sparse_index)| (sparse_index, value_index))
        .collect();

    // Every take index is converted to `usize` first — including those at null
    // positions — and the first failed conversion aborts the whole take.
    let (new_sparse_indices, value_indices): (BufferMut<u64>, BufferMut<u64>) = take_indices
        .iter()
        .copied()
        .map(usize::try_from)
        .process_results(|iter| {
            iter.enumerate()
                .filter_map(|(idx_in_take, ti)| {
                    // If we have to take nulls the take index doesn't matter, make it 0 for consistency
                    if include_nulls && take_indices_validity.is_null(idx_in_take) {
                        Some((idx_in_take as u64, 0))
                    } else if ti < min_index || ti > max_index {
                        // Outside of the patched range: cannot be a patch, skip the map lookup.
                        None
                    } else {
                        sparse_index_to_value_index
                            .get(
                                &I::try_from(ti)
                                    .vortex_expect("take index is between min and max index"),
                            )
                            .map(|value_index| (idx_in_take as u64, *value_index as u64))
                    }
                })
                .unzip()
        })
        .map_err(|_| vortex_err!("Failed to convert index to usize"))?;

    if new_sparse_indices.is_empty() {
        return Ok(None);
    }

    let new_sparse_indices = new_sparse_indices.into_array();
    // Carry the validity of the selected take indices over to the value indices.
    let values_validity = take_indices_validity.take(&new_sparse_indices)?;
    Ok(Some((
        new_sparse_indices,
        PrimitiveArray::new(value_indices, values_validity).into_array(),
    )))
}
880
881/// Filter patches with the provided mask (in flattened space).
882///
883/// The filter mask may contain indices that are non-patched. The return value of this function
884/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
885/// patch values.
886fn filter_patches_with_mask<T: IntegerPType>(
887    patch_indices: &[T],
888    offset: usize,
889    patch_values: &dyn Array,
890    mask_indices: &[usize],
891) -> VortexResult<Option<Patches>> {
892    let true_count = mask_indices.len();
893    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
894    let mut new_mask_indices = Vec::with_capacity(true_count);
895
896    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
897    // the patches are relatively sparse compared to the overall mask, and so many indices in the
898    // mask will end up being skipped.
899    const STRIDE: usize = 4;
900
901    let mut mask_idx = 0usize;
902    let mut true_idx = 0usize;
903
904    while mask_idx < patch_indices.len() && true_idx < true_count {
905        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
906        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
907        //  the mask (which covers both patch and non-patch values of the parent array), and so to
908        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
909        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
910        //  fallback to performing a two-way iterator merge.
911        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
912            // Load a vector of each into our registers.
913            let left_min = patch_indices[mask_idx].to_usize().vortex_expect("left_min") - offset;
914            let left_max = patch_indices[mask_idx + STRIDE]
915                .to_usize()
916                .vortex_expect("left_max")
917                - offset;
918            let right_min = mask_indices[true_idx];
919            let right_max = mask_indices[true_idx + STRIDE];
920
921            if left_min > right_max {
922                // Advance right side
923                true_idx += STRIDE;
924                continue;
925            } else if right_min > left_max {
926                mask_idx += STRIDE;
927                continue;
928            } else {
929                // Fallthrough to direct comparison path.
930            }
931        }
932
933        // Two-way sorted iterator merge:
934
935        let left = patch_indices[mask_idx].to_usize().vortex_expect("left") - offset;
936        let right = mask_indices[true_idx];
937
938        match left.cmp(&right) {
939            Ordering::Less => {
940                mask_idx += 1;
941            }
942            Ordering::Greater => {
943                true_idx += 1;
944            }
945            Ordering::Equal => {
946                // Save the mask index as well as the positional index.
947                new_mask_indices.push(mask_idx);
948                new_patch_indices.push(true_idx as u64);
949
950                mask_idx += 1;
951                true_idx += 1;
952            }
953        }
954    }
955
956    if new_mask_indices.is_empty() {
957        return Ok(None);
958    }
959
960    let new_patch_indices = new_patch_indices.into_array();
961    let new_patch_values = filter(
962        patch_values,
963        &Mask::from_indices(patch_values.len(), new_mask_indices),
964    )?;
965
966    Ok(Some(Patches::new(
967        true_count,
968        0,
969        new_patch_indices,
970        new_patch_values,
971        // TODO(0ax1): Chunk offsets are invalid after a filter is applied.
972        None,
973    )))
974}
975
976fn take_indices_with_search_fn<I: UnsignedPType, T: IntegerPType, F: Fn(I) -> SearchResult>(
977    indices: &[I],
978    take_indices: &[T],
979    take_validity: Mask,
980    include_nulls: bool,
981    search_fn: F,
982) -> (BufferMut<u64>, BufferMut<u64>) {
983    take_indices
984        .iter()
985        .enumerate()
986        .filter_map(|(new_patch_idx, &take_idx)| {
987            if include_nulls && !take_validity.value(new_patch_idx) {
988                // For nulls, patch index doesn't matter - use 0 for consistency
989                Some((0u64, new_patch_idx as u64))
990            } else {
991                let search_result = I::from(take_idx)
992                    .map(&search_fn)
993                    .unwrap_or(SearchResult::NotFound(indices.len()));
994
995                search_result
996                    .to_found()
997                    .map(|patch_idx| (patch_idx as u64, new_patch_idx as u64))
998            }
999        })
1000        .unzip()
1001}
1002
1003#[cfg(test)]
1004mod test {
1005    use vortex_buffer::{BufferMut, buffer};
1006    use vortex_mask::Mask;
1007
1008    use crate::arrays::PrimitiveArray;
1009    use crate::patches::Patches;
1010    use crate::search_sorted::SearchResult;
1011    use crate::validity::Validity;
1012    use crate::{IntoArray, ToCanonical};
1013
1014    #[test]
1015    fn test_filter() {
1016        let patches = Patches::new(
1017            100,
1018            0,
1019            buffer![10u32, 11, 20].into_array(),
1020            buffer![100, 110, 200].into_array(),
1021            None,
1022        );
1023
1024        let filtered = patches
1025            .filter(&Mask::from_indices(100, vec![10, 20, 30]))
1026            .unwrap()
1027            .unwrap();
1028
1029        let indices = filtered.indices().to_primitive();
1030        let values = filtered.values().to_primitive();
1031        assert_eq!(indices.as_slice::<u64>(), &[0, 1]);
1032        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1033    }
1034
1035    #[test]
1036    fn take_with_nulls() {
1037        let patches = Patches::new(
1038            20,
1039            0,
1040            buffer![2u64, 9, 15].into_array(),
1041            PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array(),
1042            None,
1043        );
1044
1045        let taken = patches
1046            .take(
1047                &PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false]))
1048                    .into_array(),
1049            )
1050            .unwrap()
1051            .unwrap();
1052        let primitive_values = taken.values().to_primitive();
1053        let primitive_indices = taken.indices().to_primitive();
1054        assert_eq!(taken.array_len(), 2);
1055        assert_eq!(primitive_values.as_slice::<i32>(), [44]);
1056        assert_eq!(primitive_indices.as_slice::<u64>(), [0]);
1057        assert_eq!(
1058            primitive_values.validity_mask(),
1059            Mask::from_iter(vec![true])
1060        );
1061    }
1062
1063    #[test]
1064    fn take_search_with_nulls_chunked() {
1065        let patches = Patches::new(
1066            20,
1067            0,
1068            buffer![2u64, 9, 15].into_array(),
1069            buffer![33_i32, 44, 55].into_array(),
1070            Some(buffer![0u64].into_array()),
1071        );
1072
1073        let taken = patches
1074            .take_search(
1075                PrimitiveArray::new(buffer![9, 0], Validity::from_iter([true, false])),
1076                true,
1077            )
1078            .unwrap()
1079            .unwrap();
1080
1081        let primitive_values = taken.values().to_primitive();
1082        let primitive_indices = taken.indices().to_primitive();
1083        assert_eq!(taken.array_len(), 2);
1084        assert_eq!(primitive_values.as_slice::<i32>(), [44, 33]);
1085        assert_eq!(primitive_indices.as_slice::<u64>(), [0, 1]);
1086
1087        assert_eq!(
1088            primitive_values.validity_mask(),
1089            Mask::from_iter([true, false])
1090        );
1091
1092        let chunk_offsets = taken.chunk_offsets().as_ref().unwrap().to_primitive();
1093        assert_eq!(chunk_offsets.as_slice::<u64>(), &[0]);
1094    }
1095
1096    #[test]
1097    fn take_search_chunked_multiple_chunks() {
1098        let patches = Patches::new(
1099            2048,
1100            0,
1101            buffer![100u64, 500, 1200, 1800].into_array(),
1102            buffer![10_i32, 20, 30, 40].into_array(),
1103            Some(buffer![0u64, 2].into_array()),
1104        );
1105
1106        let taken = patches
1107            .take_search(
1108                PrimitiveArray::new(buffer![500, 1200, 999], Validity::AllValid),
1109                true,
1110            )
1111            .unwrap()
1112            .unwrap();
1113
1114        let primitive_values = taken.values().to_primitive();
1115        assert_eq!(taken.array_len(), 3);
1116        assert_eq!(primitive_values.as_slice::<i32>(), [20, 30]);
1117
1118        let chunk_offsets = taken.chunk_offsets().as_ref().unwrap().to_primitive();
1119        assert_eq!(chunk_offsets.as_slice::<u64>(), &[0]);
1120    }
1121
1122    #[test]
1123    fn take_search_chunked_indices_with_no_patches() {
1124        let patches = Patches::new(
1125            20,
1126            0,
1127            buffer![2u64, 9, 15].into_array(),
1128            buffer![33_i32, 44, 55].into_array(),
1129            Some(buffer![0u64].into_array()),
1130        );
1131
1132        let taken = patches
1133            .take_search(
1134                PrimitiveArray::new(buffer![3, 4, 5], Validity::AllValid),
1135                true,
1136            )
1137            .unwrap();
1138
1139        assert!(taken.is_none());
1140    }
1141
1142    #[test]
1143    fn take_search_chunked_interleaved() {
1144        let patches = Patches::new(
1145            30,
1146            0,
1147            buffer![5u64, 10, 20, 25].into_array(),
1148            buffer![100_i32, 200, 300, 400].into_array(),
1149            Some(buffer![0u64].into_array()),
1150        );
1151
1152        let taken = patches
1153            .take_search(
1154                PrimitiveArray::new(buffer![10, 15, 20, 99], Validity::AllValid),
1155                true,
1156            )
1157            .unwrap()
1158            .unwrap();
1159
1160        let primitive_values = taken.values().to_primitive();
1161        assert_eq!(taken.array_len(), 4);
1162        assert_eq!(primitive_values.as_slice::<i32>(), [200, 300]);
1163
1164        let chunk_offsets = taken.chunk_offsets().as_ref().unwrap().to_primitive();
1165        assert_eq!(chunk_offsets.as_slice::<u64>(), &[0]);
1166    }
1167
1168    #[test]
1169    fn test_take_search_multiple_chunk_offsets() {
1170        let patches = Patches::new(
1171            1500,
1172            0,
1173            BufferMut::from_iter(0..1500u64).into_array(),
1174            BufferMut::from_iter(0..1500i32).into_array(),
1175            Some(buffer![0u64, 1024u64].into_array()),
1176        );
1177
1178        let taken = patches
1179            .take_search(
1180                PrimitiveArray::new(BufferMut::from_iter(0..1500u64), Validity::AllValid),
1181                false,
1182            )
1183            .unwrap()
1184            .unwrap();
1185
1186        assert_eq!(taken.array_len(), 1500);
1187        assert_eq!(
1188            taken
1189                .chunk_offsets()
1190                .as_ref()
1191                .unwrap()
1192                .to_primitive()
1193                .as_slice::<u64>(),
1194            &[0, 1024]
1195        );
1196    }
1197
1198    #[test]
1199    fn test_slice() {
1200        let values = buffer![15_u32, 135, 13531, 42].into_array();
1201        let indices = buffer![10_u64, 11, 50, 100].into_array();
1202
1203        let patches = Patches::new(101, 0, indices, values, None);
1204
1205        let sliced = patches.slice(15..100).unwrap();
1206        assert_eq!(sliced.array_len(), 100 - 15);
1207        let primitive = sliced.values().to_primitive();
1208
1209        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1210    }
1211
1212    #[test]
1213    fn doubly_sliced() {
1214        let values = buffer![15_u32, 135, 13531, 42].into_array();
1215        let indices = buffer![10_u64, 11, 50, 100].into_array();
1216
1217        let patches = Patches::new(101, 0, indices, values, None);
1218
1219        let sliced = patches.slice(15..100).unwrap();
1220        assert_eq!(sliced.array_len(), 100 - 15);
1221        let primitive = sliced.values().to_primitive();
1222
1223        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
1224
1225        let doubly_sliced = sliced.slice(35..36).unwrap();
1226        let primitive_doubly_sliced = doubly_sliced.values().to_primitive();
1227
1228        assert_eq!(primitive_doubly_sliced.as_slice::<u32>(), &[13531]);
1229    }
1230
1231    #[test]
1232    fn test_mask_all_true() {
1233        let patches = Patches::new(
1234            10,
1235            0,
1236            buffer![2u64, 5, 8].into_array(),
1237            buffer![100i32, 200, 300].into_array(),
1238            None,
1239        );
1240
1241        let mask = Mask::new_true(10);
1242        let masked = patches.mask(&mask).unwrap();
1243        assert!(masked.is_none());
1244    }
1245
1246    #[test]
1247    fn test_mask_all_false() {
1248        let patches = Patches::new(
1249            10,
1250            0,
1251            buffer![2u64, 5, 8].into_array(),
1252            buffer![100i32, 200, 300].into_array(),
1253            None,
1254        );
1255
1256        let mask = Mask::new_false(10);
1257        let masked = patches.mask(&mask).unwrap().unwrap();
1258
1259        // No patch values should be masked
1260        let masked_values = masked.values().to_primitive();
1261        assert_eq!(masked_values.as_slice::<i32>(), &[100, 200, 300]);
1262        assert!(masked_values.is_valid(0));
1263        assert!(masked_values.is_valid(1));
1264        assert!(masked_values.is_valid(2));
1265
1266        // Indices should remain unchanged
1267        let indices = masked.indices().to_primitive();
1268        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1269    }
1270
1271    #[test]
1272    fn test_mask_partial() {
1273        let patches = Patches::new(
1274            10,
1275            0,
1276            buffer![2u64, 5, 8].into_array(),
1277            buffer![100i32, 200, 300].into_array(),
1278            None,
1279        );
1280
1281        // Mask that removes patches at indices 2 and 8 (but not 5)
1282        let mask = Mask::from_iter([
1283            false, false, true, false, false, false, false, false, true, false,
1284        ]);
1285        let masked = patches.mask(&mask).unwrap().unwrap();
1286
1287        // Only the patch at index 5 should remain
1288        let masked_values = masked.values().to_primitive();
1289        assert_eq!(masked_values.len(), 1);
1290        assert_eq!(masked_values.as_slice::<i32>(), &[200]);
1291
1292        // Only index 5 should remain
1293        let indices = masked.indices().to_primitive();
1294        assert_eq!(indices.as_slice::<u64>(), &[5]);
1295    }
1296
1297    #[test]
1298    fn test_mask_with_offset() {
1299        let patches = Patches::new(
1300            10,
1301            5,                                  // offset
1302            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1303            buffer![100i32, 200, 300].into_array(),
1304            None,
1305        );
1306
1307        // Mask that sets actual index 2 to null
1308        let mask = Mask::from_iter([
1309            false, false, true, false, false, false, false, false, false, false,
1310        ]);
1311
1312        let masked = patches.mask(&mask).unwrap().unwrap();
1313        assert_eq!(masked.array_len(), 10);
1314        assert_eq!(masked.offset(), 5);
1315        let indices = masked.indices().to_primitive();
1316        assert_eq!(indices.as_slice::<u64>(), &[10, 13]);
1317        let masked_values = masked.values().to_primitive();
1318        assert_eq!(masked_values.as_slice::<i32>(), &[200, 300]);
1319    }
1320
1321    #[test]
1322    fn test_mask_nullable_values() {
1323        let patches = Patches::new(
1324            10,
1325            0,
1326            buffer![2u64, 5, 8].into_array(),
1327            PrimitiveArray::from_option_iter([Some(100i32), None, Some(300)]).into_array(),
1328            None,
1329        );
1330
1331        // Test masking removes patch at index 2
1332        let mask = Mask::from_iter([
1333            false, false, true, false, false, false, false, false, false, false,
1334        ]);
1335        let masked = patches.mask(&mask).unwrap().unwrap();
1336
1337        // Patches at indices 5 and 8 should remain
1338        let indices = masked.indices().to_primitive();
1339        assert_eq!(indices.as_slice::<u64>(), &[5, 8]);
1340
1341        // Values should be the null and 300
1342        let masked_values = masked.values().to_primitive();
1343        assert_eq!(masked_values.len(), 2);
1344        assert!(!masked_values.is_valid(0)); // the null value at index 5
1345        assert!(masked_values.is_valid(1)); // the 300 value at index 8
1346        assert_eq!(i32::try_from(&masked_values.scalar_at(1)).unwrap(), 300i32);
1347    }
1348
1349    #[test]
1350    fn test_filter_keep_all() {
1351        let patches = Patches::new(
1352            10,
1353            0,
1354            buffer![2u64, 5, 8].into_array(),
1355            buffer![100i32, 200, 300].into_array(),
1356            None,
1357        );
1358
1359        // Keep all indices (mask with indices 0-9)
1360        let mask = Mask::from_indices(10, (0..10).collect());
1361        let filtered = patches.filter(&mask).unwrap().unwrap();
1362
1363        let indices = filtered.indices().to_primitive();
1364        let values = filtered.values().to_primitive();
1365        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1366        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1367    }
1368
1369    #[test]
1370    fn test_filter_none() {
1371        let patches = Patches::new(
1372            10,
1373            0,
1374            buffer![2u64, 5, 8].into_array(),
1375            buffer![100i32, 200, 300].into_array(),
1376            None,
1377        );
1378
1379        // Filter out all (empty mask means keep nothing)
1380        let mask = Mask::from_indices(10, vec![]);
1381        let filtered = patches.filter(&mask).unwrap();
1382        assert!(filtered.is_none());
1383    }
1384
1385    #[test]
1386    fn test_filter_with_indices() {
1387        let patches = Patches::new(
1388            10,
1389            0,
1390            buffer![2u64, 5, 8].into_array(),
1391            buffer![100i32, 200, 300].into_array(),
1392            None,
1393        );
1394
1395        // Keep indices 2, 5, 9 (so patches at 2 and 5 remain)
1396        let mask = Mask::from_indices(10, vec![2, 5, 9]);
1397        let filtered = patches.filter(&mask).unwrap().unwrap();
1398
1399        let indices = filtered.indices().to_primitive();
1400        let values = filtered.values().to_primitive();
1401        assert_eq!(indices.as_slice::<u64>(), &[0, 1]); // Adjusted indices
1402        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
1403    }
1404
1405    #[test]
1406    fn test_slice_full_range() {
1407        let patches = Patches::new(
1408            10,
1409            0,
1410            buffer![2u64, 5, 8].into_array(),
1411            buffer![100i32, 200, 300].into_array(),
1412            None,
1413        );
1414
1415        let sliced = patches.slice(0..10).unwrap();
1416
1417        let indices = sliced.indices().to_primitive();
1418        let values = sliced.values().to_primitive();
1419        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1420        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1421    }
1422
1423    #[test]
1424    fn test_slice_partial() {
1425        let patches = Patches::new(
1426            10,
1427            0,
1428            buffer![2u64, 5, 8].into_array(),
1429            buffer![100i32, 200, 300].into_array(),
1430            None,
1431        );
1432
1433        // Slice from 3 to 8 (includes patch at 5)
1434        let sliced = patches.slice(3..8).unwrap();
1435
1436        let indices = sliced.indices().to_primitive();
1437        let values = sliced.values().to_primitive();
1438        assert_eq!(indices.as_slice::<u64>(), &[5]); // Index stays the same
1439        assert_eq!(values.as_slice::<i32>(), &[200]);
1440        assert_eq!(sliced.array_len(), 5); // 8 - 3 = 5
1441        assert_eq!(sliced.offset(), 3); // New offset
1442    }
1443
1444    #[test]
1445    fn test_slice_no_patches() {
1446        let patches = Patches::new(
1447            10,
1448            0,
1449            buffer![2u64, 5, 8].into_array(),
1450            buffer![100i32, 200, 300].into_array(),
1451            None,
1452        );
1453
1454        // Slice from 6 to 7 (no patches in this range)
1455        let sliced = patches.slice(6..7);
1456        assert!(sliced.is_none());
1457    }
1458
1459    #[test]
1460    fn test_slice_with_offset() {
1461        let patches = Patches::new(
1462            10,
1463            5,                                  // offset
1464            buffer![7u64, 10, 13].into_array(), // actual indices are 2, 5, 8
1465            buffer![100i32, 200, 300].into_array(),
1466            None,
1467        );
1468
1469        // Slice from 3 to 8 (includes patch at actual index 5)
1470        let sliced = patches.slice(3..8).unwrap();
1471
1472        let indices = sliced.indices().to_primitive();
1473        let values = sliced.values().to_primitive();
1474        assert_eq!(indices.as_slice::<u64>(), &[10]); // Index stays the same (offset + 5 = 10)
1475        assert_eq!(values.as_slice::<i32>(), &[200]);
1476        assert_eq!(sliced.offset(), 8); // New offset = 5 + 3
1477    }
1478
1479    #[test]
1480    fn test_patch_values() {
1481        let patches = Patches::new(
1482            10,
1483            0,
1484            buffer![2u64, 5, 8].into_array(),
1485            buffer![100i32, 200, 300].into_array(),
1486            None,
1487        );
1488
1489        let values = patches.values().to_primitive();
1490        assert_eq!(i32::try_from(&values.scalar_at(0)).unwrap(), 100i32);
1491        assert_eq!(i32::try_from(&values.scalar_at(1)).unwrap(), 200i32);
1492        assert_eq!(i32::try_from(&values.scalar_at(2)).unwrap(), 300i32);
1493    }
1494
1495    #[test]
1496    fn test_indices_range() {
1497        let patches = Patches::new(
1498            10,
1499            0,
1500            buffer![2u64, 5, 8].into_array(),
1501            buffer![100i32, 200, 300].into_array(),
1502            None,
1503        );
1504
1505        assert_eq!(patches.min_index(), 2);
1506        assert_eq!(patches.max_index(), 8);
1507    }
1508
1509    #[test]
1510    fn test_search_index() {
1511        let patches = Patches::new(
1512            10,
1513            0,
1514            buffer![2u64, 5, 8].into_array(),
1515            buffer![100i32, 200, 300].into_array(),
1516            None,
1517        );
1518
1519        // Search for exact indices
1520        assert_eq!(patches.search_index(2), SearchResult::Found(0));
1521        assert_eq!(patches.search_index(5), SearchResult::Found(1));
1522        assert_eq!(patches.search_index(8), SearchResult::Found(2));
1523
1524        // Search for non-patch indices
1525        assert_eq!(patches.search_index(0), SearchResult::NotFound(0));
1526        assert_eq!(patches.search_index(3), SearchResult::NotFound(1));
1527        assert_eq!(patches.search_index(6), SearchResult::NotFound(2));
1528        assert_eq!(patches.search_index(9), SearchResult::NotFound(3));
1529    }
1530
1531    #[test]
1532    fn test_mask_boundary_patches() {
1533        // Test masking patches at array boundaries
1534        let patches = Patches::new(
1535            10,
1536            0,
1537            buffer![0u64, 9].into_array(),
1538            buffer![100i32, 200].into_array(),
1539            None,
1540        );
1541
1542        let mask = Mask::from_iter([
1543            true, false, false, false, false, false, false, false, false, false,
1544        ]);
1545        let masked = patches.mask(&mask).unwrap();
1546        assert!(masked.is_some());
1547        let masked = masked.unwrap();
1548        let indices = masked.indices().to_primitive();
1549        assert_eq!(indices.as_slice::<u64>(), &[9]);
1550        let values = masked.values().to_primitive();
1551        assert_eq!(values.as_slice::<i32>(), &[200]);
1552    }
1553
1554    #[test]
1555    fn test_mask_all_patches_removed() {
1556        // Test when all patches are masked out
1557        let patches = Patches::new(
1558            10,
1559            0,
1560            buffer![2u64, 5, 8].into_array(),
1561            buffer![100i32, 200, 300].into_array(),
1562            None,
1563        );
1564
1565        // Mask that removes all patches
1566        let mask = Mask::from_iter([
1567            false, false, true, false, false, true, false, false, true, false,
1568        ]);
1569        let masked = patches.mask(&mask).unwrap();
1570        assert!(masked.is_none());
1571    }
1572
1573    #[test]
1574    fn test_mask_no_patches_removed() {
1575        // Test when no patches are masked
1576        let patches = Patches::new(
1577            10,
1578            0,
1579            buffer![2u64, 5, 8].into_array(),
1580            buffer![100i32, 200, 300].into_array(),
1581            None,
1582        );
1583
1584        // Mask that doesn't affect any patches
1585        let mask = Mask::from_iter([
1586            true, false, false, true, false, false, true, false, false, true,
1587        ]);
1588        let masked = patches.mask(&mask).unwrap().unwrap();
1589
1590        let indices = masked.indices().to_primitive();
1591        assert_eq!(indices.as_slice::<u64>(), &[2, 5, 8]);
1592        let values = masked.values().to_primitive();
1593        assert_eq!(values.as_slice::<i32>(), &[100, 200, 300]);
1594    }
1595
1596    #[test]
1597    fn test_mask_single_patch() {
1598        // Test with a single patch
1599        let patches = Patches::new(
1600            5,
1601            0,
1602            buffer![2u64].into_array(),
1603            buffer![42i32].into_array(),
1604            None,
1605        );
1606
1607        // Mask that removes the single patch
1608        let mask = Mask::from_iter([false, false, true, false, false]);
1609        let masked = patches.mask(&mask).unwrap();
1610        assert!(masked.is_none());
1611
1612        // Mask that keeps the single patch
1613        let mask = Mask::from_iter([true, false, false, true, false]);
1614        let masked = patches.mask(&mask).unwrap().unwrap();
1615        let indices = masked.indices().to_primitive();
1616        assert_eq!(indices.as_slice::<u64>(), &[2]);
1617    }
1618
1619    #[test]
1620    fn test_mask_contiguous_patches() {
1621        // Test with contiguous patches
1622        let patches = Patches::new(
1623            10,
1624            0,
1625            buffer![3u64, 4, 5, 6].into_array(),
1626            buffer![100i32, 200, 300, 400].into_array(),
1627            None,
1628        );
1629
1630        // Mask that removes middle patches
1631        let mask = Mask::from_iter([
1632            false, false, false, false, true, true, false, false, false, false,
1633        ]);
1634        let masked = patches.mask(&mask).unwrap().unwrap();
1635
1636        let indices = masked.indices().to_primitive();
1637        assert_eq!(indices.as_slice::<u64>(), &[3, 6]);
1638        let values = masked.values().to_primitive();
1639        assert_eq!(values.as_slice::<i32>(), &[100, 400]);
1640    }
1641
1642    #[test]
1643    fn test_mask_with_large_offset() {
1644        // Test with a large offset that shifts all indices
1645        let patches = Patches::new(
1646            20,
1647            15,
1648            buffer![16u64, 17, 19].into_array(), // actual indices are 1, 2, 4
1649            buffer![100i32, 200, 300].into_array(),
1650            None,
1651        );
1652
1653        // Mask that removes the patch at actual index 2
1654        let mask = Mask::from_iter([
1655            false, false, true, false, false, false, false, false, false, false, false, false,
1656            false, false, false, false, false, false, false, false,
1657        ]);
1658        let masked = patches.mask(&mask).unwrap().unwrap();
1659
1660        let indices = masked.indices().to_primitive();
1661        assert_eq!(indices.as_slice::<u64>(), &[16, 19]);
1662        let values = masked.values().to_primitive();
1663        assert_eq!(values.as_slice::<i32>(), &[100, 300]);
1664    }
1665
1666    #[test]
1667    #[should_panic(expected = "Filter mask length 5 does not match array length 10")]
1668    fn test_mask_wrong_length() {
1669        let patches = Patches::new(
1670            10,
1671            0,
1672            buffer![2u64, 5, 8].into_array(),
1673            buffer![100i32, 200, 300].into_array(),
1674            None,
1675        );
1676
1677        // Mask with wrong length
1678        let mask = Mask::from_iter([false, false, true, false, false]);
1679        let _ = patches.mask(&mask).unwrap();
1680    }
1681
#[test]
fn test_chunk_offsets_search() {
    // PATCH_CHUNK_SIZE is 1024, so an array of length 4096 covers four chunks and
    // `chunk_offsets` carries one entry per chunk.
    let patches = Patches::new(
        4096,
        0,
        buffer![100u64, 200, 3000, 3100].into_array(),
        buffer![10i32, 20, 30, 40].into_array(),
        Some(buffer![0u64, 2, 2, 3].into_array()),
    );

    assert!(patches.chunk_offsets.is_some());

    let lookups = [
        // chunk 0 (0..1024): patches at 100 and 200
        (100, SearchResult::Found(0)),
        (200, SearchResult::Found(1)),
        // chunk 1 (1024..2048): no patches, so lookups land on the insertion point 2
        (1500, SearchResult::NotFound(2)),
        (2000, SearchResult::NotFound(2)),
        // chunk 2 (2048..3072) holds the patch at 3000, chunk 3 (3072..4096) the one at 3100
        (3000, SearchResult::Found(2)),
        (3100, SearchResult::Found(3)),
        // first index of the empty chunk 1
        (1024, SearchResult::NotFound(2)),
    ];
    for (index, expected) in lookups {
        assert_eq!(patches.search_index(index), expected);
    }
}
1705
#[test]
fn test_chunk_offsets_with_slice() {
    let patches = Patches::new(
        3000,
        0,
        buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array(),
        buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array(),
        Some(buffer![0u64, 2, 6].into_array()),
    );

    // The slice starts at 1000, before the first chunk boundary at 1024.
    let window = patches.slice(1000..2200).unwrap();

    assert!(window.chunk_offsets.is_some());
    assert_eq!(window.offset(), 1000);

    // Lookups are relative to the slice start: 200 -> 1200, 500 -> 1500, 1100 -> 2100.
    let lookups = [
        (200, SearchResult::Found(0)),
        (500, SearchResult::Found(2)),
        (1100, SearchResult::Found(4)),
        // 250 -> 1250 sits between the patches at 1200 and 1300.
        (250, SearchResult::NotFound(1)),
    ];
    for (index, expected) in lookups {
        assert_eq!(window.search_index(index), expected);
    }
}
1724
#[test]
fn test_chunk_offsets_with_slice_after_first_chunk() {
    let patches = Patches::new(
        3000,
        0,
        buffer![100u64, 500, 1200, 1300, 1500, 1800, 2100, 2500].into_array(),
        buffer![10i32, 20, 30, 35, 40, 45, 50, 60].into_array(),
        Some(buffer![0u64, 2, 6].into_array()),
    );

    // The slice starts at 1300, past the first chunk boundary at 1024.
    let window = patches.slice(1300..2200).unwrap();

    assert!(window.chunk_offsets.is_some());
    assert_eq!(window.offset(), 1300);

    // Lookups are relative to the slice start: 0 -> 1300, 200 -> 1500, 500 -> 1800.
    let lookups = [
        (0, SearchResult::Found(0)),
        (200, SearchResult::Found(1)),
        (500, SearchResult::Found(2)),
        // 250 -> 1550 and 900 -> 2200 are not patched positions.
        (250, SearchResult::NotFound(2)),
        (900, SearchResult::NotFound(4)),
    ];
    for (index, expected) in lookups {
        assert_eq!(window.search_index(index), expected);
    }
}
1743
#[test]
fn test_chunk_offsets_slice_empty_result() {
    let patches = Patches::new(
        4000,
        0,
        buffer![100u64, 200, 3000, 3100].into_array(),
        buffer![10i32, 20, 30, 40].into_array(),
        Some(buffer![0u64, 2].into_array()),
    );

    // None of the patch indices lies inside 1000..2000, so nothing survives the slice.
    assert!(patches.slice(1000..2000).is_none());
}
1754
#[test]
fn test_chunk_offsets_slice_single_patch() {
    let patches = Patches::new(
        3000,
        0,
        buffer![100u64, 1200, 1300, 2500].into_array(),
        buffer![10i32, 20, 30, 40].into_array(),
        Some(buffer![0u64, 1, 3].into_array()),
    );

    // Only the patch at 1200 falls inside 1100..1250.
    let window = patches.slice(1100..1250).unwrap();

    assert_eq!(window.num_patches(), 1);
    assert_eq!(window.offset(), 1100);

    let lookups = [
        // 1200 - 1100 = 100
        (100, SearchResult::Found(0)),
        // before and after the single remaining patch
        (50, SearchResult::NotFound(0)),
        (150, SearchResult::NotFound(1)),
    ];
    for (index, expected) in lookups {
        assert_eq!(window.search_index(index), expected);
    }
}
1770
#[test]
fn test_chunk_offsets_slice_across_chunks() {
    let patches = Patches::new(
        3000,
        0,
        buffer![100u64, 200, 1100, 1200, 2100, 2200].into_array(),
        buffer![10i32, 20, 30, 40, 50, 60].into_array(),
        Some(buffer![0u64, 2, 4].into_array()),
    );

    // 150..2150 keeps the patches at 200, 1100, 1200 and 2100.
    let window = patches.slice(150..2150).unwrap();

    assert_eq!(window.num_patches(), 4);
    assert_eq!(window.offset(), 150);

    // Each lookup index is the original patch index minus the slice start (150).
    let lookups = [
        (50, SearchResult::Found(0)),   // 200 - 150
        (950, SearchResult::Found(1)),  // 1100 - 150
        (1050, SearchResult::Found(2)), // 1200 - 150
        (1950, SearchResult::Found(3)), // 2100 - 150
    ];
    for (index, expected) in lookups {
        assert_eq!(window.search_index(index), expected);
    }
}
1788
#[test]
fn test_chunk_offsets_boundary_searches() {
    // Patch indices sit directly on either side of the 1024 and 2048 chunk boundaries.
    let patches = Patches::new(
        3000,
        0,
        buffer![1023u64, 1024, 1025, 2047, 2048].into_array(),
        buffer![10i32, 20, 30, 40, 50].into_array(),
        Some(buffer![0u64, 1, 4].into_array()),
    );

    let lookups = [
        // every patched index is found at its own position
        (1023, SearchResult::Found(0)),
        (1024, SearchResult::Found(1)),
        (1025, SearchResult::Found(2)),
        (2047, SearchResult::Found(3)),
        (2048, SearchResult::Found(4)),
        // indices just below a patched index report the insertion point
        (1022, SearchResult::NotFound(0)),
        (2046, SearchResult::NotFound(3)),
    ];
    for (index, expected) in lookups {
        assert_eq!(patches.search_index(index), expected);
    }
}
1805
#[test]
fn test_chunk_offsets_slice_edge_cases() {
    let patches = Patches::new(
        3000,
        0,
        buffer![0u64, 1, 1023, 1024, 2047, 2048].into_array(),
        buffer![10i32, 20, 30, 40, 50, 60].into_array(),
        Some(buffer![0u64, 3, 5].into_array()),
    );

    // A slice at the very start keeps the patches at 0 and 1.
    let head = patches.slice(0..10).unwrap();
    assert_eq!(head.num_patches(), 2);
    assert_eq!(head.search_index(0), SearchResult::Found(0));
    assert_eq!(head.search_index(1), SearchResult::Found(1));

    // A slice at the very end keeps the patches at 2047 and 2048.
    let tail = patches.slice(2040..3000).unwrap();
    assert_eq!(tail.num_patches(), 2);
    assert_eq!(tail.search_index(7), SearchResult::Found(0)); // 2047 - 2040
    assert_eq!(tail.search_index(8), SearchResult::Found(1)); // 2048 - 2040
}
1825
#[test]
fn test_chunk_offsets_slice_nested() {
    let patches = Patches::new(
        1000,
        0,
        buffer![100u64, 200, 300, 400, 500, 600].into_array(),
        buffer![10i32, 20, 30, 40, 50, 60].into_array(),
        Some(buffer![0u64].into_array()),
    );

    // First slice keeps the patches at 200, 300, 400 and 500.
    let outer = patches.slice(150..550).unwrap();
    assert_eq!(outer.num_patches(), 4);

    // Slicing the slice: 100..250 relative to 150 is 250..400 in the original array,
    // which leaves only the patch at 300.
    let inner = outer.slice(100..250).unwrap();
    assert_eq!(inner.num_patches(), 1);
    assert_eq!(inner.offset(), 250);

    assert_eq!(inner.search_index(50), SearchResult::Found(0)); // 300 - 250
    assert_eq!(inner.search_index(150), SearchResult::NotFound(1));
}
1843
#[test]
fn test_index_larger_than_length() {
    // A single patch at the last position of a 1024-element array.
    let offsets = buffer![0u64].into_array();
    let patch_indices = buffer![1023u64].into_array();
    let patch_values = buffer![42i32].into_array();
    let patches = Patches::new(1024, 0, patch_indices, patch_values, Some(offsets));

    // Searching for an index beyond the array length reports the insertion point
    // after the only patch.
    let beyond_end = patches.search_index(2048);
    assert_eq!(beyond_end, SearchResult::NotFound(1));
}
1852}