Skip to main content

lance_select/
mask.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashSet;
5use std::io::Write;
6use std::ops::{Range, RangeBounds, RangeInclusive};
7use std::{collections::BTreeMap, io::Read};
8
9use arrow_array::{Array, BinaryArray, GenericBinaryArray};
10use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer};
11use byteorder::{ReadBytesExt, WriteBytesExt};
12use itertools::Itertools;
13use lance_core::deepsize::DeepSizeOf;
14use roaring::{MultiOps, RoaringBitmap, RoaringTreemap};
15
16use lance_core::cache::{CacheCodecImpl, CacheEntryReader, CacheEntryWriter};
17use lance_core::utils::address::RowAddress;
18use lance_core::{Error, Result};
19
20mod nullable;
21
22pub use nullable::{NullableRowAddrMask, NullableRowAddrSet};
23
/// A mask that selects or deselects rows based on an allow-list or block-list.
///
/// Exactly one list is held at a time: an allow list selects only the rows it
/// contains, while a block list selects every row except the ones it contains.
#[derive(Clone, Debug, DeepSizeOf, PartialEq)]
pub enum RowAddrMask {
    /// Only the rows contained in the set are selected.
    AllowList(RowAddrTreeMap),
    /// Every row is selected except the rows contained in the set.
    BlockList(RowAddrTreeMap),
}
30
31impl Default for RowAddrMask {
32    fn default() -> Self {
33        // Empty block list means all rows are allowed
34        Self::BlockList(RowAddrTreeMap::new())
35    }
36}
37
38impl RowAddrMask {
39    // Create a mask allowing all rows, this is an alias for [default]
40    pub fn all_rows() -> Self {
41        Self::default()
42    }
43
44    // Create a mask that doesn't allow anything
45    pub fn allow_nothing() -> Self {
46        Self::AllowList(RowAddrTreeMap::new())
47    }
48
49    // Create a mask from an allow list
50    pub fn from_allowed(allow_list: RowAddrTreeMap) -> Self {
51        Self::AllowList(allow_list)
52    }
53
54    // Create a mask from a block list
55    pub fn from_block(block_list: RowAddrTreeMap) -> Self {
56        Self::BlockList(block_list)
57    }
58
59    pub fn block_list(&self) -> Option<&RowAddrTreeMap> {
60        match self {
61            Self::BlockList(block_list) => Some(block_list),
62            _ => None,
63        }
64    }
65
66    pub fn allow_list(&self) -> Option<&RowAddrTreeMap> {
67        match self {
68            Self::AllowList(allow_list) => Some(allow_list),
69            _ => None,
70        }
71    }
72
73    /// True if the row_id is selected by the mask, false otherwise
74    pub fn selected(&self, row_id: u64) -> bool {
75        match self {
76            Self::AllowList(allow_list) => allow_list.contains(row_id),
77            Self::BlockList(block_list) => !block_list.contains(row_id),
78        }
79    }
80
81    /// True if every row_id is selected. Lets callers (e.g. the FTS wand
82    /// loop) skip per-row mask checks entirely, which in turn lets the
83    /// deferred-row_id scoring path skip loading the row_id column.
84    pub fn is_select_all(&self) -> bool {
85        matches!(self, Self::BlockList(b) if b.is_empty())
86    }
87
88    /// Return the indices of the input row ids that were valid
89    pub fn selected_indices<'a>(&self, row_ids: impl Iterator<Item = &'a u64> + 'a) -> Vec<u64> {
90        row_ids
91            .enumerate()
92            .filter_map(|(idx, row_id)| {
93                if self.selected(*row_id) {
94                    Some(idx as u64)
95                } else {
96                    None
97                }
98            })
99            .collect()
100    }
101
102    /// Also block the given addrs
103    pub fn also_block(self, block_list: RowAddrTreeMap) -> Self {
104        match self {
105            Self::AllowList(allow_list) => Self::AllowList(allow_list - block_list),
106            Self::BlockList(existing) => Self::BlockList(existing | block_list),
107        }
108    }
109
110    /// Also allow the given addrs
111    pub fn also_allow(self, allow_list: RowAddrTreeMap) -> Self {
112        match self {
113            Self::AllowList(existing) => Self::AllowList(existing | allow_list),
114            Self::BlockList(block_list) => Self::BlockList(block_list - allow_list),
115        }
116    }
117
118    /// Convert a mask into an arrow array
119    ///
120    /// A row addr mask is not very arrow-compatible.  We can't make it a batch with
121    /// two columns because the block list and allow list will have different lengths.  Also,
122    /// there is no Arrow type for compressed bitmaps.
123    ///
124    /// However, we need to shove it into some kind of Arrow container to pass it along the
125    /// datafusion stream.  Perhaps, in the future, we can add row addr masks as first class
126    /// types in datafusion, and this can be passed along as a mask / selection vector.
127    ///
128    /// We serialize this as a variable length binary array with two items.  The first item
129    /// is the block list and the second item is the allow list.
130    pub fn into_arrow(&self) -> Result<BinaryArray> {
131        // NOTE: This serialization format must be stable as it is used in IPC.
132        let (block_list, allow_list) = match self {
133            Self::AllowList(allow_list) => (None, Some(allow_list)),
134            Self::BlockList(block_list) => (Some(block_list), None),
135        };
136
137        let block_list_length = block_list
138            .as_ref()
139            .map(|bl| bl.serialized_size())
140            .unwrap_or(0);
141        let allow_list_length = allow_list
142            .as_ref()
143            .map(|al| al.serialized_size())
144            .unwrap_or(0);
145        let lengths = vec![block_list_length, allow_list_length];
146        let offsets = OffsetBuffer::from_lengths(lengths);
147        let mut value_bytes = vec![0; block_list_length + allow_list_length];
148        let mut validity = vec![false, false];
149        if let Some(block_list) = &block_list {
150            validity[0] = true;
151            block_list.serialize_into(&mut value_bytes[0..])?;
152        }
153        if let Some(allow_list) = &allow_list {
154            validity[1] = true;
155            allow_list.serialize_into(&mut value_bytes[block_list_length..])?;
156        }
157        let values = Buffer::from(value_bytes);
158        let nulls = NullBuffer::from(validity);
159        Ok(BinaryArray::try_new(offsets, values, Some(nulls))?)
160    }
161
162    /// Deserialize a row address mask from Arrow
163    pub fn from_arrow(array: &GenericBinaryArray<i32>) -> Result<Self> {
164        let block_list = if array.is_null(0) {
165            None
166        } else {
167            Some(RowAddrTreeMap::deserialize_from(array.value(0)))
168        }
169        .transpose()?;
170
171        let allow_list = if array.is_null(1) {
172            None
173        } else {
174            Some(RowAddrTreeMap::deserialize_from(array.value(1)))
175        }
176        .transpose()?;
177
178        let res = match (block_list, allow_list) {
179            (Some(bl), None) => Self::BlockList(bl),
180            (None, Some(al)) => Self::AllowList(al),
181            (Some(block), Some(allow)) => Self::AllowList(allow).also_block(block),
182            (None, None) => Self::all_rows(),
183        };
184        Ok(res)
185    }
186
187    /// Return the maximum number of row addresses that could be selected by this mask
188    ///
189    /// Will be None if this is a BlockList (unbounded)
190    pub fn max_len(&self) -> Option<u64> {
191        match self {
192            Self::AllowList(selection) => selection.len(),
193            Self::BlockList(_) => None,
194        }
195    }
196
197    /// Iterate over the row addresses that are selected by the mask
198    ///
199    /// This is only possible if this is an AllowList and the maps don't contain
200    /// any "full fragment" blocks.
201    pub fn iter_addrs(&self) -> Option<Box<dyn Iterator<Item = RowAddress> + '_>> {
202        match self {
203            Self::AllowList(allow_list) => {
204                if let Some(allow_iter) = allow_list.row_addrs() {
205                    Some(Box::new(allow_iter))
206                } else {
207                    None
208                }
209            }
210            Self::BlockList(_) => None, // Can't iterate over block list
211        }
212    }
213}
214
215impl std::ops::Not for RowAddrMask {
216    type Output = Self;
217
218    fn not(self) -> Self::Output {
219        match self {
220            Self::AllowList(allow_list) => Self::BlockList(allow_list),
221            Self::BlockList(block_list) => Self::AllowList(block_list),
222        }
223    }
224}
225
226impl std::ops::BitAnd for RowAddrMask {
227    type Output = Self;
228
229    fn bitand(self, rhs: Self) -> Self::Output {
230        match (self, rhs) {
231            (Self::AllowList(a), Self::AllowList(b)) => Self::AllowList(a & b),
232            (Self::AllowList(allow), Self::BlockList(block))
233            | (Self::BlockList(block), Self::AllowList(allow)) => Self::AllowList(allow - block),
234            (Self::BlockList(a), Self::BlockList(b)) => Self::BlockList(a | b),
235        }
236    }
237}
238
239impl std::ops::BitOr for RowAddrMask {
240    type Output = Self;
241
242    fn bitor(self, rhs: Self) -> Self::Output {
243        match (self, rhs) {
244            (Self::AllowList(a), Self::AllowList(b)) => Self::AllowList(a | b),
245            (Self::AllowList(allow), Self::BlockList(block))
246            | (Self::BlockList(block), Self::AllowList(allow)) => Self::BlockList(block - allow),
247            (Self::BlockList(a), Self::BlockList(b)) => Self::BlockList(a & b),
248        }
249    }
250}
251
/// Common operations over a set of rows (either row ids or row addresses).
///
/// The concrete representation can be address-based (`RowAddrTreeMap`) or
/// id-based (for example a future `RowIdSet`), but the semantics are the same:
/// a set of unique rows.
pub trait RowSetOps: Clone + Sized {
    /// Logical row handle (`u64` for both row ids and row addresses).
    type Row;

    /// Returns true if the set is empty.
    fn is_empty(&self) -> bool;

    /// Returns the number of rows in the set, if it is known.
    ///
    /// Implementations that cannot always compute an exact size (for example
    /// because of "full fragment" markers) should return `None`.
    fn len(&self) -> Option<u64>;

    /// Removes a row from the set.
    ///
    /// Returns true if the row was in the set before the call.
    fn remove(&mut self, row: Self::Row) -> bool;

    /// Returns whether this set contains the given row.
    fn contains(&self, row: Self::Row) -> bool;

    /// Builds a new set that is the union of every set in `other`.
    fn union_all(other: &[&Self]) -> Self;

    /// Builds a row set from an iterator of rows.
    ///
    /// The rows are expected in sorted order; implementations may return an
    /// error when they detect unsorted input.
    fn from_sorted_iter<I>(iter: I) -> Result<Self>
    where
        I: IntoIterator<Item = Self::Row>;
}
284
/// A collection of row addresses.
///
/// Note: For stable row id mode, this may be split into a separate structure in the future.
///
/// These row ids may either be stable-style (where they can be an incrementing
/// u64 sequence) or address style, where they are a fragment id and a row offset.
/// When address style, this supports setting entire fragments as selected,
/// without needing to enumerate all the ids in the fragment.
///
/// This is similar to a [RoaringTreemap] but it is optimized for the case where
/// entire fragments are selected or deselected.
#[derive(Clone, Debug, Default, PartialEq, DeepSizeOf)]
pub struct RowAddrTreeMap {
    /// The contents of the set, keyed by the upper 32 bits of the row value
    /// (the fragment id). If there is a pair (k, Full) then the entire
    /// fragment k is selected. If there is a pair (k, Partial(v)) then the
    /// fragment k has the selected rows in v, where v holds the lower 32 bits
    /// of each row value.
    inner: BTreeMap<u32, RowAddrSelection>,
}
303
/// The rows selected within a single fragment of a [`RowAddrTreeMap`].
#[derive(Clone, Debug, PartialEq)]
pub enum RowAddrSelection {
    /// Every row of the fragment is selected (the fragment size is not tracked).
    Full,
    /// Only the row offsets contained in the bitmap are selected.
    Partial(RoaringBitmap),
}
309
310impl DeepSizeOf for RowAddrSelection {
311    fn deep_size_of_children(&self, _context: &mut lance_core::deepsize::Context) -> usize {
312        match self {
313            Self::Full => 0,
314            Self::Partial(bitmap) => bitmap.serialized_size(),
315        }
316    }
317}
318
319impl RowAddrSelection {
320    fn union_all(selections: &[&Self]) -> Self {
321        let mut is_full = false;
322
323        let res = Self::Partial(
324            selections
325                .iter()
326                .filter_map(|selection| match selection {
327                    Self::Full => {
328                        is_full = true;
329                        None
330                    }
331                    Self::Partial(bitmap) => Some(bitmap),
332                })
333                .union(),
334        );
335
336        if is_full { Self::Full } else { res }
337    }
338}
339
340impl RowSetOps for RowAddrTreeMap {
341    type Row = u64;
342
343    fn is_empty(&self) -> bool {
344        self.inner.is_empty()
345    }
346
347    fn len(&self) -> Option<u64> {
348        self.inner
349            .values()
350            .map(|row_addr_selection| match row_addr_selection {
351                RowAddrSelection::Full => None,
352                RowAddrSelection::Partial(indices) => Some(indices.len()),
353            })
354            .try_fold(0_u64, |acc, next| next.map(|next| next + acc))
355    }
356
357    fn remove(&mut self, row: Self::Row) -> bool {
358        let upper = (row >> 32) as u32;
359        let lower = row as u32;
360        match self.inner.get_mut(&upper) {
361            None => false,
362            Some(RowAddrSelection::Full) => {
363                let mut set = RoaringBitmap::full();
364                set.remove(lower);
365                self.inner.insert(upper, RowAddrSelection::Partial(set));
366                true
367            }
368            Some(RowAddrSelection::Partial(lower_set)) => {
369                let removed = lower_set.remove(lower);
370                if lower_set.is_empty() {
371                    self.inner.remove(&upper);
372                }
373                removed
374            }
375        }
376    }
377
378    fn contains(&self, row: Self::Row) -> bool {
379        let upper = (row >> 32) as u32;
380        let lower = row as u32;
381        match self.inner.get(&upper) {
382            None => false,
383            Some(RowAddrSelection::Full) => true,
384            Some(RowAddrSelection::Partial(fragment_set)) => fragment_set.contains(lower),
385        }
386    }
387
388    fn union_all(other: &[&Self]) -> Self {
389        let mut new_map = BTreeMap::new();
390
391        for map in other {
392            for (fragment, selection) in &map.inner {
393                new_map
394                    .entry(fragment)
395                    // I hate this allocation, but I can't think of a better way
396                    .or_insert_with(|| Vec::with_capacity(other.len()))
397                    .push(selection);
398            }
399        }
400
401        let new_map = new_map
402            .into_iter()
403            .map(|(&fragment, selections)| (fragment, RowAddrSelection::union_all(&selections)))
404            .collect();
405
406        Self { inner: new_map }
407    }
408
409    #[track_caller]
410    fn from_sorted_iter<I>(iter: I) -> Result<Self>
411    where
412        I: IntoIterator<Item = Self::Row>,
413    {
414        let mut iter = iter.into_iter().peekable();
415        let mut inner = BTreeMap::new();
416
417        while let Some(row_id) = iter.peek() {
418            let fragment_id = (row_id >> 32) as u32;
419            let next_bitmap_iter = iter
420                .peeking_take_while(|row_id| (row_id >> 32) as u32 == fragment_id)
421                .map(|row_id| row_id as u32);
422            let Ok(bitmap) = RoaringBitmap::from_sorted_iter(next_bitmap_iter) else {
423                return Err(Error::internal(
424                    "RowAddrTreeMap::from_sorted_iter called with non-sorted input",
425                ));
426            };
427            inner.insert(fragment_id, RowAddrSelection::Partial(bitmap));
428        }
429
430        Ok(Self { inner })
431    }
432}
433
434impl RowAddrTreeMap {
435    /// Create an empty set
436    pub fn new() -> Self {
437        Self::default()
438    }
439
440    /// An iterator of row addrs
441    ///
442    /// If there are any "full fragment" items then this can't be calculated and None
443    /// is returned
444    pub fn row_addrs(&self) -> Option<impl Iterator<Item = RowAddress> + '_> {
445        let inner_iters = self
446            .inner
447            .iter()
448            .filter_map(|(frag_id, row_addr_selection)| match row_addr_selection {
449                RowAddrSelection::Full => None,
450                RowAddrSelection::Partial(bitmap) => Some(
451                    bitmap
452                        .iter()
453                        .map(|row_offset| RowAddress::new_from_parts(*frag_id, row_offset)),
454                ),
455            })
456            .collect::<Vec<_>>();
457        if inner_iters.len() != self.inner.len() {
458            None
459        } else {
460            Some(inner_iters.into_iter().flatten())
461        }
462    }
463
464    /// Insert a single value into the set
465    ///
466    /// Returns true if the value was not already in the set.
467    ///
468    /// ```rust
469    /// use lance_select::{RowAddrTreeMap, RowSetOps};
470    ///
471    /// let mut set = RowAddrTreeMap::new();
472    /// assert_eq!(set.insert(10), true);
473    /// assert_eq!(set.insert(10), false);
474    /// assert_eq!(set.contains(10), true);
475    /// ```
476    pub fn insert(&mut self, value: u64) -> bool {
477        let fragment = (value >> 32) as u32;
478        let row_addr = value as u32;
479        match self.inner.get_mut(&fragment) {
480            None => {
481                let mut set = RoaringBitmap::new();
482                set.insert(row_addr);
483                self.inner.insert(fragment, RowAddrSelection::Partial(set));
484                true
485            }
486            Some(RowAddrSelection::Full) => false,
487            Some(RowAddrSelection::Partial(set)) => set.insert(row_addr),
488        }
489    }
490
491    /// Insert a range of values into the set
492    pub fn insert_range<R: RangeBounds<u64>>(&mut self, range: R) -> u64 {
493        // Separate the start and end into high and low bits.
494        let (mut start_high, mut start_low) = match range.start_bound() {
495            std::ops::Bound::Included(&start) => ((start >> 32) as u32, start as u32),
496            std::ops::Bound::Excluded(&start) => {
497                let start = start.saturating_add(1);
498                ((start >> 32) as u32, start as u32)
499            }
500            std::ops::Bound::Unbounded => (0, 0),
501        };
502
503        let (end_high, end_low) = match range.end_bound() {
504            std::ops::Bound::Included(&end) => ((end >> 32) as u32, end as u32),
505            std::ops::Bound::Excluded(&end) => {
506                let end = end.saturating_sub(1);
507                ((end >> 32) as u32, end as u32)
508            }
509            std::ops::Bound::Unbounded => (u32::MAX, u32::MAX),
510        };
511
512        let mut count = 0;
513
514        while start_high <= end_high {
515            let start = start_low;
516            let end = if start_high == end_high {
517                end_low
518            } else {
519                u32::MAX
520            };
521            let fragment = start_high;
522            match self.inner.get_mut(&fragment) {
523                None => {
524                    let mut set = RoaringBitmap::new();
525                    count += set.insert_range(start..=end);
526                    self.inner.insert(fragment, RowAddrSelection::Partial(set));
527                }
528                Some(RowAddrSelection::Full) => {}
529                Some(RowAddrSelection::Partial(set)) => {
530                    count += set.insert_range(start..=end);
531                }
532            }
533            start_high += 1;
534            start_low = 0;
535        }
536
537        count
538    }
539
540    /// Add a bitmap for a single fragment
541    pub fn insert_bitmap(&mut self, fragment: u32, bitmap: RoaringBitmap) {
542        self.inner
543            .insert(fragment, RowAddrSelection::Partial(bitmap));
544    }
545
546    /// Add a whole fragment to the set
547    pub fn insert_fragment(&mut self, fragment_id: u32) {
548        self.inner.insert(fragment_id, RowAddrSelection::Full);
549    }
550
551    pub fn get_fragment_bitmap(&self, fragment_id: u32) -> Option<&RoaringBitmap> {
552        match self.inner.get(&fragment_id) {
553            None => None,
554            Some(RowAddrSelection::Full) => None,
555            Some(RowAddrSelection::Partial(set)) => Some(set),
556        }
557    }
558
559    /// Get the selection for a fragment
560    pub fn get(&self, fragment_id: &u32) -> Option<&RowAddrSelection> {
561        self.inner.get(fragment_id)
562    }
563
564    /// Iterate over (fragment_id, selection) pairs
565    pub fn iter(&self) -> impl Iterator<Item = (&u32, &RowAddrSelection)> {
566        self.inner.iter()
567    }
568
569    pub fn retain_fragments(&mut self, frag_ids: impl IntoIterator<Item = u32>) {
570        let frag_id_set = frag_ids.into_iter().collect::<HashSet<_>>();
571        self.inner
572            .retain(|frag_id, _| frag_id_set.contains(frag_id));
573    }
574
575    /// Compute the serialized size of the set.
576    pub fn serialized_size(&self) -> usize {
577        // Starts at 4 because of the u32 num_entries
578        let mut size = 4;
579        for set in self.inner.values() {
580            // Each entry is 8 bytes for the fragment id and the bitmap size
581            size += 8;
582            if let RowAddrSelection::Partial(set) = set {
583                size += set.serialized_size();
584            }
585        }
586        size
587    }
588
589    /// Serialize the set into the given buffer
590    ///
591    /// The serialization format is stable and used for index serialization
592    ///
593    /// The serialization format is:
594    /// * u32: num_entries
595    ///
596    /// for each entry:
597    ///   * u32: fragment_id
598    ///   * u32: bitmap size
599    ///   * \[u8\]: bitmap
600    ///
601    /// If bitmap size is zero then the entire fragment is selected.
602    pub fn serialize_into<W: Write>(&self, mut writer: W) -> Result<()> {
603        writer.write_u32::<byteorder::LittleEndian>(self.inner.len() as u32)?;
604        for (fragment, set) in &self.inner {
605            writer.write_u32::<byteorder::LittleEndian>(*fragment)?;
606            if let RowAddrSelection::Partial(set) = set {
607                writer.write_u32::<byteorder::LittleEndian>(set.serialized_size() as u32)?;
608                set.serialize_into(&mut writer)?;
609            } else {
610                writer.write_u32::<byteorder::LittleEndian>(0)?;
611            }
612        }
613        Ok(())
614    }
615
616    /// Deserialize the set from the given buffer
617    pub fn deserialize_from<R: Read>(mut reader: R) -> Result<Self> {
618        let num_entries = reader.read_u32::<byteorder::LittleEndian>()?;
619        let mut inner = BTreeMap::new();
620        for _ in 0..num_entries {
621            let fragment = reader.read_u32::<byteorder::LittleEndian>()?;
622            let bitmap_size = reader.read_u32::<byteorder::LittleEndian>()?;
623            if bitmap_size == 0 {
624                inner.insert(fragment, RowAddrSelection::Full);
625            } else {
626                let mut buffer = vec![0; bitmap_size as usize];
627                reader.read_exact(&mut buffer)?;
628                let set = RoaringBitmap::deserialize_from(&buffer[..])?;
629                inner.insert(fragment, RowAddrSelection::Partial(set));
630            }
631        }
632        Ok(Self { inner })
633    }
634
635    /// Apply a mask to the row addrs
636    ///
637    /// For AllowList: only keep rows that are in the selection and not null
638    /// For BlockList: remove rows that are blocked (not null) and remove nulls
639    pub fn mask(&mut self, mask: &RowAddrMask) {
640        match mask {
641            RowAddrMask::AllowList(allow_list) => {
642                *self &= allow_list;
643            }
644            RowAddrMask::BlockList(block_list) => {
645                *self -= block_list;
646            }
647        }
648    }
649
650    /// Convert the set into an iterator of row addrs
651    ///
652    /// # Safety
653    ///
654    /// This is unsafe because if any of the inner RowAddrSelection elements
655    /// is not a Partial then the iterator will panic because we don't know
656    /// the size of the bitmap.
657    pub unsafe fn into_addr_iter(self) -> impl Iterator<Item = u64> {
658        self.inner
659            .into_iter()
660            .flat_map(|(fragment, selection)| match selection {
661                RowAddrSelection::Full => panic!("Size of full fragment is unknown"),
662                RowAddrSelection::Partial(bitmap) => bitmap.into_iter().map(move |val| {
663                    let fragment = fragment as u64;
664                    let row_offset = val as u64;
665                    (fragment << 32) | row_offset
666                }),
667            })
668    }
669
670    /// Iterate the selected row addresses as `(fragment_id, run)` pairs.
671    ///
672    /// Range-shaped counterpart to [`Self::into_addr_iter`]. A contiguous
673    /// run of N selected rows yields one item, not N. Uses roaring's
674    /// `Iter::next_range`, which walks each underlying container's runs
675    /// rather than its individual bits, so dense ranges cost
676    /// O(num_containers) (roughly num_rows / 65536) instead of O(num_rows).
677    ///
678    /// # Safety
679    /// Same contract as [`Self::into_addr_iter`]: panics if any entry is
680    /// `Full`, since the fragment size is unknown at this layer.
681    pub unsafe fn iter_runs(&self) -> impl Iterator<Item = (u32, RangeInclusive<u32>)> + '_ {
682        self.inner
683            .iter()
684            .flat_map(|(&fragment, selection)| match selection {
685                RowAddrSelection::Full => panic!("Size of full fragment is unknown"),
686                RowAddrSelection::Partial(bitmap) => {
687                    let mut iter = bitmap.iter();
688                    std::iter::from_fn(move || iter.next_range().map(|r| (fragment, r)))
689                }
690            })
691    }
692}
693
694impl CacheCodecImpl for RowAddrTreeMap {
695    const TYPE_ID: &'static str = "lance.RowAddrTreeMap";
696    const CURRENT_VERSION: u32 = 1;
697
698    fn serialize(&self, w: &mut CacheEntryWriter<'_>) -> Result<()> {
699        // A roaring bitmap has its own stable, portable serialization; it is
700        // the whole body, so write it raw rather than length-prefixed.
701        self.serialize_into(w.raw_writer())
702    }
703
704    fn deserialize(r: &mut CacheEntryReader<'_>) -> Result<Self> {
705        Self::deserialize_from(r.body().as_ref())
706    }
707}
708
709impl std::ops::BitOr<Self> for RowAddrTreeMap {
710    type Output = Self;
711
712    fn bitor(mut self, rhs: Self) -> Self::Output {
713        self |= rhs;
714        self
715    }
716}
717
718impl std::ops::BitOr<&Self> for RowAddrTreeMap {
719    type Output = Self;
720
721    fn bitor(mut self, rhs: &Self) -> Self::Output {
722        self |= rhs;
723        self
724    }
725}
726
727impl std::ops::BitOrAssign<Self> for RowAddrTreeMap {
728    fn bitor_assign(&mut self, rhs: Self) {
729        *self |= &rhs;
730    }
731}
732
733impl std::ops::BitOrAssign<&Self> for RowAddrTreeMap {
734    fn bitor_assign(&mut self, rhs: &Self) {
735        for (fragment, rhs_set) in &rhs.inner {
736            let lhs_set = self.inner.get_mut(fragment);
737            if let Some(lhs_set) = lhs_set {
738                match lhs_set {
739                    RowAddrSelection::Full => {
740                        // If the fragment is already selected then there is nothing to do
741                    }
742                    RowAddrSelection::Partial(lhs_bitmap) => match rhs_set {
743                        RowAddrSelection::Full => {
744                            *lhs_set = RowAddrSelection::Full;
745                        }
746                        RowAddrSelection::Partial(rhs_set) => {
747                            *lhs_bitmap |= rhs_set;
748                        }
749                    },
750                }
751            } else {
752                self.inner.insert(*fragment, rhs_set.clone());
753            }
754        }
755    }
756}
757
758impl std::ops::BitAnd<Self> for RowAddrTreeMap {
759    type Output = Self;
760
761    fn bitand(mut self, rhs: Self) -> Self::Output {
762        self &= &rhs;
763        self
764    }
765}
766
767impl std::ops::BitAnd<&Self> for RowAddrTreeMap {
768    type Output = Self;
769
770    fn bitand(mut self, rhs: &Self) -> Self::Output {
771        self &= rhs;
772        self
773    }
774}
775
776impl std::ops::BitAndAssign<Self> for RowAddrTreeMap {
777    fn bitand_assign(&mut self, rhs: Self) {
778        *self &= &rhs;
779    }
780}
781
782impl std::ops::BitAndAssign<&Self> for RowAddrTreeMap {
783    fn bitand_assign(&mut self, rhs: &Self) {
784        // Remove fragment that aren't on the RHS
785        self.inner
786            .retain(|fragment, _| rhs.inner.contains_key(fragment));
787
788        // For fragments that are on the RHS, intersect the bitmaps
789        for (fragment, mut lhs_set) in &mut self.inner {
790            match (&mut lhs_set, rhs.inner.get(fragment)) {
791                (_, None) => {} // Already handled by retain
792                (_, Some(RowAddrSelection::Full)) => {
793                    // Everything selected on RHS, so can leave LHS untouched.
794                }
795                (RowAddrSelection::Partial(lhs_set), Some(RowAddrSelection::Partial(rhs_set))) => {
796                    *lhs_set &= rhs_set;
797                }
798                (RowAddrSelection::Full, Some(RowAddrSelection::Partial(rhs_set))) => {
799                    *lhs_set = RowAddrSelection::Partial(rhs_set.clone());
800                }
801            }
802        }
803        // Some bitmaps might now be empty. If they are, we should remove them.
804        self.inner.retain(|_, set| match set {
805            RowAddrSelection::Partial(set) => !set.is_empty(),
806            RowAddrSelection::Full => true,
807        });
808    }
809}
810
811impl std::ops::Sub<Self> for RowAddrTreeMap {
812    type Output = Self;
813
814    fn sub(mut self, rhs: Self) -> Self {
815        self -= &rhs;
816        self
817    }
818}
819
820impl std::ops::Sub<&Self> for RowAddrTreeMap {
821    type Output = Self;
822
823    fn sub(mut self, rhs: &Self) -> Self {
824        self -= rhs;
825        self
826    }
827}
828
829impl std::ops::SubAssign<&Self> for RowAddrTreeMap {
830    fn sub_assign(&mut self, rhs: &Self) {
831        for (fragment, rhs_set) in &rhs.inner {
832            match self.inner.get_mut(fragment) {
833                None => {}
834                Some(RowAddrSelection::Full) => {
835                    // If the fragment is already selected then there is nothing to do
836                    match rhs_set {
837                        RowAddrSelection::Full => {
838                            self.inner.remove(fragment);
839                        }
840                        RowAddrSelection::Partial(rhs_set) => {
841                            // This generally won't be hit.
842                            let mut set = RoaringBitmap::full();
843                            set -= rhs_set;
844                            self.inner.insert(*fragment, RowAddrSelection::Partial(set));
845                        }
846                    }
847                }
848                Some(RowAddrSelection::Partial(lhs_set)) => match rhs_set {
849                    RowAddrSelection::Full => {
850                        self.inner.remove(fragment);
851                    }
852                    RowAddrSelection::Partial(rhs_set) => {
853                        *lhs_set -= rhs_set;
854                        if lhs_set.is_empty() {
855                            self.inner.remove(fragment);
856                        }
857                    }
858                },
859            }
860        }
861    }
862}
863
864impl FromIterator<u64> for RowAddrTreeMap {
865    fn from_iter<T: IntoIterator<Item = u64>>(iter: T) -> Self {
866        let mut inner = BTreeMap::new();
867        for row_addr in iter {
868            let upper = (row_addr >> 32) as u32;
869            let lower = row_addr as u32;
870            match inner.get_mut(&upper) {
871                None => {
872                    let mut set = RoaringBitmap::new();
873                    set.insert(lower);
874                    inner.insert(upper, RowAddrSelection::Partial(set));
875                }
876                Some(RowAddrSelection::Full) => {
877                    // If the fragment is already selected then there is nothing to do
878                }
879                Some(RowAddrSelection::Partial(set)) => {
880                    set.insert(lower);
881                }
882            }
883        }
884        Self { inner }
885    }
886}
887
888impl<'a> FromIterator<&'a u64> for RowAddrTreeMap {
889    fn from_iter<T: IntoIterator<Item = &'a u64>>(iter: T) -> Self {
890        Self::from_iter(iter.into_iter().copied())
891    }
892}
893
894impl From<Range<u64>> for RowAddrTreeMap {
895    fn from(range: Range<u64>) -> Self {
896        let mut map = Self::default();
897        map.insert_range(range);
898        map
899    }
900}
901
902impl From<RangeInclusive<u64>> for RowAddrTreeMap {
903    fn from(range: RangeInclusive<u64>) -> Self {
904        let mut map = Self::default();
905        map.insert_range(range);
906        map
907    }
908}
909
910impl From<RoaringTreemap> for RowAddrTreeMap {
911    fn from(roaring: RoaringTreemap) -> Self {
912        let mut inner = BTreeMap::new();
913        for (fragment, set) in roaring.bitmaps() {
914            inner.insert(fragment, RowAddrSelection::Partial(set.clone()));
915        }
916        Self { inner }
917    }
918}
919
920impl Extend<u64> for RowAddrTreeMap {
921    fn extend<T: IntoIterator<Item = u64>>(&mut self, iter: T) {
922        for row_addr in iter {
923            let upper = (row_addr >> 32) as u32;
924            let lower = row_addr as u32;
925            match self.inner.get_mut(&upper) {
926                None => {
927                    let mut set = RoaringBitmap::new();
928                    set.insert(lower);
929                    self.inner.insert(upper, RowAddrSelection::Partial(set));
930                }
931                Some(RowAddrSelection::Full) => {
932                    // If the fragment is already selected then there is nothing to do
933                }
934                Some(RowAddrSelection::Partial(set)) => {
935                    set.insert(lower);
936                }
937            }
938        }
939    }
940}
941
942impl<'a> Extend<&'a u64> for RowAddrTreeMap {
943    fn extend<T: IntoIterator<Item = &'a u64>>(&mut self, iter: T) {
944        self.extend(iter.into_iter().copied())
945    }
946}
947
948// Extending with RowAddrTreeMap is basically a cumulative set union
949impl Extend<Self> for RowAddrTreeMap {
950    fn extend<T: IntoIterator<Item = Self>>(&mut self, iter: T) {
951        for other in iter {
952            for (fragment, set) in other.inner {
953                match self.inner.get_mut(&fragment) {
954                    None => {
955                        self.inner.insert(fragment, set);
956                    }
957                    Some(RowAddrSelection::Full) => {
958                        // If the fragment is already selected then there is nothing to do
959                    }
960                    Some(RowAddrSelection::Partial(lhs_set)) => match set {
961                        RowAddrSelection::Full => {
962                            self.inner.insert(fragment, RowAddrSelection::Full);
963                        }
964                        RowAddrSelection::Partial(rhs_set) => {
965                            *lhs_set |= rhs_set;
966                        }
967                    },
968                }
969            }
970        }
971    }
972}
973
974pub fn bitmap_to_ranges(bitmap: &RoaringBitmap) -> Vec<Range<u64>> {
975    let mut ranges = Vec::new();
976    let mut iter = bitmap.iter();
977    while let Some(r) = iter.next_range() {
978        ranges.push(*r.start() as u64..(*r.end() as u64 + 1));
979    }
980    ranges
981}
982
983pub fn ranges_to_bitmap(ranges: &[Range<u64>], sorted: bool) -> RoaringBitmap {
984    if ranges.is_empty() {
985        return RoaringBitmap::new();
986    }
987    if sorted {
988        let sample_size = ranges.len().min(10);
989        let avg_len: u64 = ranges
990            .iter()
991            .take(sample_size)
992            .map(|r| r.end - r.start)
993            .sum::<u64>()
994            / sample_size as u64;
995        // from_sorted_iter appends each value in O(1) but must visit every u32.
996        // insert_range bulk-fills containers but does a binary search per call.
997        // Crossover is ~6: below that, iterating all values is cheaper.
998        if avg_len <= 6 {
999            return RoaringBitmap::from_sorted_iter(
1000                ranges.iter().flat_map(|r| r.start as u32..r.end as u32),
1001            )
1002            .unwrap();
1003        }
1004    }
1005    let mut bm = RoaringBitmap::new();
1006    for r in ranges {
1007        bm.insert_range(r.start as u32..r.end as u32);
1008    }
1009    bm
1010}
1011
/// A set of stable row ids backed by a 64-bit Roaring bitmap.
///
/// This is a thin wrapper around [`RoaringTreemap`]. It represents a
/// collection of unique row ids and provides the common row-set
/// operations defined by [`RowSetOps`].
///
/// The derived [`Default`] value wraps a default (empty) treemap.
#[derive(Clone, Debug, Default, PartialEq)]
pub struct RowIdSet {
    // Underlying 64-bit bitmap that stores the row ids.
    inner: RoaringTreemap,
}
1021
1022impl RowIdSet {
1023    /// Creates an empty set of row ids.
1024    pub fn new() -> Self {
1025        Self::default()
1026    }
1027    /// Returns an iterator over the contained row ids in ascending order.
1028    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
1029        self.inner.iter()
1030    }
1031    /// Returns the union of `self` and `other`.
1032    pub fn union(mut self, other: &Self) -> Self {
1033        self.inner |= &other.inner;
1034        self
1035    }
1036    /// Returns the set difference `self \\ other`.
1037    pub fn difference(mut self, other: &Self) -> Self {
1038        self.inner -= &other.inner;
1039        self
1040    }
1041}
1042
1043impl RowSetOps for RowIdSet {
1044    type Row = u64;
1045    fn is_empty(&self) -> bool {
1046        self.inner.is_empty()
1047    }
1048    fn len(&self) -> Option<u64> {
1049        Some(self.inner.len())
1050    }
1051    fn remove(&mut self, row: Self::Row) -> bool {
1052        self.inner.remove(row)
1053    }
1054    fn contains(&self, row: Self::Row) -> bool {
1055        self.inner.contains(row)
1056    }
1057    fn union_all(other: &[&Self]) -> Self {
1058        let mut result = other
1059            .first()
1060            .map_or(Self::default(), |&first| first.clone());
1061        for set in other {
1062            result.inner |= &set.inner;
1063        }
1064        result
1065    }
1066    #[track_caller]
1067    fn from_sorted_iter<I>(iter: I) -> Result<Self>
1068    where
1069        I: IntoIterator<Item = Self::Row>,
1070    {
1071        let mut inner = RoaringTreemap::new();
1072        let mut last: Option<u64> = None;
1073        for value in iter {
1074            if let Some(prev) = last
1075                && value < prev
1076            {
1077                return Err(Error::internal(
1078                    "RowIdSet::from_sorted_iter called with non-sorted input",
1079                ));
1080            }
1081            inner.insert(value);
1082            last = Some(value);
1083        }
1084        Ok(Self { inner })
1085    }
1086}
1087
/// A mask over stable row ids based on an allow-list or block-list.
///
/// The semantics mirror [`RowAddrMask`], but operate on stable
/// row ids instead of physical row addresses.
///
/// The [`Default`] mask is an empty `BlockList`, i.e. it selects every row id.
#[derive(Clone, Debug, PartialEq)]
pub enum RowIdMask {
    /// Only the ids in the set are selected.
    AllowList(RowIdSet),
    /// All ids are selected except those in the set.
    BlockList(RowIdSet),
}
1099
1100impl Default for RowIdMask {
1101    fn default() -> Self {
1102        // Empty block list means all rows are allowed
1103        Self::BlockList(RowIdSet::default())
1104    }
1105}
1106impl RowIdMask {
1107    /// Create a mask allowing all rows, this is an alias for [`Default`].
1108    pub fn all_rows() -> Self {
1109        Self::default()
1110    }
1111    /// Create a mask that doesn't allow any row id.
1112    pub fn allow_nothing() -> Self {
1113        Self::AllowList(RowIdSet::default())
1114    }
1115    /// Create a mask from an allow list.
1116    pub fn from_allowed(allow_list: RowIdSet) -> Self {
1117        Self::AllowList(allow_list)
1118    }
1119    /// Create a mask from a block list.
1120    pub fn from_block(block_list: RowIdSet) -> Self {
1121        Self::BlockList(block_list)
1122    }
1123    /// True if the row id is selected by the mask, false otherwise.
1124    pub fn selected(&self, row_id: u64) -> bool {
1125        match self {
1126            Self::AllowList(allow_list) => allow_list.contains(row_id),
1127            Self::BlockList(block_list) => !block_list.contains(row_id),
1128        }
1129    }
1130    /// Return the indices of the input row ids that are selected by the mask.
1131    pub fn selected_indices<'a>(&self, row_ids: impl Iterator<Item = &'a u64> + 'a) -> Vec<u64> {
1132        row_ids
1133            .enumerate()
1134            .filter_map(|(idx, row_id)| {
1135                if self.selected(*row_id) {
1136                    Some(idx as u64)
1137                } else {
1138                    None
1139                }
1140            })
1141            .collect()
1142    }
1143    /// Also block the given ids.
1144    ///
1145    /// * `AllowList(a)` -> `AllowList(a \\ block_list)`
1146    /// * `BlockList(b)` -> `BlockList(b union block_list)`
1147    pub fn also_block(self, block_list: RowIdSet) -> Self {
1148        match self {
1149            Self::AllowList(allow_list) => Self::AllowList(allow_list.difference(&block_list)),
1150            Self::BlockList(existing) => Self::BlockList(existing.union(&block_list)),
1151        }
1152    }
1153    /// Also allow the given ids.
1154    ///
1155    /// * `AllowList(a)` -> `AllowList(a union allow_list)`
1156    /// * `BlockList(b)` -> `BlockList(b \\ allow_list)`
1157    pub fn also_allow(self, allow_list: RowIdSet) -> Self {
1158        match self {
1159            Self::AllowList(existing) => Self::AllowList(existing.union(&allow_list)),
1160            Self::BlockList(block_list) => Self::BlockList(block_list.difference(&allow_list)),
1161        }
1162    }
1163    /// Return the maximum number of row ids that could be selected by this mask.
1164    ///
1165    /// Will be `None` if this is a `BlockList` (unbounded).
1166    pub fn max_len(&self) -> Option<u64> {
1167        match self {
1168            Self::AllowList(selection) => selection.len(),
1169            Self::BlockList(_) => None,
1170        }
1171    }
1172    /// Iterate over the row ids that are selected by the mask.
1173    ///
1174    /// This is only possible if this is an `AllowList`. For a `BlockList`
1175    /// the domain of possible row ids is unbounded.
1176    pub fn iter_ids(&self) -> Option<Box<dyn Iterator<Item = u64> + '_>> {
1177        match self {
1178            Self::AllowList(allow_list) => Some(Box::new(allow_list.iter())),
1179            Self::BlockList(_) => None,
1180        }
1181    }
1182}
1183
1184#[cfg(test)]
1185mod tests {
1186    use super::*;
1187    use proptest::{prop_assert, prop_assert_eq};
1188
1189    fn rows(ids: &[u64]) -> RowAddrTreeMap {
1190        RowAddrTreeMap::from_iter(ids)
1191    }
1192
1193    fn assert_mask_selects(mask: &RowAddrMask, selected: &[u64], not_selected: &[u64]) {
1194        for &id in selected {
1195            assert!(mask.selected(id), "Expected row {} to be selected", id);
1196        }
1197        for &id in not_selected {
1198            assert!(!mask.selected(id), "Expected row {} to NOT be selected", id);
1199        }
1200    }
1201
1202    fn selected_in_range(mask: &RowAddrMask, range: std::ops::Range<u64>) -> Vec<u64> {
1203        range.filter(|val| mask.selected(*val)).collect()
1204    }
1205
    /// Checks the accessors of masks built by each `RowAddrMask` constructor.
    #[test]
    fn test_row_addr_mask_construction() {
        // All rows: an empty block list with no length bound and no iterator.
        let full_mask = RowAddrMask::all_rows();
        assert_eq!(full_mask.max_len(), None);
        assert_mask_selects(&full_mask, &[0, 1, 4 << 32 | 3], &[]);
        assert_eq!(full_mask.allow_list(), None);
        assert_eq!(full_mask.block_list(), Some(&RowAddrTreeMap::default()));
        assert!(full_mask.iter_addrs().is_none());

        // Nothing allowed: an empty allow list that iterates zero addresses.
        let empty_mask = RowAddrMask::allow_nothing();
        assert_eq!(empty_mask.max_len(), Some(0));
        assert_mask_selects(&empty_mask, &[], &[0, 1, 4 << 32 | 3]);
        assert_eq!(empty_mask.allow_list(), Some(&RowAddrTreeMap::default()));
        assert_eq!(empty_mask.block_list(), None);
        let iter = empty_mask.iter_addrs();
        assert!(iter.is_some());
        assert_eq!(iter.unwrap().count(), 0);

        // Explicit allow list: bounded length and iteration in ascending order.
        let allow_list = RowAddrMask::from_allowed(rows(&[10, 20, 30]));
        assert_eq!(allow_list.max_len(), Some(3));
        assert_mask_selects(&allow_list, &[10, 20, 30], &[0, 15, 25, 40]);
        assert_eq!(allow_list.allow_list(), Some(&rows(&[10, 20, 30])));
        assert_eq!(allow_list.block_list(), None);
        let iter = allow_list.iter_addrs();
        assert!(iter.is_some());
        let ids: Vec<u64> = iter.unwrap().map(|addr| addr.into()).collect();
        assert_eq!(ids, vec![10, 20, 30]);

        // Allow list holding a full fragment: length is unknown and the
        // addresses cannot be iterated.
        let mut full_frag = RowAddrTreeMap::default();
        full_frag.insert_fragment(2);
        let allow_list = RowAddrMask::from_allowed(full_frag);
        assert_eq!(allow_list.max_len(), None);
        assert_mask_selects(&allow_list, &[(2 << 32) + 5], &[(3 << 32) + 5]);
        assert!(allow_list.iter_addrs().is_none());
    }
1241
    /// `selected_indices` returns positions within the input, not the addresses.
    #[test]
    fn test_selected_indices() {
        // Allow list
        let mask = RowAddrMask::from_allowed(rows(&[10, 20, 40]));
        assert!(mask.selected_indices(std::iter::empty()).is_empty());
        // 20 and 10 are allowed; they sit at positions 1 and 3.
        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[1, 3]);

        // Block list
        let mask = RowAddrMask::from_block(rows(&[10, 20, 40]));
        assert!(mask.selected_indices(std::iter::empty()).is_empty());
        // 25 and 14 are not blocked; they sit at positions 0 and 2.
        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[0, 2]);
    }
1254
    /// `also_allow` grows an allow list and shrinks a block list.
    #[test]
    fn test_also_allow() {
        // Allow list
        let mask = RowAddrMask::from_allowed(rows(&[10, 20]));
        let new_mask = mask.also_allow(rows(&[20, 30, 40]));
        assert_eq!(new_mask, RowAddrMask::from_allowed(rows(&[10, 20, 30, 40])));

        // Block list
        let mask = RowAddrMask::from_block(rows(&[10, 20, 30]));
        let new_mask = mask.also_allow(rows(&[20, 40]));
        assert_eq!(new_mask, RowAddrMask::from_block(rows(&[10, 30])));
    }
1267
    /// `also_block` shrinks an allow list and grows a block list.
    #[test]
    fn test_also_block() {
        // Allow list
        let mask = RowAddrMask::from_allowed(rows(&[10, 20, 30]));
        let new_mask = mask.also_block(rows(&[20, 40]));
        assert_eq!(new_mask, RowAddrMask::from_allowed(rows(&[10, 30])));

        // Block list
        let mask = RowAddrMask::from_block(rows(&[10, 20]));
        let new_mask = mask.also_block(rows(&[20, 30, 40]));
        assert_eq!(new_mask, RowAddrMask::from_block(rows(&[10, 20, 30, 40])));
    }
1280
    /// `iter_addrs` yields addresses only for allow lists without full fragments.
    #[test]
    fn test_iter_ids() {
        // Allow list
        let mask = RowAddrMask::from_allowed(rows(&[10, 20, 30]));
        let expected: Vec<_> = [10, 20, 30].into_iter().map(RowAddress::from).collect();
        assert_eq!(mask.iter_addrs().unwrap().collect::<Vec<_>>(), expected);

        // Allow list with full fragment
        let mut inner = RowAddrTreeMap::default();
        inner.insert_fragment(10);
        let mask = RowAddrMask::from_allowed(inner);
        assert!(mask.iter_addrs().is_none());

        // Block list
        let mask = RowAddrMask::from_block(rows(&[10, 20, 30]));
        assert!(mask.iter_addrs().is_none());
    }
1298
    /// Negation swaps the allow/block variant while keeping the same set.
    #[test]
    fn test_row_addr_mask_not() {
        let allow_list = RowAddrMask::from_allowed(rows(&[1, 2, 3]));
        let block_list = !allow_list.clone();
        assert_eq!(block_list, RowAddrMask::from_block(rows(&[1, 2, 3])));
        // Can roundtrip by negating again
        assert_eq!(!block_list, allow_list);
    }
1307
    /// Walks through `also_block`, `&` and `|` on a handful of small masks.
    #[test]
    fn test_ops() {
        // The default mask selects everything.
        let mask = RowAddrMask::default();
        assert_mask_selects(&mask, &[1, 5], &[]);

        let block_list = mask.also_block(rows(&[0, 5, 15]));
        assert_mask_selects(&block_list, &[1], &[5]);

        let allow_list = RowAddrMask::from_allowed(rows(&[0, 2, 5]));
        assert_mask_selects(&allow_list, &[5], &[1]);

        // Block {0, 5, 15} AND allow {0, 2, 5} leaves only 2.
        let combined = block_list & allow_list;
        assert_mask_selects(&combined, &[2], &[0, 5]);

        // OR with allow {3} adds 3 to the selection.
        let other = RowAddrMask::from_allowed(rows(&[3]));
        let combined = combined | other;
        assert_mask_selects(&combined, &[2, 3], &[0, 5]);

        let block_list = RowAddrMask::from_block(rows(&[0]));
        let allow_list = RowAddrMask::from_allowed(rows(&[3]));

        // A block list OR an allow list still selects rows outside both sets.
        let combined = block_list | allow_list;
        assert_mask_selects(&combined, &[1], &[]);
    }
1332
    /// Checks `&` for every allow/block pairing, in both operand orders,
    /// by listing the selected addresses in `0..10`.
    #[test]
    fn test_logical_and() {
        let allow1 = RowAddrMask::from_allowed(rows(&[0, 1]));
        let block1 = RowAddrMask::from_block(rows(&[1, 2]));
        let allow2 = RowAddrMask::from_allowed(rows(&[1, 2, 3, 4]));
        let block2 = RowAddrMask::from_block(rows(&[3, 4]));

        // Asserts that `lhs & rhs` and `rhs & lhs` both select `expected`.
        fn check(lhs: &RowAddrMask, rhs: &RowAddrMask, expected: &[u64]) {
            for mask in [lhs.clone() & rhs.clone(), rhs.clone() & lhs.clone()] {
                assert_eq!(selected_in_range(&mask, 0..10), expected);
            }
        }

        // Allow & Allow
        check(&allow1, &allow1, &[0, 1]);
        check(&allow1, &allow2, &[1]);

        // Block & Block
        check(&block1, &block1, &[0, 3, 4, 5, 6, 7, 8, 9]);
        check(&block1, &block2, &[0, 5, 6, 7, 8, 9]);

        // Allow & Block
        check(&allow1, &block1, &[0]);
        check(&allow1, &block2, &[0, 1]);
        check(&allow2, &block1, &[3, 4]);
        check(&allow2, &block2, &[1, 2]);
    }
1360
    /// Checks `|` for every pairing of allow, block and "mixed" masks, in both
    /// operand orders, by listing the selected addresses in `0..10`.
    #[test]
    fn test_logical_or() {
        let allow1 = RowAddrMask::from_allowed(rows(&[5, 6, 7, 8, 9]));
        let block1 = RowAddrMask::from_block(rows(&[5, 6]));
        // "Mixed" masks are allow lists that have had some rows blocked afterwards.
        let mixed1 = allow1.clone().also_block(rows(&[5, 6]));
        let allow2 = RowAddrMask::from_allowed(rows(&[2, 3, 4, 5, 6, 7, 8]));
        let block2 = RowAddrMask::from_block(rows(&[4, 5]));
        let mixed2 = allow2.clone().also_block(rows(&[4, 5]));

        // Asserts that `lhs | rhs` and `rhs | lhs` both select `expected`.
        fn check(lhs: &RowAddrMask, rhs: &RowAddrMask, expected: &[u64]) {
            for mask in [lhs.clone() | rhs.clone(), rhs.clone() | lhs.clone()] {
                assert_eq!(selected_in_range(&mask, 0..10), expected);
            }
        }

        // OR of a mask with itself leaves the selection unchanged.
        check(&allow1, &allow1, &[5, 6, 7, 8, 9]);
        check(&block1, &block1, &[0, 1, 2, 3, 4, 7, 8, 9]);
        check(&mixed1, &mixed1, &[7, 8, 9]);
        check(&allow2, &allow2, &[2, 3, 4, 5, 6, 7, 8]);
        check(&block2, &block2, &[0, 1, 2, 3, 6, 7, 8, 9]);
        check(&mixed2, &mixed2, &[2, 3, 6, 7, 8]);

        // Every distinct pair.
        check(&allow1, &block1, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        check(&allow1, &mixed1, &[5, 6, 7, 8, 9]);
        check(&allow1, &allow2, &[2, 3, 4, 5, 6, 7, 8, 9]);
        check(&allow1, &block2, &[0, 1, 2, 3, 5, 6, 7, 8, 9]);
        check(&allow1, &mixed2, &[2, 3, 5, 6, 7, 8, 9]);
        check(&block1, &mixed1, &[0, 1, 2, 3, 4, 7, 8, 9]);
        check(&block1, &allow2, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        check(&block1, &block2, &[0, 1, 2, 3, 4, 6, 7, 8, 9]);
        check(&block1, &mixed2, &[0, 1, 2, 3, 4, 6, 7, 8, 9]);
        check(&mixed1, &allow2, &[2, 3, 4, 5, 6, 7, 8, 9]);
        check(&mixed1, &block2, &[0, 1, 2, 3, 6, 7, 8, 9]);
        check(&mixed1, &mixed2, &[2, 3, 6, 7, 8, 9]);
        check(&allow2, &block2, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        check(&allow2, &mixed2, &[2, 3, 4, 5, 6, 7, 8]);
        check(&block2, &mixed2, &[0, 1, 2, 3, 6, 7, 8, 9]);
    }
1399
    /// A two-slot array with both a block list and an allow list deserializes
    /// to a single allow list.
    #[test]
    fn test_deserialize_legacy_format() {
        // Test that we can deserialize the old format where both allow_list
        // and block_list could be present in the serialized form.
        //
        // The old format used a struct with both allow_list and block_list
        // fields. The new format uses an enum. The deserialization code should handle
        // the case where both lists are present by converting to AllowList(allow - block).

        // Create the RowAddrTreeMaps and serialize them directly
        let allow = rows(&[1, 2, 3, 4, 5, 10, 15]);
        let block = rows(&[2, 4, 15]);

        // Serialize using the RowAddrTreeMap serialization format
        let block_bytes = {
            let mut buf = Vec::with_capacity(block.serialized_size());
            block.serialize_into(&mut buf).unwrap();
            buf
        };
        let allow_bytes = {
            let mut buf = Vec::with_capacity(allow.serialized_size());
            allow.serialize_into(&mut buf).unwrap();
            buf
        };

        // Construct a binary array with both values present (simulating old format).
        // Slot 0 holds the block list and slot 1 the allow list.
        let old_format_array =
            BinaryArray::from_opt_vec(vec![Some(&block_bytes), Some(&allow_bytes)]);

        // Deserialize - should handle this by creating AllowList(allow - block)
        let deserialized = RowAddrMask::from_arrow(&old_format_array).unwrap();

        // The expected result: AllowList([1, 2, 3, 4, 5, 10, 15] - [2, 4, 15]) = [1, 3, 5, 10]
        assert_mask_selects(&deserialized, &[1, 3, 5, 10], &[2, 4, 15]);
        assert!(
            deserialized.allow_list().is_some(),
            "Should deserialize to AllowList variant"
        );
    }
1439
    /// `into_arrow` followed by `from_arrow` reproduces the original mask.
    #[test]
    fn test_roundtrip_arrow() {
        let row_addrs = rows(&[1, 2, 3, 100, 2000]);

        // Allow list
        let original = RowAddrMask::from_allowed(row_addrs.clone());
        let array = original.into_arrow().unwrap();
        assert_eq!(RowAddrMask::from_arrow(&array).unwrap(), original);

        // Block list
        let original = RowAddrMask::from_block(row_addrs);
        let array = original.into_arrow().unwrap();
        assert_eq!(RowAddrMask::from_arrow(&array).unwrap(), original);
    }
1454
    /// `from_arrow` handles arrays in which one or both slots are null.
    /// Slot 0 is the block list and slot 1 the allow list.
    #[test]
    fn test_deserialize_legacy_empty_lists() {
        // Case 1: Both None (should become all_rows)
        let array = BinaryArray::from_opt_vec(vec![None, None]);
        let mask = RowAddrMask::from_arrow(&array).unwrap();
        assert_mask_selects(&mask, &[0, 100, u64::MAX], &[]);

        // Case 2: Only block list (no allow list)
        let block = rows(&[5, 10]);
        let block_bytes = {
            let mut buf = Vec::with_capacity(block.serialized_size());
            block.serialize_into(&mut buf).unwrap();
            buf
        };
        let array = BinaryArray::from_opt_vec(vec![Some(&block_bytes[..]), None]);
        let mask = RowAddrMask::from_arrow(&array).unwrap();
        assert_mask_selects(&mask, &[0, 15], &[5, 10]);

        // Case 3: Only allow list (no block list)
        let allow = rows(&[5, 10]);
        let allow_bytes = {
            let mut buf = Vec::with_capacity(allow.serialized_size());
            allow.serialize_into(&mut buf).unwrap();
            buf
        };
        let array = BinaryArray::from_opt_vec(vec![None, Some(&allow_bytes[..])]);
        let mask = RowAddrMask::from_arrow(&array).unwrap();
        assert_mask_selects(&mask, &[5, 10], &[0, 15]);
    }
1484
    /// `insert` reports whether the row was newly added, and
    /// `get_fragment_bitmap` only exposes partially selected fragments.
    #[test]
    fn test_map_insert() {
        let mut map = RowAddrTreeMap::default();

        assert!(!map.contains(20));
        assert!(map.insert(20));
        assert!(map.contains(20));
        assert!(!map.insert(20)); // Inserting again should be no-op

        // Address 20 lives in fragment 0, which now holds exactly one row.
        let bitmap = map.get_fragment_bitmap(0);
        assert!(bitmap.is_some());
        let bitmap = bitmap.unwrap();
        assert_eq!(bitmap.len(), 1);

        assert!(map.get_fragment_bitmap(1).is_none());

        // Once the fragment is fully selected there is no bitmap to return.
        map.insert_fragment(0);
        assert!(map.contains(0));
        assert!(!map.insert(0)); // Inserting into full fragment should be no-op
        assert!(map.get_fragment_bitmap(0).is_none());
    }
1506
    /// `insert_range` returns the number of rows that were newly inserted.
    #[test]
    fn test_map_insert_range() {
        // The last range straddles the 32-bit boundary between fragments 0 and 1.
        let ranges = &[
            (0..10),
            (40..500),
            ((u32::MAX as u64 - 10)..(u32::MAX as u64 + 20)),
        ];

        for range in ranges {
            let mut mask = RowAddrTreeMap::default();

            let count = mask.insert_range(range.clone());
            let expected = range.end - range.start;
            assert_eq!(count, expected);

            // Re-inserting the same range adds nothing.
            let count = mask.insert_range(range.clone());
            assert_eq!(count, 0);

            // Shifting the range by 5 adds only the 5 rows past the old end.
            let new_range = range.start + 5..range.end + 5;
            let count = mask.insert_range(new_range.clone());
            assert_eq!(count, 5);
        }

        // Unbounded-start and inclusive ranges are accepted as well.
        let mut mask = RowAddrTreeMap::default();
        let count = mask.insert_range(..10);
        assert_eq!(count, 10);
        assert!(mask.contains(0));

        let count = mask.insert_range(20..=24);
        assert_eq!(count, 5);

        // Inserting into an already-full fragment adds nothing.
        mask.insert_fragment(0);
        let count = mask.insert_range(100..200);
        assert_eq!(count, 0);
    }
1542
    /// `remove` reports whether the row was present and removes it from the map.
    #[test]
    fn test_map_remove() {
        let mut mask = RowAddrTreeMap::default();

        // Removing from an empty map is a no-op.
        assert!(!mask.remove(20));

        mask.insert(20);
        assert!(mask.contains(20));
        assert!(mask.remove(20));
        assert!(!mask.contains(20));

        // Removing a row from the middle of an inserted range.
        mask.insert_range(10..=20);
        assert!(mask.contains(15));
        assert!(mask.remove(15));
        assert!(!mask.contains(15));

        // We don't test removing from a full fragment, because that would take
        // a lot of memory.
    }
1562
    /// `RowAddrTreeMap::mask` filters the map in place by an allow or block list.
    #[test]
    fn test_map_mask() {
        let mask = rows(&[0, 1, 2]);
        let mask2 = rows(&[0, 2, 3]);

        // An allow list keeps only the rows present in both sets.
        let allow_list = RowAddrMask::AllowList(mask2.clone());
        let mut actual = mask.clone();
        actual.mask(&allow_list);
        assert_eq!(actual, rows(&[0, 2]));

        // A block list removes the rows present in the blocked set.
        let block_list = RowAddrMask::BlockList(mask2);
        let mut actual = mask;
        actual.mask(&block_list);
        assert_eq!(actual, rows(&[1]));
    }
1578
    /// Collecting `into_addr_iter` over a map that holds a full fragment must
    /// panic with "Size of full fragment is unknown".
    #[test]
    #[should_panic(expected = "Size of full fragment is unknown")]
    fn test_map_insert_full_fragment_row() {
        let mut mask = RowAddrTreeMap::default();
        mask.insert_fragment(0);

        // The call is deliberately made on a map with a full fragment: the
        // panic it triggers is exactly what this test expects.
        unsafe {
            let _ = mask.into_addr_iter().collect::<Vec<u64>>();
        }
    }
1589
    /// `into_addr_iter` yields the full 64-bit addresses in ascending order
    /// across fragments.
    #[test]
    fn test_map_into_addr_iter() {
        let mut mask = RowAddrTreeMap::default();
        mask.insert(0);
        mask.insert(1);
        mask.insert(1 << 32 | 5);
        mask.insert(2 << 32 | 10);

        let expected = vec![0u64, 1, 1 << 32 | 5, 2 << 32 | 10];
        // Only individual rows were inserted, so no fragment is fully selected.
        let actual: Vec<u64> = unsafe { mask.into_addr_iter().collect() };
        assert_eq!(actual, expected);
    }
1602
    /// `iter_runs` reports each contiguous run as `(fragment, inclusive offsets)`.
    #[test]
    fn test_map_iter_runs() {
        // Three contiguous regions across two fragments.
        let mut mask = RowAddrTreeMap::default();
        mask.insert_range(0..3);
        mask.insert_range(10..15);
        mask.insert_range((1u64 << 32) + 100..(1u64 << 32) + 103);

        // SAFETY: only Partial entries.
        let runs: Vec<(u32, RangeInclusive<u32>)> = unsafe { mask.iter_runs().collect() };
        assert_eq!(runs, vec![(0, 0..=2), (0, 10..=14), (1, 100..=102)]);
    }
1615
    /// Expanding the runs from `iter_runs` gives the same addresses as
    /// `into_addr_iter`.
    #[test]
    fn test_map_iter_runs_matches_into_addr_iter() {
        // Confirm iter_runs and into_addr_iter agree on a non-trivial shape.
        let mut mask = RowAddrTreeMap::default();
        mask.insert_range(5..7);
        mask.insert_range(11..12);
        mask.insert_range(20..25);
        mask.insert_range((1u64 << 32)..(1u64 << 32) + 3);

        // Rebuild each 64-bit address from its fragment id and in-fragment offset.
        // Only ranges were inserted above, so no fragment is fully selected.
        let from_runs: Vec<u64> = unsafe { mask.iter_runs() }
            .flat_map(|(frag, run)| {
                let frag = u64::from(frag);
                (*run.start()..=*run.end()).map(move |v| (frag << 32) | u64::from(v))
            })
            .collect();
        let from_bits: Vec<u64> = unsafe { mask.clone().into_addr_iter() }.collect();
        assert_eq!(from_runs, from_bits);
    }
1634
    /// `From` conversions respect half-open versus inclusive range bounds.
    #[test]
    fn test_map_from() {
        // Half-open: 12 is excluded.
        let map = RowAddrTreeMap::from(10..12);
        assert!(map.contains(10));
        assert!(map.contains(11));
        assert!(!map.contains(12));
        assert!(!map.contains(3));

        // Inclusive: 12 is included.
        let map = RowAddrTreeMap::from(10..=12);
        assert!(map.contains(10));
        assert!(map.contains(11));
        assert!(map.contains(12));
        assert!(!map.contains(3));
    }
1649
    /// Converting a `RoaringTreemap` keeps its members, including one in a
    /// second fragment, and adds no others.
    #[test]
    fn test_map_from_roaring() {
        let bitmap = RoaringTreemap::from_iter(&[0, 1, 1 << 32]);
        let map = RowAddrTreeMap::from(bitmap);
        assert!(map.contains(0) && map.contains(1) && map.contains(1 << 32));
        assert!(!map.contains(2));
    }
1657
    /// Extending with raw addresses adds rows to partial fragments, creates
    /// new fragments, and leaves a full fragment fully selected.
    #[test]
    fn test_map_extend() {
        let mut map = RowAddrTreeMap::default();
        map.insert(0);
        map.insert_fragment(1);

        let other_rows = [0, 2, 1 << 32 | 10, 3 << 32 | 5];
        map.extend(other_rows.iter().copied());

        assert!(map.contains(0));
        assert!(map.contains(2));
        // Fragment 1 was fully selected, so rows that were never inserted
        // individually are still contained.
        assert!(map.contains(1 << 32 | 5));
        assert!(map.contains(1 << 32 | 10));
        assert!(map.contains(3 << 32 | 5));
        assert!(!map.contains(3));
    }
1674
1675    #[test]
1676    fn test_map_extend_other_maps() {
1677        let mut map = RowAddrTreeMap::default();
1678        map.insert(0);
1679        map.insert_fragment(1);
1680        map.insert(4 << 32);
1681
1682        let mut other_map = rows(&[0, 2, 1 << 32 | 10, 3 << 32 | 5]);
1683        other_map.insert_fragment(4);
1684        map.extend(std::iter::once(other_map));
1685
1686        for id in [
1687            0,
1688            2,
1689            1 << 32 | 5,
1690            1 << 32 | 10,
1691            3 << 32 | 5,
1692            4 << 32,
1693            4 << 32 | 7,
1694        ] {
1695            assert!(map.contains(id), "Expected {} to be contained", id);
1696        }
1697        assert!(!map.contains(3));
1698    }
1699
1700    proptest::proptest! {
1701        #[test]
1702        fn test_map_serialization_roundtrip(
1703            values in proptest::collection::vec(
1704                (0..u32::MAX, proptest::option::of(proptest::collection::vec(0..u32::MAX, 0..1000))),
1705                0..10
1706            )
1707        ) {
1708            let mut mask = RowAddrTreeMap::default();
1709            for (fragment, rows) in values {
1710                if let Some(rows) = rows {
1711                    let bitmap = RoaringBitmap::from_iter(rows);
1712                    mask.insert_bitmap(fragment, bitmap);
1713                } else {
1714                    mask.insert_fragment(fragment);
1715                }
1716            }
1717
1718            let mut data = Vec::new();
1719            mask.serialize_into(&mut data).unwrap();
1720            let deserialized = RowAddrTreeMap::deserialize_from(data.as_slice()).unwrap();
1721            prop_assert_eq!(mask, deserialized);
1722        }
1723
1724        #[test]
1725        fn test_map_intersect(
1726            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1727            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1728            right_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1729            right_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1730        ) {
1731            let mut left = RowAddrTreeMap::default();
1732            for fragment in left_full_fragments.clone() {
1733                left.insert_fragment(fragment);
1734            }
1735            left.extend(left_rows.iter().copied());
1736
1737            let mut right = RowAddrTreeMap::default();
1738            for fragment in right_full_fragments.clone() {
1739                right.insert_fragment(fragment);
1740            }
1741            right.extend(right_rows.iter().copied());
1742
1743            let mut expected = RowAddrTreeMap::default();
1744            for fragment in &left_full_fragments {
1745                if right_full_fragments.contains(fragment) {
1746                    expected.insert_fragment(*fragment);
1747                }
1748            }
1749
1750            let left_in_right = left_rows.iter().filter(|row| {
1751                right_rows.contains(row)
1752                    || right_full_fragments.contains(&((*row >> 32) as u32))
1753            });
1754            expected.extend(left_in_right);
1755            let right_in_left = right_rows.iter().filter(|row| {
1756                left_rows.contains(row)
1757                    || left_full_fragments.contains(&((*row >> 32) as u32))
1758            });
1759            expected.extend(right_in_left);
1760
1761            let actual = left & right;
1762            prop_assert_eq!(expected, actual);
1763        }
1764
1765        #[test]
1766        fn test_map_union(
1767            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1768            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1769            right_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1770            right_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1771        ) {
1772            let mut left = RowAddrTreeMap::default();
1773            for fragment in left_full_fragments.clone() {
1774                left.insert_fragment(fragment);
1775            }
1776            left.extend(left_rows.iter().copied());
1777
1778            let mut right = RowAddrTreeMap::default();
1779            for fragment in right_full_fragments.clone() {
1780                right.insert_fragment(fragment);
1781            }
1782            right.extend(right_rows.iter().copied());
1783
1784            let mut expected = RowAddrTreeMap::default();
1785            for fragment in left_full_fragments {
1786                expected.insert_fragment(fragment);
1787            }
1788            for fragment in right_full_fragments {
1789                expected.insert_fragment(fragment);
1790            }
1791
1792            let combined_rows = left_rows.iter().chain(right_rows.iter());
1793            expected.extend(combined_rows);
1794
1795            let actual = left | right;
1796            for actual_key_val in &actual.inner {
1797                proptest::prop_assert!(expected.inner.contains_key(actual_key_val.0));
1798                let expected_val = expected.inner.get(actual_key_val.0).unwrap();
1799                prop_assert_eq!(
1800                    actual_key_val.1,
1801                    expected_val,
1802                    "error on key {}",
1803                    actual_key_val.0
1804                );
1805            }
1806            prop_assert_eq!(expected, actual);
1807        }
1808
1809        #[test]
1810        fn test_map_subassign_rows(
1811            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1812            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1813            right_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1814        ) {
1815            let mut left = RowAddrTreeMap::default();
1816            for fragment in left_full_fragments {
1817                left.insert_fragment(fragment);
1818            }
1819            left.extend(left_rows.iter().copied());
1820
1821            let mut right = RowAddrTreeMap::default();
1822            right.extend(right_rows.iter().copied());
1823
1824            let mut expected = left.clone();
1825            for row in right_rows {
1826                expected.remove(row);
1827            }
1828
1829            left -= &right;
1830            prop_assert_eq!(expected, left);
1831        }
1832
1833        #[test]
1834        fn test_map_subassign_frags(
1835            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1836            right_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1837            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1838        ) {
1839            let mut left = RowAddrTreeMap::default();
1840            for fragment in left_full_fragments {
1841                left.insert_fragment(fragment);
1842            }
1843            left.extend(left_rows.iter().copied());
1844
1845            let mut right = RowAddrTreeMap::default();
1846            for fragment in right_full_fragments.clone() {
1847                right.insert_fragment(fragment);
1848            }
1849
1850            let mut expected = left.clone();
1851            for fragment in right_full_fragments {
1852                expected.inner.remove(&fragment);
1853            }
1854
1855            left -= &right;
1856            prop_assert_eq!(expected, left);
1857        }
1858
1859        #[test]
1860        fn test_from_sorted_iter(
1861            mut rows in proptest::collection::vec(0..u64::MAX, 0..1000)
1862        ) {
1863            rows.sort();
1864            let num_rows = rows.len();
1865            let mask = RowAddrTreeMap::from_sorted_iter(rows).unwrap();
1866            prop_assert_eq!(mask.len(), Some(num_rows as u64));
1867        }
1868
1869
1870    }
1871
1872    #[test]
1873    fn test_row_addr_selection_deep_size_of() {
1874        use lance_core::deepsize::DeepSizeOf;
1875
1876        // Test Full variant - should have minimal size (just the enum discriminant)
1877        let full = RowAddrSelection::Full;
1878        let full_size = full.deep_size_of();
1879        // Full variant has no heap allocations beyond the enum itself
1880        assert!(full_size < 100); // Small sanity check
1881
1882        // Test Partial variant - should include bitmap size
1883        let mut bitmap = RoaringBitmap::new();
1884        bitmap.insert_range(0..100);
1885        let partial = RowAddrSelection::Partial(bitmap.clone());
1886        let partial_size = partial.deep_size_of();
1887        // Partial variant should be larger due to bitmap
1888        assert!(partial_size >= bitmap.serialized_size());
1889    }
1890
1891    #[test]
1892    fn test_row_addr_selection_union_all_with_full() {
1893        let full = RowAddrSelection::Full;
1894        let partial = RowAddrSelection::Partial(RoaringBitmap::from_iter(&[1, 2, 3]));
1895
1896        assert!(matches!(
1897            RowAddrSelection::union_all(&[&full, &partial]),
1898            RowAddrSelection::Full
1899        ));
1900
1901        let partial2 = RowAddrSelection::Partial(RoaringBitmap::from_iter(&[4, 5, 6]));
1902        let RowAddrSelection::Partial(bitmap) = RowAddrSelection::union_all(&[&partial, &partial2])
1903        else {
1904            panic!("Expected Partial");
1905        };
1906        assert!(bitmap.contains(1) && bitmap.contains(4));
1907    }
1908
1909    #[test]
1910    fn test_insert_range_unbounded_start() {
1911        let mut map = RowAddrTreeMap::default();
1912
1913        // Test exclusive start bound
1914        let count = map.insert_range((std::ops::Bound::Excluded(5), std::ops::Bound::Included(10)));
1915        assert_eq!(count, 5); // 6, 7, 8, 9, 10
1916        assert!(!map.contains(5));
1917        assert!(map.contains(6));
1918        assert!(map.contains(10));
1919
1920        // Test unbounded end
1921        let mut map2 = RowAddrTreeMap::default();
1922        let count = map2.insert_range(0..5);
1923        assert_eq!(count, 5);
1924        assert!(map2.contains(0));
1925        assert!(map2.contains(4));
1926        assert!(!map2.contains(5));
1927    }
1928
1929    #[test]
1930    fn test_remove_from_full_fragment() {
1931        let mut map = RowAddrTreeMap::default();
1932        map.insert_fragment(0);
1933
1934        // Verify it's a full fragment - get_fragment_bitmap returns None for Full
1935        for id in [0, 100, u32::MAX as u64] {
1936            assert!(map.contains(id));
1937        }
1938        assert!(map.get_fragment_bitmap(0).is_none());
1939
1940        // Remove a value from the full fragment
1941        assert!(map.remove(50));
1942
1943        // Now it should be partial (a full RoaringBitmap minus one value)
1944        assert!(map.contains(0) && !map.contains(50) && map.contains(100));
1945        assert!(map.get_fragment_bitmap(0).is_some());
1946    }
1947
1948    #[test]
1949    fn test_retain_fragments() {
1950        let mut map = RowAddrTreeMap::default();
1951        map.insert(0); // fragment 0
1952        map.insert(1 << 32 | 5); // fragment 1
1953        map.insert(2 << 32 | 10); // fragment 2
1954        map.insert_fragment(3); // fragment 3
1955
1956        map.retain_fragments([0, 2]);
1957
1958        assert!(map.contains(0) && map.contains(2 << 32 | 10));
1959        assert!(!map.contains(1 << 32 | 5) && !map.contains(3 << 32));
1960    }
1961
1962    #[test]
1963    fn test_bitor_assign_full_fragment() {
1964        // Test BitOrAssign when LHS has Full and RHS has Partial
1965        let mut map1 = RowAddrTreeMap::default();
1966        map1.insert_fragment(0);
1967        let mut map2 = RowAddrTreeMap::default();
1968        map2.insert(5);
1969
1970        map1 |= &map2;
1971        // Full | Partial = Full
1972        assert!(map1.contains(0) && map1.contains(5) && map1.contains(100));
1973
1974        // Test BitOrAssign when LHS has Partial and RHS has Full
1975        let mut map3 = RowAddrTreeMap::default();
1976        map3.insert(5);
1977        let mut map4 = RowAddrTreeMap::default();
1978        map4.insert_fragment(0);
1979
1980        map3 |= &map4;
1981        // Partial | Full = Full
1982        assert!(map3.contains(0) && map3.contains(5) && map3.contains(100));
1983    }
1984
1985    #[test]
1986    fn test_bitand_assign_full_fragments() {
1987        // Test BitAndAssign when both have Full for same fragment
1988        let mut map1 = RowAddrTreeMap::default();
1989        map1.insert_fragment(0);
1990        let mut map2 = RowAddrTreeMap::default();
1991        map2.insert_fragment(0);
1992
1993        map1 &= &map2;
1994        // Full & Full = Full
1995        assert!(map1.contains(0) && map1.contains(100));
1996
1997        // Test BitAndAssign when LHS Full, RHS Partial
1998        let mut map3 = RowAddrTreeMap::default();
1999        map3.insert_fragment(0);
2000        let mut map4 = RowAddrTreeMap::default();
2001        map4.insert(5);
2002        map4.insert(10);
2003
2004        map3 &= &map4;
2005        // Full & Partial([5,10]) = Partial([5,10])
2006        assert!(map3.contains(5) && map3.contains(10));
2007        assert!(!map3.contains(0) && !map3.contains(100));
2008
2009        // Test that empty intersection results in removal
2010        let mut map5 = RowAddrTreeMap::default();
2011        map5.insert(5);
2012        let mut map6 = RowAddrTreeMap::default();
2013        map6.insert(10);
2014
2015        map5 &= &map6;
2016        assert!(map5.is_empty());
2017    }
2018
2019    #[test]
2020    fn test_sub_assign_with_full_fragments() {
2021        // Test SubAssign when LHS is Full and RHS is Partial
2022        let mut map1 = RowAddrTreeMap::default();
2023        map1.insert_fragment(0);
2024        let mut map2 = RowAddrTreeMap::default();
2025        map2.insert(5);
2026        map2.insert(10);
2027
2028        map1 -= &map2;
2029        // Full - Partial([5,10]) = Full minus those values
2030        assert!(map1.contains(0) && map1.contains(100));
2031        assert!(!map1.contains(5) && !map1.contains(10));
2032
2033        // Test SubAssign when both are Full for same fragment
2034        let mut map3 = RowAddrTreeMap::default();
2035        map3.insert_fragment(0);
2036        let mut map4 = RowAddrTreeMap::default();
2037        map4.insert_fragment(0);
2038
2039        map3 -= &map4;
2040        // Full - Full = empty
2041        assert!(map3.is_empty());
2042
2043        // Test SubAssign when LHS is Partial and RHS is Full
2044        let mut map5 = RowAddrTreeMap::default();
2045        map5.insert(5);
2046        map5.insert(10);
2047        let mut map6 = RowAddrTreeMap::default();
2048        map6.insert_fragment(0);
2049
2050        map5 -= &map6;
2051        // Partial - Full = empty
2052        assert!(map5.is_empty());
2053    }
2054
2055    #[test]
2056    fn test_from_iterator_with_full_fragment() {
2057        // Test that inserting into a full fragment is a no-op
2058        let mut map = RowAddrTreeMap::default();
2059        map.insert_fragment(0);
2060
2061        // Extend with values that would go into fragment 0
2062        map.extend([5u64, 10, 100].iter());
2063
2064        // Should still be full fragment
2065        for id in [0, 5, 10, 100, u32::MAX as u64] {
2066            assert!(map.contains(id));
2067        }
2068    }
2069
2070    #[test]
2071    fn test_insert_range_excluded_end() {
2072        // Test excluded end bound (line 391-393)
2073        let mut map = RowAddrTreeMap::default();
2074        // Using RangeFrom with small range won't hit the unbounded case
2075        // Instead test Bound::Excluded for end
2076        let count = map.insert_range((std::ops::Bound::Included(5), std::ops::Bound::Excluded(10)));
2077        assert_eq!(count, 5); // 5, 6, 7, 8, 9
2078        assert!(map.contains(5));
2079        assert!(map.contains(9));
2080        assert!(!map.contains(10));
2081    }
2082
2083    #[test]
2084    fn test_bitand_assign_owned() {
2085        // Test BitAndAssign<Self> (owned, not reference)
2086        let mut map1 = RowAddrTreeMap::default();
2087        map1.insert(5);
2088        map1.insert(10);
2089
2090        // Using owned rhs (not reference)
2091        map1 &= rows(&[5, 15]);
2092
2093        assert!(map1.contains(5));
2094        assert!(!map1.contains(10) && !map1.contains(15));
2095    }
2096
2097    #[test]
2098    fn test_from_iter_with_full_fragment() {
2099        // When we collect into RowAddrTreeMap, it should handle duplicates
2100        let map: RowAddrTreeMap = vec![5u64, 10, 100].into_iter().collect();
2101        assert!(map.contains(5) && map.contains(10));
2102
2103        // Test that extending a map with full fragment ignores new values
2104        let mut map = RowAddrTreeMap::default();
2105        map.insert_fragment(0);
2106        for val in [5, 10, 100] {
2107            map.insert(val); // This should be no-op since fragment is full
2108        }
2109        // Still full fragment
2110        for id in [0, 5, u32::MAX as u64] {
2111            assert!(map.contains(id));
2112        }
2113    }
2114
2115    // ============================================================================
2116    // Tests for bitmap_to_ranges / ranges_to_bitmap
2117    // ============================================================================
2118
2119    #[test]
2120    fn test_bitmap_to_ranges_empty() {
2121        let bm = RoaringBitmap::new();
2122        assert!(bitmap_to_ranges(&bm).is_empty());
2123    }
2124
2125    #[test]
2126    fn test_bitmap_to_ranges_single() {
2127        let bm = RoaringBitmap::from_iter([5]);
2128        assert_eq!(bitmap_to_ranges(&bm), vec![5..6]);
2129    }
2130
2131    #[test]
2132    fn test_bitmap_to_ranges_contiguous() {
2133        let mut bm = RoaringBitmap::new();
2134        bm.insert_range(10..20);
2135        assert_eq!(bitmap_to_ranges(&bm), vec![10..20]);
2136    }
2137
2138    #[test]
2139    fn test_bitmap_to_ranges_multiple() {
2140        let mut bm = RoaringBitmap::new();
2141        bm.insert_range(0..3);
2142        bm.insert_range(10..15);
2143        bm.insert(100);
2144        assert_eq!(bitmap_to_ranges(&bm), vec![0..3, 10..15, 100..101]);
2145    }
2146
2147    #[test]
2148    fn test_ranges_to_bitmap_empty() {
2149        let bm = ranges_to_bitmap(&[], true);
2150        assert!(bm.is_empty());
2151    }
2152
2153    #[test]
2154    fn test_ranges_to_bitmap_sorted_short_ranges() {
2155        // avg len = 1, uses from_sorted_iter path
2156        let ranges = vec![0..1, 5..6, 10..11];
2157        let bm = ranges_to_bitmap(&ranges, true);
2158        assert!(bm.contains(0) && bm.contains(5) && bm.contains(10));
2159        assert_eq!(bm.len(), 3);
2160    }
2161
2162    #[test]
2163    fn test_ranges_to_bitmap_sorted_long_ranges() {
2164        // avg len = 100, uses insert_range path
2165        let ranges = vec![0..100, 200..300];
2166        let bm = ranges_to_bitmap(&ranges, true);
2167        assert_eq!(bm.len(), 200);
2168        assert!(bm.contains(0) && bm.contains(99));
2169        assert!(!bm.contains(100));
2170        assert!(bm.contains(200) && bm.contains(299));
2171    }
2172
2173    #[test]
2174    fn test_ranges_to_bitmap_unsorted() {
2175        let ranges = vec![200..300, 0..100];
2176        let bm = ranges_to_bitmap(&ranges, false);
2177        assert_eq!(bm.len(), 200);
2178        assert!(bm.contains(0) && bm.contains(250));
2179    }
2180
2181    #[test]
2182    fn test_bitmap_ranges_roundtrip() {
2183        let mut original = RoaringBitmap::new();
2184        original.insert_range(0..50);
2185        original.insert_range(100..200);
2186        original.insert(500);
2187        original.insert_range(1000..1010);
2188
2189        let ranges = bitmap_to_ranges(&original);
2190        let reconstructed = ranges_to_bitmap(&ranges, true);
2191        assert_eq!(original, reconstructed);
2192    }
2193
2194    // ============================================================================
2195    // Tests for RowIdSet
2196    // ============================================================================
2197
2198    fn row_ids(ids: &[u64]) -> RowIdSet {
2199        let mut set = RowIdSet::new();
2200        for &id in ids {
2201            set.inner.insert(id);
2202        }
2203        set
2204    }
2205
2206    #[test]
2207    fn test_row_id_set_construction() {
2208        let set = RowIdSet::new();
2209        assert!(set.is_empty());
2210        assert_eq!(set.len(), Some(0));
2211
2212        let set = row_ids(&[10, 20, 30]);
2213        assert!(!set.is_empty());
2214        assert_eq!(set.len(), Some(3));
2215        assert!(set.contains(10));
2216        assert!(set.contains(20));
2217        assert!(set.contains(30));
2218        assert!(!set.contains(15));
2219    }
2220
2221    #[test]
2222    fn test_row_id_set_remove() {
2223        let mut set = row_ids(&[10, 20, 30]);
2224
2225        assert!(!set.remove(15)); // Not present
2226        assert_eq!(set.len(), Some(3));
2227
2228        assert!(set.remove(20)); // Present
2229        assert_eq!(set.len(), Some(2));
2230        assert!(!set.contains(20));
2231        assert!(set.contains(10));
2232        assert!(set.contains(30));
2233
2234        assert!(!set.remove(20)); // Already removed
2235    }
2236
2237    #[test]
2238    fn test_row_id_set_union() {
2239        let set1 = row_ids(&[10, 20, 30]);
2240        let set2 = row_ids(&[20, 30, 40]);
2241
2242        let result = set1.union(&set2);
2243        assert_eq!(result.len(), Some(4));
2244        for id in [10, 20, 30, 40] {
2245            assert!(result.contains(id));
2246        }
2247    }
2248
2249    #[test]
2250    fn test_row_id_set_difference() {
2251        let set1 = row_ids(&[10, 20, 30, 40]);
2252        let set2 = row_ids(&[20, 40]);
2253
2254        let result = set1.difference(&set2);
2255        assert_eq!(result.len(), Some(2));
2256        assert!(result.contains(10));
2257        assert!(result.contains(30));
2258        assert!(!result.contains(20));
2259        assert!(!result.contains(40));
2260    }
2261
2262    #[test]
2263    fn test_row_id_set_union_all() {
2264        let set1 = row_ids(&[10, 20]);
2265        let set2 = row_ids(&[20, 30]);
2266        let set3 = row_ids(&[30, 40]);
2267
2268        let result = RowIdSet::union_all(&[&set1, &set2, &set3]);
2269        assert_eq!(result.len(), Some(4));
2270        for id in [10, 20, 30, 40] {
2271            assert!(result.contains(id));
2272        }
2273
2274        // Empty slice should return empty set
2275        let result = RowIdSet::union_all(&[]);
2276        assert!(result.is_empty());
2277    }
2278
2279    #[test]
2280    fn test_row_id_set_iter() {
2281        let set = row_ids(&[10, 20, 30]);
2282        let collected: Vec<u64> = set.iter().collect();
2283        assert_eq!(collected, vec![10, 20, 30]);
2284
2285        let empty = RowIdSet::new();
2286        assert_eq!(empty.iter().count(), 0);
2287    }
2288
2289    #[test]
2290    fn test_row_id_set_from_sorted_iter() {
2291        // Valid sorted input
2292        let set = RowIdSet::from_sorted_iter([10, 20, 30, 40]).unwrap();
2293        assert_eq!(set.len(), Some(4));
2294        for id in [10, 20, 30, 40] {
2295            assert!(set.contains(id));
2296        }
2297
2298        // Empty iterator
2299        let set = RowIdSet::from_sorted_iter(std::iter::empty()).unwrap();
2300        assert!(set.is_empty());
2301
2302        // Single element
2303        let set = RowIdSet::from_sorted_iter([42]).unwrap();
2304        assert_eq!(set.len(), Some(1));
2305        assert!(set.contains(42));
2306    }
2307
2308    #[test]
2309    fn test_row_id_set_from_sorted_iter_unsorted() {
2310        // Non-sorted input should return error
2311        let result = RowIdSet::from_sorted_iter([30, 10, 20]);
2312        assert!(result.is_err());
2313        assert!(result.unwrap_err().to_string().contains("non-sorted"));
2314    }
2315
2316    #[test]
2317    fn test_row_id_set_large_values() {
2318        // Test with large u64 values
2319        let large_ids = [u64::MAX - 10, u64::MAX - 5, u64::MAX - 1];
2320        let set = row_ids(&large_ids);
2321
2322        for &id in &large_ids {
2323            assert!(set.contains(id));
2324        }
2325        assert!(!set.contains(u64::MAX));
2326        assert_eq!(set.len(), Some(3));
2327    }
2328
2329    // ============================================================================
2330    // Tests for RowIdMask
2331    // ============================================================================
2332
2333    fn assert_row_id_mask_selects(mask: &RowIdMask, selected: &[u64], not_selected: &[u64]) {
2334        for &id in selected {
2335            assert!(mask.selected(id), "Expected row id {} to be selected", id);
2336        }
2337        for &id in not_selected {
2338            assert!(
2339                !mask.selected(id),
2340                "Expected row id {} to NOT be selected",
2341                id
2342            );
2343        }
2344    }
2345
2346    #[test]
2347    fn test_row_id_mask_construction() {
2348        let full_mask = RowIdMask::all_rows();
2349        assert_eq!(full_mask.max_len(), None);
2350        assert_row_id_mask_selects(&full_mask, &[0, 1, 100, u64::MAX - 1], &[]);
2351
2352        let empty_mask = RowIdMask::allow_nothing();
2353        assert_eq!(empty_mask.max_len(), Some(0));
2354        assert_row_id_mask_selects(&empty_mask, &[], &[0, 1, 100]);
2355
2356        let allow_list = RowIdMask::from_allowed(row_ids(&[10, 20, 30]));
2357        assert_eq!(allow_list.max_len(), Some(3));
2358        assert_row_id_mask_selects(&allow_list, &[10, 20, 30], &[0, 15, 25, 40]);
2359
2360        let block_list = RowIdMask::from_block(row_ids(&[10, 20, 30]));
2361        assert_eq!(block_list.max_len(), None);
2362        assert_row_id_mask_selects(&block_list, &[0, 15, 25, 40], &[10, 20, 30]);
2363    }
2364
2365    #[test]
2366    fn test_row_id_mask_selected_indices() {
2367        // Allow list
2368        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 40]));
2369        assert!(mask.selected_indices(std::iter::empty()).is_empty());
2370        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[1, 3]);
2371
2372        // Block list
2373        let mask = RowIdMask::from_block(row_ids(&[10, 20, 40]));
2374        assert!(mask.selected_indices(std::iter::empty()).is_empty());
2375        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[0, 2]);
2376    }
2377
2378    #[test]
2379    fn test_row_id_mask_also_allow() {
2380        // Allow list
2381        let mask = RowIdMask::from_allowed(row_ids(&[10, 20]));
2382        let new_mask = mask.also_allow(row_ids(&[20, 30, 40]));
2383        assert_eq!(
2384            new_mask,
2385            RowIdMask::from_allowed(row_ids(&[10, 20, 30, 40]))
2386        );
2387
2388        // Block list
2389        let mask = RowIdMask::from_block(row_ids(&[10, 20, 30]));
2390        let new_mask = mask.also_allow(row_ids(&[20, 40]));
2391        assert_eq!(new_mask, RowIdMask::from_block(row_ids(&[10, 30])));
2392    }
2393
2394    #[test]
2395    fn test_row_id_mask_also_block() {
2396        // Allow list
2397        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 30]));
2398        let new_mask = mask.also_block(row_ids(&[20, 40]));
2399        assert_eq!(new_mask, RowIdMask::from_allowed(row_ids(&[10, 30])));
2400
2401        // Block list
2402        let mask = RowIdMask::from_block(row_ids(&[10, 20]));
2403        let new_mask = mask.also_block(row_ids(&[20, 30, 40]));
2404        assert_eq!(new_mask, RowIdMask::from_block(row_ids(&[10, 20, 30, 40])));
2405    }
2406
2407    #[test]
2408    fn test_row_id_mask_iter_ids() {
2409        // Allow list
2410        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 30]));
2411        let ids: Vec<u64> = mask.iter_ids().unwrap().collect();
2412        assert_eq!(ids, vec![10, 20, 30]);
2413
2414        // Empty allow list
2415        let mask = RowIdMask::allow_nothing();
2416        let iter = mask.iter_ids();
2417        assert!(iter.is_some());
2418        assert_eq!(iter.unwrap().count(), 0);
2419
2420        // Block list
2421        let mask = RowIdMask::from_block(row_ids(&[10, 20, 30]));
2422        assert!(mask.iter_ids().is_none());
2423    }
2424
2425    #[test]
2426    fn test_row_id_mask_default() {
2427        let mask = RowIdMask::default();
2428        // Default should be BlockList with empty set (all rows allowed)
2429        assert_row_id_mask_selects(&mask, &[0, 1, 100, 1000], &[]);
2430        assert_eq!(mask.max_len(), None);
2431    }
2432
2433    #[test]
2434    fn test_row_id_mask_ops() {
2435        let mask = RowIdMask::default();
2436        assert_row_id_mask_selects(&mask, &[1, 5, 100], &[]);
2437
2438        let block_list = mask.also_block(row_ids(&[0, 5, 15]));
2439        assert_row_id_mask_selects(&block_list, &[1, 100], &[5]);
2440
2441        let allow_list = RowIdMask::from_allowed(row_ids(&[0, 2, 5]));
2442        assert_row_id_mask_selects(&allow_list, &[5], &[1, 100]);
2443    }
2444
2445    #[test]
2446    fn test_row_id_mask_combined_ops() {
2447        // Test combining allow and block operations
2448        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 30, 40, 50]));
2449        let mask = mask.also_block(row_ids(&[20, 40]));
2450        assert_row_id_mask_selects(&mask, &[10, 30, 50], &[20, 40]);
2451
2452        let mask = mask.also_allow(row_ids(&[20, 60]));
2453        assert_row_id_mask_selects(&mask, &[10, 20, 30, 50, 60], &[40]);
2454    }
2455
2456    #[test]
2457    fn test_row_id_mask_with_large_values() {
2458        let large_ids = [u64::MAX - 10, u64::MAX - 5, u64::MAX - 1];
2459
2460        // Allow list with large values
2461        let mask = RowIdMask::from_allowed(row_ids(&large_ids));
2462        for &id in &large_ids {
2463            assert!(mask.selected(id));
2464        }
2465        assert!(!mask.selected(u64::MAX));
2466        assert!(!mask.selected(0));
2467
2468        // Block list with large values
2469        let mask = RowIdMask::from_block(row_ids(&large_ids));
2470        for &id in &large_ids {
2471            assert!(!mask.selected(id));
2472        }
2473        assert!(mask.selected(u64::MAX));
2474        assert!(mask.selected(0));
2475    }
2476
2477    proptest::proptest! {
2478        #[test]
2479        fn test_row_id_set_from_sorted_iter_proptest(
2480            mut row_ids in proptest::collection::vec(0..u64::MAX, 0..1000)
2481        ) {
2482            row_ids.sort();
2483            row_ids.dedup();
2484            let num_rows = row_ids.len();
2485            let set = RowIdSet::from_sorted_iter(row_ids.clone()).unwrap();
2486            prop_assert_eq!(set.len(), Some(num_rows as u64));
2487            for id in row_ids {
2488                prop_assert!(set.contains(id));
2489            }
2490        }
2491
2492        #[test]
2493        fn test_row_id_set_union_proptest(
2494            ids1 in proptest::collection::vec(0..u64::MAX, 0..500),
2495            ids2 in proptest::collection::vec(0..u64::MAX, 0..500),
2496        ) {
2497            let set1 = row_ids(&ids1);
2498            let set2 = row_ids(&ids2);
2499
2500            let result = set1.union(&set2);
2501
2502            // All ids from both sets should be in result
2503            for id in ids1.iter().chain(ids2.iter()) {
2504                prop_assert!(result.contains(*id));
2505            }
2506
2507            // Result size should be union size
2508            let expected_size = ids1.iter().chain(ids2.iter()).collect::<std::collections::HashSet<_>>().len();
2509            prop_assert_eq!(result.len(), Some(expected_size as u64));
2510        }
2511
2512        #[test]
2513        fn test_row_id_set_difference_proptest(
2514            ids1 in proptest::collection::vec(0..u64::MAX, 0..500),
2515            ids2 in proptest::collection::vec(0..u64::MAX, 0..500),
2516        ) {
2517            let set1 = row_ids(&ids1);
2518            let set2 = row_ids(&ids2);
2519
2520            let result = set1.difference(&set2);
2521
2522            // Items in ids1 but not in ids2 should be in result
2523            for id in &ids1 {
2524                if !ids2.contains(id) {
2525                    prop_assert!(result.contains(*id));
2526                } else {
2527                    prop_assert!(!result.contains(*id));
2528                }
2529            }
2530        }
2531
2532        #[test]
2533        fn test_row_id_mask_allow_block_proptest(
2534            allow_ids in proptest::collection::vec(0..10000u64, 0..100),
2535            block_ids in proptest::collection::vec(0..10000u64, 0..100),
2536            test_ids in proptest::collection::vec(0..10000u64, 0..50),
2537        ) {
2538            let mask = RowIdMask::from_allowed(row_ids(&allow_ids))
2539                .also_block(row_ids(&block_ids));
2540
2541            for id in test_ids {
2542                let expected = allow_ids.contains(&id) && !block_ids.contains(&id);
2543                prop_assert_eq!(mask.selected(id), expected);
2544            }
2545        }
2546    }
2547}