Skip to main content

lance_core/utils/
mask.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashSet;
5use std::io::Write;
6use std::ops::{Range, RangeBounds, RangeInclusive};
7use std::{collections::BTreeMap, io::Read};
8
9use arrow_array::{Array, BinaryArray, GenericBinaryArray};
10use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer};
11use byteorder::{ReadBytesExt, WriteBytesExt};
12use deepsize::DeepSizeOf;
13use itertools::Itertools;
14use roaring::{MultiOps, RoaringBitmap, RoaringTreemap};
15
16use crate::cache::CacheCodecImpl;
17use crate::{Error, Result};
18
19use super::address::RowAddress;
20
21mod nullable;
22
23pub use nullable::{NullableRowAddrMask, NullableRowAddrSet};
24
/// A mask that selects or deselects rows based on an allow-list or block-list.
///
/// A mask carries exactly one set of rows; the variant decides how that set is
/// interpreted when a row is tested with [`RowAddrMask::selected`].
#[derive(Clone, Debug, DeepSizeOf, PartialEq)]
pub enum RowAddrMask {
    /// Only the rows contained in the set are selected.
    AllowList(RowAddrTreeMap),
    /// Every row is selected except the rows contained in the set.
    BlockList(RowAddrTreeMap),
}
31
32impl Default for RowAddrMask {
33    fn default() -> Self {
34        // Empty block list means all rows are allowed
35        Self::BlockList(RowAddrTreeMap::new())
36    }
37}
38
39impl RowAddrMask {
40    // Create a mask allowing all rows, this is an alias for [default]
41    pub fn all_rows() -> Self {
42        Self::default()
43    }
44
45    // Create a mask that doesn't allow anything
46    pub fn allow_nothing() -> Self {
47        Self::AllowList(RowAddrTreeMap::new())
48    }
49
    /// Create a mask that selects exactly the rows in `allow_list`.
    pub fn from_allowed(allow_list: RowAddrTreeMap) -> Self {
        Self::AllowList(allow_list)
    }
54
    /// Create a mask that selects every row except the rows in `block_list`.
    pub fn from_block(block_list: RowAddrTreeMap) -> Self {
        Self::BlockList(block_list)
    }
59
60    pub fn block_list(&self) -> Option<&RowAddrTreeMap> {
61        match self {
62            Self::BlockList(block_list) => Some(block_list),
63            _ => None,
64        }
65    }
66
67    pub fn allow_list(&self) -> Option<&RowAddrTreeMap> {
68        match self {
69            Self::AllowList(allow_list) => Some(allow_list),
70            _ => None,
71        }
72    }
73
74    /// True if the row_id is selected by the mask, false otherwise
75    pub fn selected(&self, row_id: u64) -> bool {
76        match self {
77            Self::AllowList(allow_list) => allow_list.contains(row_id),
78            Self::BlockList(block_list) => !block_list.contains(row_id),
79        }
80    }
81
82    /// Return the indices of the input row ids that were valid
83    pub fn selected_indices<'a>(&self, row_ids: impl Iterator<Item = &'a u64> + 'a) -> Vec<u64> {
84        row_ids
85            .enumerate()
86            .filter_map(|(idx, row_id)| {
87                if self.selected(*row_id) {
88                    Some(idx as u64)
89                } else {
90                    None
91                }
92            })
93            .collect()
94    }
95
96    /// Also block the given addrs
97    pub fn also_block(self, block_list: RowAddrTreeMap) -> Self {
98        match self {
99            Self::AllowList(allow_list) => Self::AllowList(allow_list - block_list),
100            Self::BlockList(existing) => Self::BlockList(existing | block_list),
101        }
102    }
103
104    /// Also allow the given addrs
105    pub fn also_allow(self, allow_list: RowAddrTreeMap) -> Self {
106        match self {
107            Self::AllowList(existing) => Self::AllowList(existing | allow_list),
108            Self::BlockList(block_list) => Self::BlockList(block_list - allow_list),
109        }
110    }
111
112    /// Convert a mask into an arrow array
113    ///
114    /// A row addr mask is not very arrow-compatible.  We can't make it a batch with
115    /// two columns because the block list and allow list will have different lengths.  Also,
116    /// there is no Arrow type for compressed bitmaps.
117    ///
118    /// However, we need to shove it into some kind of Arrow container to pass it along the
119    /// datafusion stream.  Perhaps, in the future, we can add row addr masks as first class
120    /// types in datafusion, and this can be passed along as a mask / selection vector.
121    ///
122    /// We serialize this as a variable length binary array with two items.  The first item
123    /// is the block list and the second item is the allow list.
124    pub fn into_arrow(&self) -> Result<BinaryArray> {
125        // NOTE: This serialization format must be stable as it is used in IPC.
126        let (block_list, allow_list) = match self {
127            Self::AllowList(allow_list) => (None, Some(allow_list)),
128            Self::BlockList(block_list) => (Some(block_list), None),
129        };
130
131        let block_list_length = block_list
132            .as_ref()
133            .map(|bl| bl.serialized_size())
134            .unwrap_or(0);
135        let allow_list_length = allow_list
136            .as_ref()
137            .map(|al| al.serialized_size())
138            .unwrap_or(0);
139        let lengths = vec![block_list_length, allow_list_length];
140        let offsets = OffsetBuffer::from_lengths(lengths);
141        let mut value_bytes = vec![0; block_list_length + allow_list_length];
142        let mut validity = vec![false, false];
143        if let Some(block_list) = &block_list {
144            validity[0] = true;
145            block_list.serialize_into(&mut value_bytes[0..])?;
146        }
147        if let Some(allow_list) = &allow_list {
148            validity[1] = true;
149            allow_list.serialize_into(&mut value_bytes[block_list_length..])?;
150        }
151        let values = Buffer::from(value_bytes);
152        let nulls = NullBuffer::from(validity);
153        Ok(BinaryArray::try_new(offsets, values, Some(nulls))?)
154    }
155
156    /// Deserialize a row address mask from Arrow
157    pub fn from_arrow(array: &GenericBinaryArray<i32>) -> Result<Self> {
158        let block_list = if array.is_null(0) {
159            None
160        } else {
161            Some(RowAddrTreeMap::deserialize_from(array.value(0)))
162        }
163        .transpose()?;
164
165        let allow_list = if array.is_null(1) {
166            None
167        } else {
168            Some(RowAddrTreeMap::deserialize_from(array.value(1)))
169        }
170        .transpose()?;
171
172        let res = match (block_list, allow_list) {
173            (Some(bl), None) => Self::BlockList(bl),
174            (None, Some(al)) => Self::AllowList(al),
175            (Some(block), Some(allow)) => Self::AllowList(allow).also_block(block),
176            (None, None) => Self::all_rows(),
177        };
178        Ok(res)
179    }
180
181    /// Return the maximum number of row addresses that could be selected by this mask
182    ///
183    /// Will be None if this is a BlockList (unbounded)
184    pub fn max_len(&self) -> Option<u64> {
185        match self {
186            Self::AllowList(selection) => selection.len(),
187            Self::BlockList(_) => None,
188        }
189    }
190
191    /// Iterate over the row addresses that are selected by the mask
192    ///
193    /// This is only possible if this is an AllowList and the maps don't contain
194    /// any "full fragment" blocks.
195    pub fn iter_addrs(&self) -> Option<Box<dyn Iterator<Item = RowAddress> + '_>> {
196        match self {
197            Self::AllowList(allow_list) => {
198                if let Some(allow_iter) = allow_list.row_addrs() {
199                    Some(Box::new(allow_iter))
200                } else {
201                    None
202                }
203            }
204            Self::BlockList(_) => None, // Can't iterate over block list
205        }
206    }
207}
208
209impl std::ops::Not for RowAddrMask {
210    type Output = Self;
211
212    fn not(self) -> Self::Output {
213        match self {
214            Self::AllowList(allow_list) => Self::BlockList(allow_list),
215            Self::BlockList(block_list) => Self::AllowList(block_list),
216        }
217    }
218}
219
220impl std::ops::BitAnd for RowAddrMask {
221    type Output = Self;
222
223    fn bitand(self, rhs: Self) -> Self::Output {
224        match (self, rhs) {
225            (Self::AllowList(a), Self::AllowList(b)) => Self::AllowList(a & b),
226            (Self::AllowList(allow), Self::BlockList(block))
227            | (Self::BlockList(block), Self::AllowList(allow)) => Self::AllowList(allow - block),
228            (Self::BlockList(a), Self::BlockList(b)) => Self::BlockList(a | b),
229        }
230    }
231}
232
233impl std::ops::BitOr for RowAddrMask {
234    type Output = Self;
235
236    fn bitor(self, rhs: Self) -> Self::Output {
237        match (self, rhs) {
238            (Self::AllowList(a), Self::AllowList(b)) => Self::AllowList(a | b),
239            (Self::AllowList(allow), Self::BlockList(block))
240            | (Self::BlockList(block), Self::AllowList(allow)) => Self::BlockList(block - allow),
241            (Self::BlockList(a), Self::BlockList(b)) => Self::BlockList(a & b),
242        }
243    }
244}
245
/// Common operations over a set of rows (either row ids or row addresses).
///
/// The concrete representation can be address-based (`RowAddrTreeMap`) or
/// id-based (for example a future `RowIdSet`), but the semantics are the same:
/// a set of unique rows.
pub trait RowSetOps: Clone + Sized {
    /// Logical row handle (`u64` for both row ids and row addresses).
    type Row;

    /// Returns true if the set is empty.
    fn is_empty(&self) -> bool;

    /// Returns the number of rows in the set, if it is known.
    ///
    /// Implementations that cannot always compute an exact size (for example
    /// because of "full fragment" markers) should return `None`.
    fn len(&self) -> Option<u64>;

    /// Removes a row from the set.
    ///
    /// The `RowAddrTreeMap` implementation returns `true` when the row was
    /// part of the set before the call and `false` otherwise.
    fn remove(&mut self, row: Self::Row) -> bool;

    /// Returns whether this set contains the given row.
    fn contains(&self, row: Self::Row) -> bool;

    /// Returns a new set that is the union of all the sets in `other`.
    fn union_all(other: &[&Self]) -> Self;

    /// Builds a row set from an iterator of rows that is already sorted.
    ///
    /// The `RowAddrTreeMap` implementation returns an error when it detects
    /// that the input is not sorted.
    fn from_sorted_iter<I>(iter: I) -> Result<Self>
    where
        I: IntoIterator<Item = Self::Row>;
}
278
/// A collection of row addresses.
///
/// Note: For stable row id mode, this may be split into a separate structure in the future.
///
/// These row ids may either be stable-style (where they can be an incrementing
/// u64 sequence) or address style, where they are a fragment id and a row offset.
/// When address style, this supports setting entire fragments as selected,
/// without needing to enumerate all the ids in the fragment.
///
/// Each `u64` row is split into its upper 32 bits (the map key, i.e. the
/// fragment id) and its lower 32 bits (the value stored in that fragment's bitmap).
///
/// This is similar to a [RoaringTreemap] but it is optimized for the case where
/// entire fragments are selected or deselected.
#[derive(Clone, Debug, Default, PartialEq, DeepSizeOf)]
pub struct RowAddrTreeMap {
    /// The contents of the set. If there is a pair (k, Full) then the entire
    /// fragment k is selected. If there is a pair (k, Partial(v)) then the
    /// fragment k has the selected rows in v.
    inner: BTreeMap<u32, RowAddrSelection>,
}
297
/// The rows selected within a single fragment of a [`RowAddrTreeMap`].
#[derive(Clone, Debug, PartialEq)]
pub enum RowAddrSelection {
    /// Every row of the fragment is selected; the individual rows are not enumerated.
    Full,
    /// Only the row offsets contained in the bitmap are selected.
    Partial(RoaringBitmap),
}
303
304impl DeepSizeOf for RowAddrSelection {
305    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
306        match self {
307            Self::Full => 0,
308            Self::Partial(bitmap) => bitmap.serialized_size(),
309        }
310    }
311}
312
313impl RowAddrSelection {
314    fn union_all(selections: &[&Self]) -> Self {
315        let mut is_full = false;
316
317        let res = Self::Partial(
318            selections
319                .iter()
320                .filter_map(|selection| match selection {
321                    Self::Full => {
322                        is_full = true;
323                        None
324                    }
325                    Self::Partial(bitmap) => Some(bitmap),
326                })
327                .union(),
328        );
329
330        if is_full { Self::Full } else { res }
331    }
332}
333
334impl RowSetOps for RowAddrTreeMap {
335    type Row = u64;
336
    /// Returns true if the map has no fragment entries.
    ///
    /// This only checks for the presence of entries; it does not look inside
    /// the per-fragment bitmaps.
    fn is_empty(&self) -> bool {
        self.inner.is_empty()
    }
340
341    fn len(&self) -> Option<u64> {
342        self.inner
343            .values()
344            .map(|row_addr_selection| match row_addr_selection {
345                RowAddrSelection::Full => None,
346                RowAddrSelection::Partial(indices) => Some(indices.len()),
347            })
348            .try_fold(0_u64, |acc, next| next.map(|next| next + acc))
349    }
350
351    fn remove(&mut self, row: Self::Row) -> bool {
352        let upper = (row >> 32) as u32;
353        let lower = row as u32;
354        match self.inner.get_mut(&upper) {
355            None => false,
356            Some(RowAddrSelection::Full) => {
357                let mut set = RoaringBitmap::full();
358                set.remove(lower);
359                self.inner.insert(upper, RowAddrSelection::Partial(set));
360                true
361            }
362            Some(RowAddrSelection::Partial(lower_set)) => {
363                let removed = lower_set.remove(lower);
364                if lower_set.is_empty() {
365                    self.inner.remove(&upper);
366                }
367                removed
368            }
369        }
370    }
371
372    fn contains(&self, row: Self::Row) -> bool {
373        let upper = (row >> 32) as u32;
374        let lower = row as u32;
375        match self.inner.get(&upper) {
376            None => false,
377            Some(RowAddrSelection::Full) => true,
378            Some(RowAddrSelection::Partial(fragment_set)) => fragment_set.contains(lower),
379        }
380    }
381
382    fn union_all(other: &[&Self]) -> Self {
383        let mut new_map = BTreeMap::new();
384
385        for map in other {
386            for (fragment, selection) in &map.inner {
387                new_map
388                    .entry(fragment)
389                    // I hate this allocation, but I can't think of a better way
390                    .or_insert_with(|| Vec::with_capacity(other.len()))
391                    .push(selection);
392            }
393        }
394
395        let new_map = new_map
396            .into_iter()
397            .map(|(&fragment, selections)| (fragment, RowAddrSelection::union_all(&selections)))
398            .collect();
399
400        Self { inner: new_map }
401    }
402
403    #[track_caller]
404    fn from_sorted_iter<I>(iter: I) -> Result<Self>
405    where
406        I: IntoIterator<Item = Self::Row>,
407    {
408        let mut iter = iter.into_iter().peekable();
409        let mut inner = BTreeMap::new();
410
411        while let Some(row_id) = iter.peek() {
412            let fragment_id = (row_id >> 32) as u32;
413            let next_bitmap_iter = iter
414                .peeking_take_while(|row_id| (row_id >> 32) as u32 == fragment_id)
415                .map(|row_id| row_id as u32);
416            let Ok(bitmap) = RoaringBitmap::from_sorted_iter(next_bitmap_iter) else {
417                return Err(Error::internal(
418                    "RowAddrTreeMap::from_sorted_iter called with non-sorted input",
419                ));
420            };
421            inner.insert(fragment_id, RowAddrSelection::Partial(bitmap));
422        }
423
424        Ok(Self { inner })
425    }
426}
427
428impl RowAddrTreeMap {
429    /// Create an empty set
430    pub fn new() -> Self {
431        Self::default()
432    }
433
434    /// An iterator of row addrs
435    ///
436    /// If there are any "full fragment" items then this can't be calculated and None
437    /// is returned
438    pub fn row_addrs(&self) -> Option<impl Iterator<Item = RowAddress> + '_> {
439        let inner_iters = self
440            .inner
441            .iter()
442            .filter_map(|(frag_id, row_addr_selection)| match row_addr_selection {
443                RowAddrSelection::Full => None,
444                RowAddrSelection::Partial(bitmap) => Some(
445                    bitmap
446                        .iter()
447                        .map(|row_offset| RowAddress::new_from_parts(*frag_id, row_offset)),
448                ),
449            })
450            .collect::<Vec<_>>();
451        if inner_iters.len() != self.inner.len() {
452            None
453        } else {
454            Some(inner_iters.into_iter().flatten())
455        }
456    }
457
458    /// Insert a single value into the set
459    ///
460    /// Returns true if the value was not already in the set.
461    ///
462    /// ```rust
463    /// use lance_core::utils::mask::{RowAddrTreeMap, RowSetOps};
464    ///
465    /// let mut set = RowAddrTreeMap::new();
466    /// assert_eq!(set.insert(10), true);
467    /// assert_eq!(set.insert(10), false);
468    /// assert_eq!(set.contains(10), true);
469    /// ```
470    pub fn insert(&mut self, value: u64) -> bool {
471        let fragment = (value >> 32) as u32;
472        let row_addr = value as u32;
473        match self.inner.get_mut(&fragment) {
474            None => {
475                let mut set = RoaringBitmap::new();
476                set.insert(row_addr);
477                self.inner.insert(fragment, RowAddrSelection::Partial(set));
478                true
479            }
480            Some(RowAddrSelection::Full) => false,
481            Some(RowAddrSelection::Partial(set)) => set.insert(row_addr),
482        }
483    }
484
485    /// Insert a range of values into the set
486    pub fn insert_range<R: RangeBounds<u64>>(&mut self, range: R) -> u64 {
487        // Separate the start and end into high and low bits.
488        let (mut start_high, mut start_low) = match range.start_bound() {
489            std::ops::Bound::Included(&start) => ((start >> 32) as u32, start as u32),
490            std::ops::Bound::Excluded(&start) => {
491                let start = start.saturating_add(1);
492                ((start >> 32) as u32, start as u32)
493            }
494            std::ops::Bound::Unbounded => (0, 0),
495        };
496
497        let (end_high, end_low) = match range.end_bound() {
498            std::ops::Bound::Included(&end) => ((end >> 32) as u32, end as u32),
499            std::ops::Bound::Excluded(&end) => {
500                let end = end.saturating_sub(1);
501                ((end >> 32) as u32, end as u32)
502            }
503            std::ops::Bound::Unbounded => (u32::MAX, u32::MAX),
504        };
505
506        let mut count = 0;
507
508        while start_high <= end_high {
509            let start = start_low;
510            let end = if start_high == end_high {
511                end_low
512            } else {
513                u32::MAX
514            };
515            let fragment = start_high;
516            match self.inner.get_mut(&fragment) {
517                None => {
518                    let mut set = RoaringBitmap::new();
519                    count += set.insert_range(start..=end);
520                    self.inner.insert(fragment, RowAddrSelection::Partial(set));
521                }
522                Some(RowAddrSelection::Full) => {}
523                Some(RowAddrSelection::Partial(set)) => {
524                    count += set.insert_range(start..=end);
525                }
526            }
527            start_high += 1;
528            start_low = 0;
529        }
530
531        count
532    }
533
    /// Set the selection of a single fragment to the given bitmap
    ///
    /// Any selection previously stored for `fragment` (partial or full) is
    /// replaced by `bitmap`; the two are not merged.
    pub fn insert_bitmap(&mut self, fragment: u32, bitmap: RoaringBitmap) {
        self.inner
            .insert(fragment, RowAddrSelection::Partial(bitmap));
    }
539
    /// Add a whole fragment to the set
    ///
    /// Any partial selection previously stored for `fragment_id` is replaced
    /// by the full-fragment marker.
    pub fn insert_fragment(&mut self, fragment_id: u32) {
        self.inner.insert(fragment_id, RowAddrSelection::Full);
    }
544
545    pub fn get_fragment_bitmap(&self, fragment_id: u32) -> Option<&RoaringBitmap> {
546        match self.inner.get(&fragment_id) {
547            None => None,
548            Some(RowAddrSelection::Full) => None,
549            Some(RowAddrSelection::Partial(set)) => Some(set),
550        }
551    }
552
    /// Get the selection for a fragment
    ///
    /// Returns `None` when the fragment has no entry in the set.
    pub fn get(&self, fragment_id: &u32) -> Option<&RowAddrSelection> {
        self.inner.get(fragment_id)
    }
557
    /// Iterate over (fragment_id, selection) pairs
    ///
    /// The pairs come straight from the underlying `BTreeMap`, so they are
    /// yielded in ascending fragment id order.
    pub fn iter(&self) -> impl Iterator<Item = (&u32, &RowAddrSelection)> {
        self.inner.iter()
    }
562
563    pub fn retain_fragments(&mut self, frag_ids: impl IntoIterator<Item = u32>) {
564        let frag_id_set = frag_ids.into_iter().collect::<HashSet<_>>();
565        self.inner
566            .retain(|frag_id, _| frag_id_set.contains(frag_id));
567    }
568
569    /// Compute the serialized size of the set.
570    pub fn serialized_size(&self) -> usize {
571        // Starts at 4 because of the u32 num_entries
572        let mut size = 4;
573        for set in self.inner.values() {
574            // Each entry is 8 bytes for the fragment id and the bitmap size
575            size += 8;
576            if let RowAddrSelection::Partial(set) = set {
577                size += set.serialized_size();
578            }
579        }
580        size
581    }
582
583    /// Serialize the set into the given buffer
584    ///
585    /// The serialization format is stable and used for index serialization
586    ///
587    /// The serialization format is:
588    /// * u32: num_entries
589    ///
590    /// for each entry:
591    ///   * u32: fragment_id
592    ///   * u32: bitmap size
593    ///   * \[u8\]: bitmap
594    ///
595    /// If bitmap size is zero then the entire fragment is selected.
596    pub fn serialize_into<W: Write>(&self, mut writer: W) -> Result<()> {
597        writer.write_u32::<byteorder::LittleEndian>(self.inner.len() as u32)?;
598        for (fragment, set) in &self.inner {
599            writer.write_u32::<byteorder::LittleEndian>(*fragment)?;
600            if let RowAddrSelection::Partial(set) = set {
601                writer.write_u32::<byteorder::LittleEndian>(set.serialized_size() as u32)?;
602                set.serialize_into(&mut writer)?;
603            } else {
604                writer.write_u32::<byteorder::LittleEndian>(0)?;
605            }
606        }
607        Ok(())
608    }
609
610    /// Deserialize the set from the given buffer
611    pub fn deserialize_from<R: Read>(mut reader: R) -> Result<Self> {
612        let num_entries = reader.read_u32::<byteorder::LittleEndian>()?;
613        let mut inner = BTreeMap::new();
614        for _ in 0..num_entries {
615            let fragment = reader.read_u32::<byteorder::LittleEndian>()?;
616            let bitmap_size = reader.read_u32::<byteorder::LittleEndian>()?;
617            if bitmap_size == 0 {
618                inner.insert(fragment, RowAddrSelection::Full);
619            } else {
620                let mut buffer = vec![0; bitmap_size as usize];
621                reader.read_exact(&mut buffer)?;
622                let set = RoaringBitmap::deserialize_from(&buffer[..])?;
623                inner.insert(fragment, RowAddrSelection::Partial(set));
624            }
625        }
626        Ok(Self { inner })
627    }
628
    /// Apply a mask to the row addrs, in place
    ///
    /// For AllowList: keep only the rows that are also in the allow list (set intersection)
    /// For BlockList: remove every row that is in the block list (set difference)
    pub fn mask(&mut self, mask: &RowAddrMask) {
        match mask {
            RowAddrMask::AllowList(allow_list) => {
                *self &= allow_list;
            }
            RowAddrMask::BlockList(block_list) => {
                *self -= block_list;
            }
        }
    }
643
644    /// Convert the set into an iterator of row addrs
645    ///
646    /// # Safety
647    ///
648    /// This is unsafe because if any of the inner RowAddrSelection elements
649    /// is not a Partial then the iterator will panic because we don't know
650    /// the size of the bitmap.
651    pub unsafe fn into_addr_iter(self) -> impl Iterator<Item = u64> {
652        self.inner
653            .into_iter()
654            .flat_map(|(fragment, selection)| match selection {
655                RowAddrSelection::Full => panic!("Size of full fragment is unknown"),
656                RowAddrSelection::Partial(bitmap) => bitmap.into_iter().map(move |val| {
657                    let fragment = fragment as u64;
658                    let row_offset = val as u64;
659                    (fragment << 32) | row_offset
660                }),
661            })
662    }
663}
664
/// Cache codec support: reuses the binary format of
/// `serialize_into` / `deserialize_from`.
impl CacheCodecImpl for RowAddrTreeMap {
    /// Write the set to `writer` using `serialize_into`.
    fn serialize(&self, writer: &mut dyn Write) -> Result<()> {
        self.serialize_into(writer)
    }

    /// Read a set back from `data` using `deserialize_from`.
    fn deserialize(data: &bytes::Bytes) -> Result<Self> {
        Self::deserialize_from(data.as_ref())
    }
}
674
675impl std::ops::BitOr<Self> for RowAddrTreeMap {
676    type Output = Self;
677
678    fn bitor(mut self, rhs: Self) -> Self::Output {
679        self |= rhs;
680        self
681    }
682}
683
684impl std::ops::BitOr<&Self> for RowAddrTreeMap {
685    type Output = Self;
686
687    fn bitor(mut self, rhs: &Self) -> Self::Output {
688        self |= rhs;
689        self
690    }
691}
692
impl std::ops::BitOrAssign<Self> for RowAddrTreeMap {
    /// In-place set union; forwards to the by-reference implementation.
    fn bitor_assign(&mut self, rhs: Self) {
        *self |= &rhs;
    }
}
698
699impl std::ops::BitOrAssign<&Self> for RowAddrTreeMap {
700    fn bitor_assign(&mut self, rhs: &Self) {
701        for (fragment, rhs_set) in &rhs.inner {
702            let lhs_set = self.inner.get_mut(fragment);
703            if let Some(lhs_set) = lhs_set {
704                match lhs_set {
705                    RowAddrSelection::Full => {
706                        // If the fragment is already selected then there is nothing to do
707                    }
708                    RowAddrSelection::Partial(lhs_bitmap) => match rhs_set {
709                        RowAddrSelection::Full => {
710                            *lhs_set = RowAddrSelection::Full;
711                        }
712                        RowAddrSelection::Partial(rhs_set) => {
713                            *lhs_bitmap |= rhs_set;
714                        }
715                    },
716                }
717            } else {
718                self.inner.insert(*fragment, rhs_set.clone());
719            }
720        }
721    }
722}
723
724impl std::ops::BitAnd<Self> for RowAddrTreeMap {
725    type Output = Self;
726
727    fn bitand(mut self, rhs: Self) -> Self::Output {
728        self &= &rhs;
729        self
730    }
731}
732
733impl std::ops::BitAnd<&Self> for RowAddrTreeMap {
734    type Output = Self;
735
736    fn bitand(mut self, rhs: &Self) -> Self::Output {
737        self &= rhs;
738        self
739    }
740}
741
impl std::ops::BitAndAssign<Self> for RowAddrTreeMap {
    /// In-place set intersection; forwards to the by-reference implementation.
    fn bitand_assign(&mut self, rhs: Self) {
        *self &= &rhs;
    }
}
747
748impl std::ops::BitAndAssign<&Self> for RowAddrTreeMap {
749    fn bitand_assign(&mut self, rhs: &Self) {
750        // Remove fragment that aren't on the RHS
751        self.inner
752            .retain(|fragment, _| rhs.inner.contains_key(fragment));
753
754        // For fragments that are on the RHS, intersect the bitmaps
755        for (fragment, mut lhs_set) in &mut self.inner {
756            match (&mut lhs_set, rhs.inner.get(fragment)) {
757                (_, None) => {} // Already handled by retain
758                (_, Some(RowAddrSelection::Full)) => {
759                    // Everything selected on RHS, so can leave LHS untouched.
760                }
761                (RowAddrSelection::Partial(lhs_set), Some(RowAddrSelection::Partial(rhs_set))) => {
762                    *lhs_set &= rhs_set;
763                }
764                (RowAddrSelection::Full, Some(RowAddrSelection::Partial(rhs_set))) => {
765                    *lhs_set = RowAddrSelection::Partial(rhs_set.clone());
766                }
767            }
768        }
769        // Some bitmaps might now be empty. If they are, we should remove them.
770        self.inner.retain(|_, set| match set {
771            RowAddrSelection::Partial(set) => !set.is_empty(),
772            RowAddrSelection::Full => true,
773        });
774    }
775}
776
777impl std::ops::Sub<Self> for RowAddrTreeMap {
778    type Output = Self;
779
780    fn sub(mut self, rhs: Self) -> Self {
781        self -= &rhs;
782        self
783    }
784}
785
786impl std::ops::Sub<&Self> for RowAddrTreeMap {
787    type Output = Self;
788
789    fn sub(mut self, rhs: &Self) -> Self {
790        self -= rhs;
791        self
792    }
793}
794
795impl std::ops::SubAssign<&Self> for RowAddrTreeMap {
796    fn sub_assign(&mut self, rhs: &Self) {
797        for (fragment, rhs_set) in &rhs.inner {
798            match self.inner.get_mut(fragment) {
799                None => {}
800                Some(RowAddrSelection::Full) => {
801                    // If the fragment is already selected then there is nothing to do
802                    match rhs_set {
803                        RowAddrSelection::Full => {
804                            self.inner.remove(fragment);
805                        }
806                        RowAddrSelection::Partial(rhs_set) => {
807                            // This generally won't be hit.
808                            let mut set = RoaringBitmap::full();
809                            set -= rhs_set;
810                            self.inner.insert(*fragment, RowAddrSelection::Partial(set));
811                        }
812                    }
813                }
814                Some(RowAddrSelection::Partial(lhs_set)) => match rhs_set {
815                    RowAddrSelection::Full => {
816                        self.inner.remove(fragment);
817                    }
818                    RowAddrSelection::Partial(rhs_set) => {
819                        *lhs_set -= rhs_set;
820                        if lhs_set.is_empty() {
821                            self.inner.remove(fragment);
822                        }
823                    }
824                },
825            }
826        }
827    }
828}
829
830impl FromIterator<u64> for RowAddrTreeMap {
831    fn from_iter<T: IntoIterator<Item = u64>>(iter: T) -> Self {
832        let mut inner = BTreeMap::new();
833        for row_addr in iter {
834            let upper = (row_addr >> 32) as u32;
835            let lower = row_addr as u32;
836            match inner.get_mut(&upper) {
837                None => {
838                    let mut set = RoaringBitmap::new();
839                    set.insert(lower);
840                    inner.insert(upper, RowAddrSelection::Partial(set));
841                }
842                Some(RowAddrSelection::Full) => {
843                    // If the fragment is already selected then there is nothing to do
844                }
845                Some(RowAddrSelection::Partial(set)) => {
846                    set.insert(lower);
847                }
848            }
849        }
850        Self { inner }
851    }
852}
853
854impl<'a> FromIterator<&'a u64> for RowAddrTreeMap {
855    fn from_iter<T: IntoIterator<Item = &'a u64>>(iter: T) -> Self {
856        Self::from_iter(iter.into_iter().copied())
857    }
858}
859
860impl From<Range<u64>> for RowAddrTreeMap {
861    fn from(range: Range<u64>) -> Self {
862        let mut map = Self::default();
863        map.insert_range(range);
864        map
865    }
866}
867
868impl From<RangeInclusive<u64>> for RowAddrTreeMap {
869    fn from(range: RangeInclusive<u64>) -> Self {
870        let mut map = Self::default();
871        map.insert_range(range);
872        map
873    }
874}
875
876impl From<RoaringTreemap> for RowAddrTreeMap {
877    fn from(roaring: RoaringTreemap) -> Self {
878        let mut inner = BTreeMap::new();
879        for (fragment, set) in roaring.bitmaps() {
880            inner.insert(fragment, RowAddrSelection::Partial(set.clone()));
881        }
882        Self { inner }
883    }
884}
885
886impl Extend<u64> for RowAddrTreeMap {
887    fn extend<T: IntoIterator<Item = u64>>(&mut self, iter: T) {
888        for row_addr in iter {
889            let upper = (row_addr >> 32) as u32;
890            let lower = row_addr as u32;
891            match self.inner.get_mut(&upper) {
892                None => {
893                    let mut set = RoaringBitmap::new();
894                    set.insert(lower);
895                    self.inner.insert(upper, RowAddrSelection::Partial(set));
896                }
897                Some(RowAddrSelection::Full) => {
898                    // If the fragment is already selected then there is nothing to do
899                }
900                Some(RowAddrSelection::Partial(set)) => {
901                    set.insert(lower);
902                }
903            }
904        }
905    }
906}
907
908impl<'a> Extend<&'a u64> for RowAddrTreeMap {
909    fn extend<T: IntoIterator<Item = &'a u64>>(&mut self, iter: T) {
910        self.extend(iter.into_iter().copied())
911    }
912}
913
914// Extending with RowAddrTreeMap is basically a cumulative set union
915impl Extend<Self> for RowAddrTreeMap {
916    fn extend<T: IntoIterator<Item = Self>>(&mut self, iter: T) {
917        for other in iter {
918            for (fragment, set) in other.inner {
919                match self.inner.get_mut(&fragment) {
920                    None => {
921                        self.inner.insert(fragment, set);
922                    }
923                    Some(RowAddrSelection::Full) => {
924                        // If the fragment is already selected then there is nothing to do
925                    }
926                    Some(RowAddrSelection::Partial(lhs_set)) => match set {
927                        RowAddrSelection::Full => {
928                            self.inner.insert(fragment, RowAddrSelection::Full);
929                        }
930                        RowAddrSelection::Partial(rhs_set) => {
931                            *lhs_set |= rhs_set;
932                        }
933                    },
934                }
935            }
936        }
937    }
938}
939
940pub fn bitmap_to_ranges(bitmap: &RoaringBitmap) -> Vec<Range<u64>> {
941    let mut ranges = Vec::new();
942    let mut iter = bitmap.iter();
943    while let Some(r) = iter.next_range() {
944        ranges.push(*r.start() as u64..(*r.end() as u64 + 1));
945    }
946    ranges
947}
948
949pub fn ranges_to_bitmap(ranges: &[Range<u64>], sorted: bool) -> RoaringBitmap {
950    if ranges.is_empty() {
951        return RoaringBitmap::new();
952    }
953    if sorted {
954        let sample_size = ranges.len().min(10);
955        let avg_len: u64 = ranges
956            .iter()
957            .take(sample_size)
958            .map(|r| r.end - r.start)
959            .sum::<u64>()
960            / sample_size as u64;
961        // from_sorted_iter appends each value in O(1) but must visit every u32.
962        // insert_range bulk-fills containers but does a binary search per call.
963        // Crossover is ~6: below that, iterating all values is cheaper.
964        if avg_len <= 6 {
965            return RoaringBitmap::from_sorted_iter(
966                ranges.iter().flat_map(|r| r.start as u32..r.end as u32),
967            )
968            .unwrap();
969        }
970    }
971    let mut bm = RoaringBitmap::new();
972    for r in ranges {
973        bm.insert_range(r.start as u32..r.end as u32);
974    }
975    bm
976}
977
978/// A set of stable row ids backed by a 64-bit Roaring bitmap.
979///
980/// This is a thin wrapper around [`RoaringTreemap`]. It represents a
981/// collection of unique row ids and provides the common row-set
982/// operations defined by [`RowSetOps`].
983#[derive(Clone, Debug, Default, PartialEq)]
984pub struct RowIdSet {
985    inner: RoaringTreemap,
986}
987
988impl RowIdSet {
989    /// Creates an empty set of row ids.
990    pub fn new() -> Self {
991        Self::default()
992    }
993    /// Returns an iterator over the contained row ids in ascending order.
994    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
995        self.inner.iter()
996    }
997    /// Returns the union of `self` and `other`.
998    pub fn union(mut self, other: &Self) -> Self {
999        self.inner |= &other.inner;
1000        self
1001    }
1002    /// Returns the set difference `self \\ other`.
1003    pub fn difference(mut self, other: &Self) -> Self {
1004        self.inner -= &other.inner;
1005        self
1006    }
1007}
1008
1009impl RowSetOps for RowIdSet {
1010    type Row = u64;
1011    fn is_empty(&self) -> bool {
1012        self.inner.is_empty()
1013    }
1014    fn len(&self) -> Option<u64> {
1015        Some(self.inner.len())
1016    }
1017    fn remove(&mut self, row: Self::Row) -> bool {
1018        self.inner.remove(row)
1019    }
1020    fn contains(&self, row: Self::Row) -> bool {
1021        self.inner.contains(row)
1022    }
1023    fn union_all(other: &[&Self]) -> Self {
1024        let mut result = other
1025            .first()
1026            .map_or(Self::default(), |&first| first.clone());
1027        for set in other {
1028            result.inner |= &set.inner;
1029        }
1030        result
1031    }
1032    #[track_caller]
1033    fn from_sorted_iter<I>(iter: I) -> Result<Self>
1034    where
1035        I: IntoIterator<Item = Self::Row>,
1036    {
1037        let mut inner = RoaringTreemap::new();
1038        let mut last: Option<u64> = None;
1039        for value in iter {
1040            if let Some(prev) = last
1041                && value < prev
1042            {
1043                return Err(Error::internal(
1044                    "RowIdSet::from_sorted_iter called with non-sorted input",
1045                ));
1046            }
1047            inner.insert(value);
1048            last = Some(value);
1049        }
1050        Ok(Self { inner })
1051    }
1052}
1053
1054/// A mask over stable row ids based on an allow-list or block-list.
1055///
1056/// The semantics mirror [`RowAddrMask`], but operate on stable
1057/// row ids instead of physical row addresses.
1058#[derive(Clone, Debug, PartialEq)]
1059pub enum RowIdMask {
1060    /// Only the ids in the set are selected.
1061    AllowList(RowIdSet),
1062    /// All ids are selected except those in the set.
1063    BlockList(RowIdSet),
1064}
1065
1066impl Default for RowIdMask {
1067    fn default() -> Self {
1068        // Empty block list means all rows are allowed
1069        Self::BlockList(RowIdSet::default())
1070    }
1071}
1072impl RowIdMask {
1073    /// Create a mask allowing all rows, this is an alias for [`Default`].
1074    pub fn all_rows() -> Self {
1075        Self::default()
1076    }
1077    /// Create a mask that doesn't allow any row id.
1078    pub fn allow_nothing() -> Self {
1079        Self::AllowList(RowIdSet::default())
1080    }
1081    /// Create a mask from an allow list.
1082    pub fn from_allowed(allow_list: RowIdSet) -> Self {
1083        Self::AllowList(allow_list)
1084    }
1085    /// Create a mask from a block list.
1086    pub fn from_block(block_list: RowIdSet) -> Self {
1087        Self::BlockList(block_list)
1088    }
1089    /// True if the row id is selected by the mask, false otherwise.
1090    pub fn selected(&self, row_id: u64) -> bool {
1091        match self {
1092            Self::AllowList(allow_list) => allow_list.contains(row_id),
1093            Self::BlockList(block_list) => !block_list.contains(row_id),
1094        }
1095    }
1096    /// Return the indices of the input row ids that are selected by the mask.
1097    pub fn selected_indices<'a>(&self, row_ids: impl Iterator<Item = &'a u64> + 'a) -> Vec<u64> {
1098        row_ids
1099            .enumerate()
1100            .filter_map(|(idx, row_id)| {
1101                if self.selected(*row_id) {
1102                    Some(idx as u64)
1103                } else {
1104                    None
1105                }
1106            })
1107            .collect()
1108    }
1109    /// Also block the given ids.
1110    ///
1111    /// * `AllowList(a)` -> `AllowList(a \\ block_list)`
1112    /// * `BlockList(b)` -> `BlockList(b union block_list)`
1113    pub fn also_block(self, block_list: RowIdSet) -> Self {
1114        match self {
1115            Self::AllowList(allow_list) => Self::AllowList(allow_list.difference(&block_list)),
1116            Self::BlockList(existing) => Self::BlockList(existing.union(&block_list)),
1117        }
1118    }
1119    /// Also allow the given ids.
1120    ///
1121    /// * `AllowList(a)` -> `AllowList(a union allow_list)`
1122    /// * `BlockList(b)` -> `BlockList(b \\ allow_list)`
1123    pub fn also_allow(self, allow_list: RowIdSet) -> Self {
1124        match self {
1125            Self::AllowList(existing) => Self::AllowList(existing.union(&allow_list)),
1126            Self::BlockList(block_list) => Self::BlockList(block_list.difference(&allow_list)),
1127        }
1128    }
1129    /// Return the maximum number of row ids that could be selected by this mask.
1130    ///
1131    /// Will be `None` if this is a `BlockList` (unbounded).
1132    pub fn max_len(&self) -> Option<u64> {
1133        match self {
1134            Self::AllowList(selection) => selection.len(),
1135            Self::BlockList(_) => None,
1136        }
1137    }
1138    /// Iterate over the row ids that are selected by the mask.
1139    ///
1140    /// This is only possible if this is an `AllowList`. For a `BlockList`
1141    /// the domain of possible row ids is unbounded.
1142    pub fn iter_ids(&self) -> Option<Box<dyn Iterator<Item = u64> + '_>> {
1143        match self {
1144            Self::AllowList(allow_list) => Some(Box::new(allow_list.iter())),
1145            Self::BlockList(_) => None,
1146        }
1147    }
1148}
1149
1150#[cfg(test)]
1151mod tests {
1152    use super::*;
1153    use proptest::{prop_assert, prop_assert_eq};
1154
1155    fn rows(ids: &[u64]) -> RowAddrTreeMap {
1156        RowAddrTreeMap::from_iter(ids)
1157    }
1158
1159    fn assert_mask_selects(mask: &RowAddrMask, selected: &[u64], not_selected: &[u64]) {
1160        for &id in selected {
1161            assert!(mask.selected(id), "Expected row {} to be selected", id);
1162        }
1163        for &id in not_selected {
1164            assert!(!mask.selected(id), "Expected row {} to NOT be selected", id);
1165        }
1166    }
1167
1168    fn selected_in_range(mask: &RowAddrMask, range: std::ops::Range<u64>) -> Vec<u64> {
1169        range.filter(|val| mask.selected(*val)).collect()
1170    }
1171
1172    #[test]
1173    fn test_row_addr_mask_construction() {
1174        let full_mask = RowAddrMask::all_rows();
1175        assert_eq!(full_mask.max_len(), None);
1176        assert_mask_selects(&full_mask, &[0, 1, 4 << 32 | 3], &[]);
1177        assert_eq!(full_mask.allow_list(), None);
1178        assert_eq!(full_mask.block_list(), Some(&RowAddrTreeMap::default()));
1179        assert!(full_mask.iter_addrs().is_none());
1180
1181        let empty_mask = RowAddrMask::allow_nothing();
1182        assert_eq!(empty_mask.max_len(), Some(0));
1183        assert_mask_selects(&empty_mask, &[], &[0, 1, 4 << 32 | 3]);
1184        assert_eq!(empty_mask.allow_list(), Some(&RowAddrTreeMap::default()));
1185        assert_eq!(empty_mask.block_list(), None);
1186        let iter = empty_mask.iter_addrs();
1187        assert!(iter.is_some());
1188        assert_eq!(iter.unwrap().count(), 0);
1189
1190        let allow_list = RowAddrMask::from_allowed(rows(&[10, 20, 30]));
1191        assert_eq!(allow_list.max_len(), Some(3));
1192        assert_mask_selects(&allow_list, &[10, 20, 30], &[0, 15, 25, 40]);
1193        assert_eq!(allow_list.allow_list(), Some(&rows(&[10, 20, 30])));
1194        assert_eq!(allow_list.block_list(), None);
1195        let iter = allow_list.iter_addrs();
1196        assert!(iter.is_some());
1197        let ids: Vec<u64> = iter.unwrap().map(|addr| addr.into()).collect();
1198        assert_eq!(ids, vec![10, 20, 30]);
1199
1200        let mut full_frag = RowAddrTreeMap::default();
1201        full_frag.insert_fragment(2);
1202        let allow_list = RowAddrMask::from_allowed(full_frag);
1203        assert_eq!(allow_list.max_len(), None);
1204        assert_mask_selects(&allow_list, &[(2 << 32) + 5], &[(3 << 32) + 5]);
1205        assert!(allow_list.iter_addrs().is_none());
1206    }
1207
1208    #[test]
1209    fn test_selected_indices() {
1210        // Allow list
1211        let mask = RowAddrMask::from_allowed(rows(&[10, 20, 40]));
1212        assert!(mask.selected_indices(std::iter::empty()).is_empty());
1213        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[1, 3]);
1214
1215        // Block list
1216        let mask = RowAddrMask::from_block(rows(&[10, 20, 40]));
1217        assert!(mask.selected_indices(std::iter::empty()).is_empty());
1218        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[0, 2]);
1219    }
1220
1221    #[test]
1222    fn test_also_allow() {
1223        // Allow list
1224        let mask = RowAddrMask::from_allowed(rows(&[10, 20]));
1225        let new_mask = mask.also_allow(rows(&[20, 30, 40]));
1226        assert_eq!(new_mask, RowAddrMask::from_allowed(rows(&[10, 20, 30, 40])));
1227
1228        // Block list
1229        let mask = RowAddrMask::from_block(rows(&[10, 20, 30]));
1230        let new_mask = mask.also_allow(rows(&[20, 40]));
1231        assert_eq!(new_mask, RowAddrMask::from_block(rows(&[10, 30])));
1232    }
1233
1234    #[test]
1235    fn test_also_block() {
1236        // Allow list
1237        let mask = RowAddrMask::from_allowed(rows(&[10, 20, 30]));
1238        let new_mask = mask.also_block(rows(&[20, 40]));
1239        assert_eq!(new_mask, RowAddrMask::from_allowed(rows(&[10, 30])));
1240
1241        // Block list
1242        let mask = RowAddrMask::from_block(rows(&[10, 20]));
1243        let new_mask = mask.also_block(rows(&[20, 30, 40]));
1244        assert_eq!(new_mask, RowAddrMask::from_block(rows(&[10, 20, 30, 40])));
1245    }
1246
1247    #[test]
1248    fn test_iter_ids() {
1249        // Allow list
1250        let mask = RowAddrMask::from_allowed(rows(&[10, 20, 30]));
1251        let expected: Vec<_> = [10, 20, 30].into_iter().map(RowAddress::from).collect();
1252        assert_eq!(mask.iter_addrs().unwrap().collect::<Vec<_>>(), expected);
1253
1254        // Allow list with full fragment
1255        let mut inner = RowAddrTreeMap::default();
1256        inner.insert_fragment(10);
1257        let mask = RowAddrMask::from_allowed(inner);
1258        assert!(mask.iter_addrs().is_none());
1259
1260        // Block list
1261        let mask = RowAddrMask::from_block(rows(&[10, 20, 30]));
1262        assert!(mask.iter_addrs().is_none());
1263    }
1264
1265    #[test]
1266    fn test_row_addr_mask_not() {
1267        let allow_list = RowAddrMask::from_allowed(rows(&[1, 2, 3]));
1268        let block_list = !allow_list.clone();
1269        assert_eq!(block_list, RowAddrMask::from_block(rows(&[1, 2, 3])));
1270        // Can roundtrip by negating again
1271        assert_eq!(!block_list, allow_list);
1272    }
1273
1274    #[test]
1275    fn test_ops() {
1276        let mask = RowAddrMask::default();
1277        assert_mask_selects(&mask, &[1, 5], &[]);
1278
1279        let block_list = mask.also_block(rows(&[0, 5, 15]));
1280        assert_mask_selects(&block_list, &[1], &[5]);
1281
1282        let allow_list = RowAddrMask::from_allowed(rows(&[0, 2, 5]));
1283        assert_mask_selects(&allow_list, &[5], &[1]);
1284
1285        let combined = block_list & allow_list;
1286        assert_mask_selects(&combined, &[2], &[0, 5]);
1287
1288        let other = RowAddrMask::from_allowed(rows(&[3]));
1289        let combined = combined | other;
1290        assert_mask_selects(&combined, &[2, 3], &[0, 5]);
1291
1292        let block_list = RowAddrMask::from_block(rows(&[0]));
1293        let allow_list = RowAddrMask::from_allowed(rows(&[3]));
1294
1295        let combined = block_list | allow_list;
1296        assert_mask_selects(&combined, &[1], &[]);
1297    }
1298
1299    #[test]
1300    fn test_logical_and() {
1301        let allow1 = RowAddrMask::from_allowed(rows(&[0, 1]));
1302        let block1 = RowAddrMask::from_block(rows(&[1, 2]));
1303        let allow2 = RowAddrMask::from_allowed(rows(&[1, 2, 3, 4]));
1304        let block2 = RowAddrMask::from_block(rows(&[3, 4]));
1305
1306        fn check(lhs: &RowAddrMask, rhs: &RowAddrMask, expected: &[u64]) {
1307            for mask in [lhs.clone() & rhs.clone(), rhs.clone() & lhs.clone()] {
1308                assert_eq!(selected_in_range(&mask, 0..10), expected);
1309            }
1310        }
1311
1312        // Allow & Allow
1313        check(&allow1, &allow1, &[0, 1]);
1314        check(&allow1, &allow2, &[1]);
1315
1316        // Block & Block
1317        check(&block1, &block1, &[0, 3, 4, 5, 6, 7, 8, 9]);
1318        check(&block1, &block2, &[0, 5, 6, 7, 8, 9]);
1319
1320        // Allow & Block
1321        check(&allow1, &block1, &[0]);
1322        check(&allow1, &block2, &[0, 1]);
1323        check(&allow2, &block1, &[3, 4]);
1324        check(&allow2, &block2, &[1, 2]);
1325    }
1326
1327    #[test]
1328    fn test_logical_or() {
1329        let allow1 = RowAddrMask::from_allowed(rows(&[5, 6, 7, 8, 9]));
1330        let block1 = RowAddrMask::from_block(rows(&[5, 6]));
1331        let mixed1 = allow1.clone().also_block(rows(&[5, 6]));
1332        let allow2 = RowAddrMask::from_allowed(rows(&[2, 3, 4, 5, 6, 7, 8]));
1333        let block2 = RowAddrMask::from_block(rows(&[4, 5]));
1334        let mixed2 = allow2.clone().also_block(rows(&[4, 5]));
1335
1336        fn check(lhs: &RowAddrMask, rhs: &RowAddrMask, expected: &[u64]) {
1337            for mask in [lhs.clone() | rhs.clone(), rhs.clone() | lhs.clone()] {
1338                assert_eq!(selected_in_range(&mask, 0..10), expected);
1339            }
1340        }
1341
1342        check(&allow1, &allow1, &[5, 6, 7, 8, 9]);
1343        check(&block1, &block1, &[0, 1, 2, 3, 4, 7, 8, 9]);
1344        check(&mixed1, &mixed1, &[7, 8, 9]);
1345        check(&allow2, &allow2, &[2, 3, 4, 5, 6, 7, 8]);
1346        check(&block2, &block2, &[0, 1, 2, 3, 6, 7, 8, 9]);
1347        check(&mixed2, &mixed2, &[2, 3, 6, 7, 8]);
1348
1349        check(&allow1, &block1, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1350        check(&allow1, &mixed1, &[5, 6, 7, 8, 9]);
1351        check(&allow1, &allow2, &[2, 3, 4, 5, 6, 7, 8, 9]);
1352        check(&allow1, &block2, &[0, 1, 2, 3, 5, 6, 7, 8, 9]);
1353        check(&allow1, &mixed2, &[2, 3, 5, 6, 7, 8, 9]);
1354        check(&block1, &mixed1, &[0, 1, 2, 3, 4, 7, 8, 9]);
1355        check(&block1, &allow2, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1356        check(&block1, &block2, &[0, 1, 2, 3, 4, 6, 7, 8, 9]);
1357        check(&block1, &mixed2, &[0, 1, 2, 3, 4, 6, 7, 8, 9]);
1358        check(&mixed1, &allow2, &[2, 3, 4, 5, 6, 7, 8, 9]);
1359        check(&mixed1, &block2, &[0, 1, 2, 3, 6, 7, 8, 9]);
1360        check(&mixed1, &mixed2, &[2, 3, 6, 7, 8, 9]);
1361        check(&allow2, &block2, &[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
1362        check(&allow2, &mixed2, &[2, 3, 4, 5, 6, 7, 8]);
1363        check(&block2, &mixed2, &[0, 1, 2, 3, 6, 7, 8, 9]);
1364    }
1365
1366    #[test]
1367    fn test_deserialize_legacy_format() {
1368        // Test that we can deserialize the old format where both allow_list
1369        // and block_list could be present in the serialized form.
1370        //
1371        // The old format (before this PR) used a struct with both allow_list and block_list
1372        // fields. The new format uses an enum. The deserialization code should handle
1373        // the case where both lists are present by converting to AllowList(allow - block).
1374
1375        // Create the RowIdTreeMaps and serialize them directly
1376        let allow = rows(&[1, 2, 3, 4, 5, 10, 15]);
1377        let block = rows(&[2, 4, 15]);
1378
1379        // Serialize using the stable RowIdTreeMap serialization format
1380        let block_bytes = {
1381            let mut buf = Vec::with_capacity(block.serialized_size());
1382            block.serialize_into(&mut buf).unwrap();
1383            buf
1384        };
1385        let allow_bytes = {
1386            let mut buf = Vec::with_capacity(allow.serialized_size());
1387            allow.serialize_into(&mut buf).unwrap();
1388            buf
1389        };
1390
1391        // Construct a binary array with both values present (simulating old format)
1392        let old_format_array =
1393            BinaryArray::from_opt_vec(vec![Some(&block_bytes), Some(&allow_bytes)]);
1394
1395        // Deserialize - should handle this by creating AllowList(allow - block)
1396        let deserialized = RowAddrMask::from_arrow(&old_format_array).unwrap();
1397
1398        // The expected result: AllowList([1, 2, 3, 4, 5, 10, 15] - [2, 4, 15]) = [1, 3, 5, 10]
1399        assert_mask_selects(&deserialized, &[1, 3, 5, 10], &[2, 4, 15]);
1400        assert!(
1401            deserialized.allow_list().is_some(),
1402            "Should deserialize to AllowList variant"
1403        );
1404    }
1405
1406    #[test]
1407    fn test_roundtrip_arrow() {
1408        let row_addrs = rows(&[1, 2, 3, 100, 2000]);
1409
1410        // Allow list
1411        let original = RowAddrMask::from_allowed(row_addrs.clone());
1412        let array = original.into_arrow().unwrap();
1413        assert_eq!(RowAddrMask::from_arrow(&array).unwrap(), original);
1414
1415        // Block list
1416        let original = RowAddrMask::from_block(row_addrs);
1417        let array = original.into_arrow().unwrap();
1418        assert_eq!(RowAddrMask::from_arrow(&array).unwrap(), original);
1419    }
1420
1421    #[test]
1422    fn test_deserialize_legacy_empty_lists() {
1423        // Case 1: Both None (should become all_rows)
1424        let array = BinaryArray::from_opt_vec(vec![None, None]);
1425        let mask = RowAddrMask::from_arrow(&array).unwrap();
1426        assert_mask_selects(&mask, &[0, 100, u64::MAX], &[]);
1427
1428        // Case 2: Only block list (no allow list)
1429        let block = rows(&[5, 10]);
1430        let block_bytes = {
1431            let mut buf = Vec::with_capacity(block.serialized_size());
1432            block.serialize_into(&mut buf).unwrap();
1433            buf
1434        };
1435        let array = BinaryArray::from_opt_vec(vec![Some(&block_bytes[..]), None]);
1436        let mask = RowAddrMask::from_arrow(&array).unwrap();
1437        assert_mask_selects(&mask, &[0, 15], &[5, 10]);
1438
1439        // Case 3: Only allow list (no block list)
1440        let allow = rows(&[5, 10]);
1441        let allow_bytes = {
1442            let mut buf = Vec::with_capacity(allow.serialized_size());
1443            allow.serialize_into(&mut buf).unwrap();
1444            buf
1445        };
1446        let array = BinaryArray::from_opt_vec(vec![None, Some(&allow_bytes[..])]);
1447        let mask = RowAddrMask::from_arrow(&array).unwrap();
1448        assert_mask_selects(&mask, &[5, 10], &[0, 15]);
1449    }
1450
1451    #[test]
1452    fn test_map_insert() {
1453        let mut map = RowAddrTreeMap::default();
1454
1455        assert!(!map.contains(20));
1456        assert!(map.insert(20));
1457        assert!(map.contains(20));
1458        assert!(!map.insert(20)); // Inserting again should be no-op
1459
1460        let bitmap = map.get_fragment_bitmap(0);
1461        assert!(bitmap.is_some());
1462        let bitmap = bitmap.unwrap();
1463        assert_eq!(bitmap.len(), 1);
1464
1465        assert!(map.get_fragment_bitmap(1).is_none());
1466
1467        map.insert_fragment(0);
1468        assert!(map.contains(0));
1469        assert!(!map.insert(0)); // Inserting into full fragment should be no-op
1470        assert!(map.get_fragment_bitmap(0).is_none());
1471    }
1472
1473    #[test]
1474    fn test_map_insert_range() {
1475        let ranges = &[
1476            (0..10),
1477            (40..500),
1478            ((u32::MAX as u64 - 10)..(u32::MAX as u64 + 20)),
1479        ];
1480
1481        for range in ranges {
1482            let mut mask = RowAddrTreeMap::default();
1483
1484            let count = mask.insert_range(range.clone());
1485            let expected = range.end - range.start;
1486            assert_eq!(count, expected);
1487
1488            let count = mask.insert_range(range.clone());
1489            assert_eq!(count, 0);
1490
1491            let new_range = range.start + 5..range.end + 5;
1492            let count = mask.insert_range(new_range.clone());
1493            assert_eq!(count, 5);
1494        }
1495
1496        let mut mask = RowAddrTreeMap::default();
1497        let count = mask.insert_range(..10);
1498        assert_eq!(count, 10);
1499        assert!(mask.contains(0));
1500
1501        let count = mask.insert_range(20..=24);
1502        assert_eq!(count, 5);
1503
1504        mask.insert_fragment(0);
1505        let count = mask.insert_range(100..200);
1506        assert_eq!(count, 0);
1507    }
1508
1509    #[test]
1510    fn test_map_remove() {
1511        let mut mask = RowAddrTreeMap::default();
1512
1513        assert!(!mask.remove(20));
1514
1515        mask.insert(20);
1516        assert!(mask.contains(20));
1517        assert!(mask.remove(20));
1518        assert!(!mask.contains(20));
1519
1520        mask.insert_range(10..=20);
1521        assert!(mask.contains(15));
1522        assert!(mask.remove(15));
1523        assert!(!mask.contains(15));
1524
1525        // We don't test removing from a full fragment, because that would take
1526        // a lot of memory.
1527    }
1528
1529    #[test]
1530    fn test_map_mask() {
1531        let mask = rows(&[0, 1, 2]);
1532        let mask2 = rows(&[0, 2, 3]);
1533
1534        let allow_list = RowAddrMask::AllowList(mask2.clone());
1535        let mut actual = mask.clone();
1536        actual.mask(&allow_list);
1537        assert_eq!(actual, rows(&[0, 2]));
1538
1539        let block_list = RowAddrMask::BlockList(mask2);
1540        let mut actual = mask;
1541        actual.mask(&block_list);
1542        assert_eq!(actual, rows(&[1]));
1543    }
1544
1545    #[test]
1546    #[should_panic(expected = "Size of full fragment is unknown")]
1547    fn test_map_insert_full_fragment_row() {
1548        let mut mask = RowAddrTreeMap::default();
1549        mask.insert_fragment(0);
1550
1551        unsafe {
1552            let _ = mask.into_addr_iter().collect::<Vec<u64>>();
1553        }
1554    }
1555
1556    #[test]
1557    fn test_map_into_addr_iter() {
1558        let mut mask = RowAddrTreeMap::default();
1559        mask.insert(0);
1560        mask.insert(1);
1561        mask.insert(1 << 32 | 5);
1562        mask.insert(2 << 32 | 10);
1563
1564        let expected = vec![0u64, 1, 1 << 32 | 5, 2 << 32 | 10];
1565        let actual: Vec<u64> = unsafe { mask.into_addr_iter().collect() };
1566        assert_eq!(actual, expected);
1567    }
1568
1569    #[test]
1570    fn test_map_from() {
1571        let map = RowAddrTreeMap::from(10..12);
1572        assert!(map.contains(10));
1573        assert!(map.contains(11));
1574        assert!(!map.contains(12));
1575        assert!(!map.contains(3));
1576
1577        let map = RowAddrTreeMap::from(10..=12);
1578        assert!(map.contains(10));
1579        assert!(map.contains(11));
1580        assert!(map.contains(12));
1581        assert!(!map.contains(3));
1582    }
1583
1584    #[test]
1585    fn test_map_from_roaring() {
1586        let bitmap = RoaringTreemap::from_iter(&[0, 1, 1 << 32]);
1587        let map = RowAddrTreeMap::from(bitmap);
1588        assert!(map.contains(0) && map.contains(1) && map.contains(1 << 32));
1589        assert!(!map.contains(2));
1590    }
1591
1592    #[test]
1593    fn test_map_extend() {
1594        let mut map = RowAddrTreeMap::default();
1595        map.insert(0);
1596        map.insert_fragment(1);
1597
1598        let other_rows = [0, 2, 1 << 32 | 10, 3 << 32 | 5];
1599        map.extend(other_rows.iter().copied());
1600
1601        assert!(map.contains(0));
1602        assert!(map.contains(2));
1603        assert!(map.contains(1 << 32 | 5));
1604        assert!(map.contains(1 << 32 | 10));
1605        assert!(map.contains(3 << 32 | 5));
1606        assert!(!map.contains(3));
1607    }
1608
1609    #[test]
1610    fn test_map_extend_other_maps() {
1611        let mut map = RowAddrTreeMap::default();
1612        map.insert(0);
1613        map.insert_fragment(1);
1614        map.insert(4 << 32);
1615
1616        let mut other_map = rows(&[0, 2, 1 << 32 | 10, 3 << 32 | 5]);
1617        other_map.insert_fragment(4);
1618        map.extend(std::iter::once(other_map));
1619
1620        for id in [
1621            0,
1622            2,
1623            1 << 32 | 5,
1624            1 << 32 | 10,
1625            3 << 32 | 5,
1626            4 << 32,
1627            4 << 32 | 7,
1628        ] {
1629            assert!(map.contains(id), "Expected {} to be contained", id);
1630        }
1631        assert!(!map.contains(3));
1632    }
1633
1634    proptest::proptest! {
1635        #[test]
1636        fn test_map_serialization_roundtrip(
1637            values in proptest::collection::vec(
1638                (0..u32::MAX, proptest::option::of(proptest::collection::vec(0..u32::MAX, 0..1000))),
1639                0..10
1640            )
1641        ) {
1642            let mut mask = RowAddrTreeMap::default();
1643            for (fragment, rows) in values {
1644                if let Some(rows) = rows {
1645                    let bitmap = RoaringBitmap::from_iter(rows);
1646                    mask.insert_bitmap(fragment, bitmap);
1647                } else {
1648                    mask.insert_fragment(fragment);
1649                }
1650            }
1651
1652            let mut data = Vec::new();
1653            mask.serialize_into(&mut data).unwrap();
1654            let deserialized = RowAddrTreeMap::deserialize_from(data.as_slice()).unwrap();
1655            prop_assert_eq!(mask, deserialized);
1656        }
1657
1658        #[test]
1659        fn test_map_intersect(
1660            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1661            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1662            right_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1663            right_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1664        ) {
1665            let mut left = RowAddrTreeMap::default();
1666            for fragment in left_full_fragments.clone() {
1667                left.insert_fragment(fragment);
1668            }
1669            left.extend(left_rows.iter().copied());
1670
1671            let mut right = RowAddrTreeMap::default();
1672            for fragment in right_full_fragments.clone() {
1673                right.insert_fragment(fragment);
1674            }
1675            right.extend(right_rows.iter().copied());
1676
1677            let mut expected = RowAddrTreeMap::default();
1678            for fragment in &left_full_fragments {
1679                if right_full_fragments.contains(fragment) {
1680                    expected.insert_fragment(*fragment);
1681                }
1682            }
1683
1684            let left_in_right = left_rows.iter().filter(|row| {
1685                right_rows.contains(row)
1686                    || right_full_fragments.contains(&((*row >> 32) as u32))
1687            });
1688            expected.extend(left_in_right);
1689            let right_in_left = right_rows.iter().filter(|row| {
1690                left_rows.contains(row)
1691                    || left_full_fragments.contains(&((*row >> 32) as u32))
1692            });
1693            expected.extend(right_in_left);
1694
1695            let actual = left & right;
1696            prop_assert_eq!(expected, actual);
1697        }
1698
1699        #[test]
1700        fn test_map_union(
1701            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1702            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1703            right_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1704            right_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1705        ) {
1706            let mut left = RowAddrTreeMap::default();
1707            for fragment in left_full_fragments.clone() {
1708                left.insert_fragment(fragment);
1709            }
1710            left.extend(left_rows.iter().copied());
1711
1712            let mut right = RowAddrTreeMap::default();
1713            for fragment in right_full_fragments.clone() {
1714                right.insert_fragment(fragment);
1715            }
1716            right.extend(right_rows.iter().copied());
1717
1718            let mut expected = RowAddrTreeMap::default();
1719            for fragment in left_full_fragments {
1720                expected.insert_fragment(fragment);
1721            }
1722            for fragment in right_full_fragments {
1723                expected.insert_fragment(fragment);
1724            }
1725
1726            let combined_rows = left_rows.iter().chain(right_rows.iter());
1727            expected.extend(combined_rows);
1728
1729            let actual = left | right;
1730            for actual_key_val in &actual.inner {
1731                proptest::prop_assert!(expected.inner.contains_key(actual_key_val.0));
1732                let expected_val = expected.inner.get(actual_key_val.0).unwrap();
1733                prop_assert_eq!(
1734                    actual_key_val.1,
1735                    expected_val,
1736                    "error on key {}",
1737                    actual_key_val.0
1738                );
1739            }
1740            prop_assert_eq!(expected, actual);
1741        }
1742
1743        #[test]
1744        fn test_map_subassign_rows(
1745            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1746            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1747            right_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1748        ) {
1749            let mut left = RowAddrTreeMap::default();
1750            for fragment in left_full_fragments {
1751                left.insert_fragment(fragment);
1752            }
1753            left.extend(left_rows.iter().copied());
1754
1755            let mut right = RowAddrTreeMap::default();
1756            right.extend(right_rows.iter().copied());
1757
1758            let mut expected = left.clone();
1759            for row in right_rows {
1760                expected.remove(row);
1761            }
1762
1763            left -= &right;
1764            prop_assert_eq!(expected, left);
1765        }
1766
1767        #[test]
1768        fn test_map_subassign_frags(
1769            left_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1770            right_full_fragments in proptest::collection::vec(0..u32::MAX, 0..10),
1771            left_rows in proptest::collection::vec(0..u64::MAX, 0..1000),
1772        ) {
1773            let mut left = RowAddrTreeMap::default();
1774            for fragment in left_full_fragments {
1775                left.insert_fragment(fragment);
1776            }
1777            left.extend(left_rows.iter().copied());
1778
1779            let mut right = RowAddrTreeMap::default();
1780            for fragment in right_full_fragments.clone() {
1781                right.insert_fragment(fragment);
1782            }
1783
1784            let mut expected = left.clone();
1785            for fragment in right_full_fragments {
1786                expected.inner.remove(&fragment);
1787            }
1788
1789            left -= &right;
1790            prop_assert_eq!(expected, left);
1791        }
1792
1793        #[test]
1794        fn test_from_sorted_iter(
1795            mut rows in proptest::collection::vec(0..u64::MAX, 0..1000)
1796        ) {
1797            rows.sort();
1798            let num_rows = rows.len();
1799            let mask = RowAddrTreeMap::from_sorted_iter(rows).unwrap();
1800            prop_assert_eq!(mask.len(), Some(num_rows as u64));
1801        }
1802
1803
1804    }
1805
1806    #[test]
1807    fn test_row_addr_selection_deep_size_of() {
1808        use deepsize::DeepSizeOf;
1809
1810        // Test Full variant - should have minimal size (just the enum discriminant)
1811        let full = RowAddrSelection::Full;
1812        let full_size = full.deep_size_of();
1813        // Full variant has no heap allocations beyond the enum itself
1814        assert!(full_size < 100); // Small sanity check
1815
1816        // Test Partial variant - should include bitmap size
1817        let mut bitmap = RoaringBitmap::new();
1818        bitmap.insert_range(0..100);
1819        let partial = RowAddrSelection::Partial(bitmap.clone());
1820        let partial_size = partial.deep_size_of();
1821        // Partial variant should be larger due to bitmap
1822        assert!(partial_size >= bitmap.serialized_size());
1823    }
1824
1825    #[test]
1826    fn test_row_addr_selection_union_all_with_full() {
1827        let full = RowAddrSelection::Full;
1828        let partial = RowAddrSelection::Partial(RoaringBitmap::from_iter(&[1, 2, 3]));
1829
1830        assert!(matches!(
1831            RowAddrSelection::union_all(&[&full, &partial]),
1832            RowAddrSelection::Full
1833        ));
1834
1835        let partial2 = RowAddrSelection::Partial(RoaringBitmap::from_iter(&[4, 5, 6]));
1836        let RowAddrSelection::Partial(bitmap) = RowAddrSelection::union_all(&[&partial, &partial2])
1837        else {
1838            panic!("Expected Partial");
1839        };
1840        assert!(bitmap.contains(1) && bitmap.contains(4));
1841    }
1842
1843    #[test]
1844    fn test_insert_range_unbounded_start() {
1845        let mut map = RowAddrTreeMap::default();
1846
1847        // Test exclusive start bound
1848        let count = map.insert_range((std::ops::Bound::Excluded(5), std::ops::Bound::Included(10)));
1849        assert_eq!(count, 5); // 6, 7, 8, 9, 10
1850        assert!(!map.contains(5));
1851        assert!(map.contains(6));
1852        assert!(map.contains(10));
1853
1854        // Test unbounded end
1855        let mut map2 = RowAddrTreeMap::default();
1856        let count = map2.insert_range(0..5);
1857        assert_eq!(count, 5);
1858        assert!(map2.contains(0));
1859        assert!(map2.contains(4));
1860        assert!(!map2.contains(5));
1861    }
1862
1863    #[test]
1864    fn test_remove_from_full_fragment() {
1865        let mut map = RowAddrTreeMap::default();
1866        map.insert_fragment(0);
1867
1868        // Verify it's a full fragment - get_fragment_bitmap returns None for Full
1869        for id in [0, 100, u32::MAX as u64] {
1870            assert!(map.contains(id));
1871        }
1872        assert!(map.get_fragment_bitmap(0).is_none());
1873
1874        // Remove a value from the full fragment
1875        assert!(map.remove(50));
1876
1877        // Now it should be partial (a full RoaringBitmap minus one value)
1878        assert!(map.contains(0) && !map.contains(50) && map.contains(100));
1879        assert!(map.get_fragment_bitmap(0).is_some());
1880    }
1881
1882    #[test]
1883    fn test_retain_fragments() {
1884        let mut map = RowAddrTreeMap::default();
1885        map.insert(0); // fragment 0
1886        map.insert(1 << 32 | 5); // fragment 1
1887        map.insert(2 << 32 | 10); // fragment 2
1888        map.insert_fragment(3); // fragment 3
1889
1890        map.retain_fragments([0, 2]);
1891
1892        assert!(map.contains(0) && map.contains(2 << 32 | 10));
1893        assert!(!map.contains(1 << 32 | 5) && !map.contains(3 << 32));
1894    }
1895
1896    #[test]
1897    fn test_bitor_assign_full_fragment() {
1898        // Test BitOrAssign when LHS has Full and RHS has Partial
1899        let mut map1 = RowAddrTreeMap::default();
1900        map1.insert_fragment(0);
1901        let mut map2 = RowAddrTreeMap::default();
1902        map2.insert(5);
1903
1904        map1 |= &map2;
1905        // Full | Partial = Full
1906        assert!(map1.contains(0) && map1.contains(5) && map1.contains(100));
1907
1908        // Test BitOrAssign when LHS has Partial and RHS has Full
1909        let mut map3 = RowAddrTreeMap::default();
1910        map3.insert(5);
1911        let mut map4 = RowAddrTreeMap::default();
1912        map4.insert_fragment(0);
1913
1914        map3 |= &map4;
1915        // Partial | Full = Full
1916        assert!(map3.contains(0) && map3.contains(5) && map3.contains(100));
1917    }
1918
1919    #[test]
1920    fn test_bitand_assign_full_fragments() {
1921        // Test BitAndAssign when both have Full for same fragment
1922        let mut map1 = RowAddrTreeMap::default();
1923        map1.insert_fragment(0);
1924        let mut map2 = RowAddrTreeMap::default();
1925        map2.insert_fragment(0);
1926
1927        map1 &= &map2;
1928        // Full & Full = Full
1929        assert!(map1.contains(0) && map1.contains(100));
1930
1931        // Test BitAndAssign when LHS Full, RHS Partial
1932        let mut map3 = RowAddrTreeMap::default();
1933        map3.insert_fragment(0);
1934        let mut map4 = RowAddrTreeMap::default();
1935        map4.insert(5);
1936        map4.insert(10);
1937
1938        map3 &= &map4;
1939        // Full & Partial([5,10]) = Partial([5,10])
1940        assert!(map3.contains(5) && map3.contains(10));
1941        assert!(!map3.contains(0) && !map3.contains(100));
1942
1943        // Test that empty intersection results in removal
1944        let mut map5 = RowAddrTreeMap::default();
1945        map5.insert(5);
1946        let mut map6 = RowAddrTreeMap::default();
1947        map6.insert(10);
1948
1949        map5 &= &map6;
1950        assert!(map5.is_empty());
1951    }
1952
1953    #[test]
1954    fn test_sub_assign_with_full_fragments() {
1955        // Test SubAssign when LHS is Full and RHS is Partial
1956        let mut map1 = RowAddrTreeMap::default();
1957        map1.insert_fragment(0);
1958        let mut map2 = RowAddrTreeMap::default();
1959        map2.insert(5);
1960        map2.insert(10);
1961
1962        map1 -= &map2;
1963        // Full - Partial([5,10]) = Full minus those values
1964        assert!(map1.contains(0) && map1.contains(100));
1965        assert!(!map1.contains(5) && !map1.contains(10));
1966
1967        // Test SubAssign when both are Full for same fragment
1968        let mut map3 = RowAddrTreeMap::default();
1969        map3.insert_fragment(0);
1970        let mut map4 = RowAddrTreeMap::default();
1971        map4.insert_fragment(0);
1972
1973        map3 -= &map4;
1974        // Full - Full = empty
1975        assert!(map3.is_empty());
1976
1977        // Test SubAssign when LHS is Partial and RHS is Full
1978        let mut map5 = RowAddrTreeMap::default();
1979        map5.insert(5);
1980        map5.insert(10);
1981        let mut map6 = RowAddrTreeMap::default();
1982        map6.insert_fragment(0);
1983
1984        map5 -= &map6;
1985        // Partial - Full = empty
1986        assert!(map5.is_empty());
1987    }
1988
1989    #[test]
1990    fn test_from_iterator_with_full_fragment() {
1991        // Test that inserting into a full fragment is a no-op
1992        let mut map = RowAddrTreeMap::default();
1993        map.insert_fragment(0);
1994
1995        // Extend with values that would go into fragment 0
1996        map.extend([5u64, 10, 100].iter());
1997
1998        // Should still be full fragment
1999        for id in [0, 5, 10, 100, u32::MAX as u64] {
2000            assert!(map.contains(id));
2001        }
2002    }
2003
2004    #[test]
2005    fn test_insert_range_excluded_end() {
2006        // Test excluded end bound (line 391-393)
2007        let mut map = RowAddrTreeMap::default();
2008        // Using RangeFrom with small range won't hit the unbounded case
2009        // Instead test Bound::Excluded for end
2010        let count = map.insert_range((std::ops::Bound::Included(5), std::ops::Bound::Excluded(10)));
2011        assert_eq!(count, 5); // 5, 6, 7, 8, 9
2012        assert!(map.contains(5));
2013        assert!(map.contains(9));
2014        assert!(!map.contains(10));
2015    }
2016
2017    #[test]
2018    fn test_bitand_assign_owned() {
2019        // Test BitAndAssign<Self> (owned, not reference)
2020        let mut map1 = RowAddrTreeMap::default();
2021        map1.insert(5);
2022        map1.insert(10);
2023
2024        // Using owned rhs (not reference)
2025        map1 &= rows(&[5, 15]);
2026
2027        assert!(map1.contains(5));
2028        assert!(!map1.contains(10) && !map1.contains(15));
2029    }
2030
2031    #[test]
2032    fn test_from_iter_with_full_fragment() {
2033        // When we collect into RowAddrTreeMap, it should handle duplicates
2034        let map: RowAddrTreeMap = vec![5u64, 10, 100].into_iter().collect();
2035        assert!(map.contains(5) && map.contains(10));
2036
2037        // Test that extending a map with full fragment ignores new values
2038        let mut map = RowAddrTreeMap::default();
2039        map.insert_fragment(0);
2040        for val in [5, 10, 100] {
2041            map.insert(val); // This should be no-op since fragment is full
2042        }
2043        // Still full fragment
2044        for id in [0, 5, u32::MAX as u64] {
2045            assert!(map.contains(id));
2046        }
2047    }
2048
2049    // ============================================================================
2050    // Tests for bitmap_to_ranges / ranges_to_bitmap
2051    // ============================================================================
2052
2053    #[test]
2054    fn test_bitmap_to_ranges_empty() {
2055        let bm = RoaringBitmap::new();
2056        assert!(bitmap_to_ranges(&bm).is_empty());
2057    }
2058
2059    #[test]
2060    fn test_bitmap_to_ranges_single() {
2061        let bm = RoaringBitmap::from_iter([5]);
2062        assert_eq!(bitmap_to_ranges(&bm), vec![5..6]);
2063    }
2064
2065    #[test]
2066    fn test_bitmap_to_ranges_contiguous() {
2067        let mut bm = RoaringBitmap::new();
2068        bm.insert_range(10..20);
2069        assert_eq!(bitmap_to_ranges(&bm), vec![10..20]);
2070    }
2071
2072    #[test]
2073    fn test_bitmap_to_ranges_multiple() {
2074        let mut bm = RoaringBitmap::new();
2075        bm.insert_range(0..3);
2076        bm.insert_range(10..15);
2077        bm.insert(100);
2078        assert_eq!(bitmap_to_ranges(&bm), vec![0..3, 10..15, 100..101]);
2079    }
2080
2081    #[test]
2082    fn test_ranges_to_bitmap_empty() {
2083        let bm = ranges_to_bitmap(&[], true);
2084        assert!(bm.is_empty());
2085    }
2086
2087    #[test]
2088    fn test_ranges_to_bitmap_sorted_short_ranges() {
2089        // avg len = 1, uses from_sorted_iter path
2090        let ranges = vec![0..1, 5..6, 10..11];
2091        let bm = ranges_to_bitmap(&ranges, true);
2092        assert!(bm.contains(0) && bm.contains(5) && bm.contains(10));
2093        assert_eq!(bm.len(), 3);
2094    }
2095
2096    #[test]
2097    fn test_ranges_to_bitmap_sorted_long_ranges() {
2098        // avg len = 100, uses insert_range path
2099        let ranges = vec![0..100, 200..300];
2100        let bm = ranges_to_bitmap(&ranges, true);
2101        assert_eq!(bm.len(), 200);
2102        assert!(bm.contains(0) && bm.contains(99));
2103        assert!(!bm.contains(100));
2104        assert!(bm.contains(200) && bm.contains(299));
2105    }
2106
2107    #[test]
2108    fn test_ranges_to_bitmap_unsorted() {
2109        let ranges = vec![200..300, 0..100];
2110        let bm = ranges_to_bitmap(&ranges, false);
2111        assert_eq!(bm.len(), 200);
2112        assert!(bm.contains(0) && bm.contains(250));
2113    }
2114
2115    #[test]
2116    fn test_bitmap_ranges_roundtrip() {
2117        let mut original = RoaringBitmap::new();
2118        original.insert_range(0..50);
2119        original.insert_range(100..200);
2120        original.insert(500);
2121        original.insert_range(1000..1010);
2122
2123        let ranges = bitmap_to_ranges(&original);
2124        let reconstructed = ranges_to_bitmap(&ranges, true);
2125        assert_eq!(original, reconstructed);
2126    }
2127
2128    // ============================================================================
2129    // Tests for RowIdSet
2130    // ============================================================================
2131
2132    fn row_ids(ids: &[u64]) -> RowIdSet {
2133        let mut set = RowIdSet::new();
2134        for &id in ids {
2135            set.inner.insert(id);
2136        }
2137        set
2138    }
2139
2140    #[test]
2141    fn test_row_id_set_construction() {
2142        let set = RowIdSet::new();
2143        assert!(set.is_empty());
2144        assert_eq!(set.len(), Some(0));
2145
2146        let set = row_ids(&[10, 20, 30]);
2147        assert!(!set.is_empty());
2148        assert_eq!(set.len(), Some(3));
2149        assert!(set.contains(10));
2150        assert!(set.contains(20));
2151        assert!(set.contains(30));
2152        assert!(!set.contains(15));
2153    }
2154
2155    #[test]
2156    fn test_row_id_set_remove() {
2157        let mut set = row_ids(&[10, 20, 30]);
2158
2159        assert!(!set.remove(15)); // Not present
2160        assert_eq!(set.len(), Some(3));
2161
2162        assert!(set.remove(20)); // Present
2163        assert_eq!(set.len(), Some(2));
2164        assert!(!set.contains(20));
2165        assert!(set.contains(10));
2166        assert!(set.contains(30));
2167
2168        assert!(!set.remove(20)); // Already removed
2169    }
2170
2171    #[test]
2172    fn test_row_id_set_union() {
2173        let set1 = row_ids(&[10, 20, 30]);
2174        let set2 = row_ids(&[20, 30, 40]);
2175
2176        let result = set1.union(&set2);
2177        assert_eq!(result.len(), Some(4));
2178        for id in [10, 20, 30, 40] {
2179            assert!(result.contains(id));
2180        }
2181    }
2182
2183    #[test]
2184    fn test_row_id_set_difference() {
2185        let set1 = row_ids(&[10, 20, 30, 40]);
2186        let set2 = row_ids(&[20, 40]);
2187
2188        let result = set1.difference(&set2);
2189        assert_eq!(result.len(), Some(2));
2190        assert!(result.contains(10));
2191        assert!(result.contains(30));
2192        assert!(!result.contains(20));
2193        assert!(!result.contains(40));
2194    }
2195
2196    #[test]
2197    fn test_row_id_set_union_all() {
2198        let set1 = row_ids(&[10, 20]);
2199        let set2 = row_ids(&[20, 30]);
2200        let set3 = row_ids(&[30, 40]);
2201
2202        let result = RowIdSet::union_all(&[&set1, &set2, &set3]);
2203        assert_eq!(result.len(), Some(4));
2204        for id in [10, 20, 30, 40] {
2205            assert!(result.contains(id));
2206        }
2207
2208        // Empty slice should return empty set
2209        let result = RowIdSet::union_all(&[]);
2210        assert!(result.is_empty());
2211    }
2212
2213    #[test]
2214    fn test_row_id_set_iter() {
2215        let set = row_ids(&[10, 20, 30]);
2216        let collected: Vec<u64> = set.iter().collect();
2217        assert_eq!(collected, vec![10, 20, 30]);
2218
2219        let empty = RowIdSet::new();
2220        assert_eq!(empty.iter().count(), 0);
2221    }
2222
2223    #[test]
2224    fn test_row_id_set_from_sorted_iter() {
2225        // Valid sorted input
2226        let set = RowIdSet::from_sorted_iter([10, 20, 30, 40]).unwrap();
2227        assert_eq!(set.len(), Some(4));
2228        for id in [10, 20, 30, 40] {
2229            assert!(set.contains(id));
2230        }
2231
2232        // Empty iterator
2233        let set = RowIdSet::from_sorted_iter(std::iter::empty()).unwrap();
2234        assert!(set.is_empty());
2235
2236        // Single element
2237        let set = RowIdSet::from_sorted_iter([42]).unwrap();
2238        assert_eq!(set.len(), Some(1));
2239        assert!(set.contains(42));
2240    }
2241
2242    #[test]
2243    fn test_row_id_set_from_sorted_iter_unsorted() {
2244        // Non-sorted input should return error
2245        let result = RowIdSet::from_sorted_iter([30, 10, 20]);
2246        assert!(result.is_err());
2247        assert!(result.unwrap_err().to_string().contains("non-sorted"));
2248    }
2249
2250    #[test]
2251    fn test_row_id_set_large_values() {
2252        // Test with large u64 values
2253        let large_ids = [u64::MAX - 10, u64::MAX - 5, u64::MAX - 1];
2254        let set = row_ids(&large_ids);
2255
2256        for &id in &large_ids {
2257            assert!(set.contains(id));
2258        }
2259        assert!(!set.contains(u64::MAX));
2260        assert_eq!(set.len(), Some(3));
2261    }
2262
2263    // ============================================================================
2264    // Tests for RowIdMask
2265    // ============================================================================
2266
2267    fn assert_row_id_mask_selects(mask: &RowIdMask, selected: &[u64], not_selected: &[u64]) {
2268        for &id in selected {
2269            assert!(mask.selected(id), "Expected row id {} to be selected", id);
2270        }
2271        for &id in not_selected {
2272            assert!(
2273                !mask.selected(id),
2274                "Expected row id {} to NOT be selected",
2275                id
2276            );
2277        }
2278    }
2279
2280    #[test]
2281    fn test_row_id_mask_construction() {
2282        let full_mask = RowIdMask::all_rows();
2283        assert_eq!(full_mask.max_len(), None);
2284        assert_row_id_mask_selects(&full_mask, &[0, 1, 100, u64::MAX - 1], &[]);
2285
2286        let empty_mask = RowIdMask::allow_nothing();
2287        assert_eq!(empty_mask.max_len(), Some(0));
2288        assert_row_id_mask_selects(&empty_mask, &[], &[0, 1, 100]);
2289
2290        let allow_list = RowIdMask::from_allowed(row_ids(&[10, 20, 30]));
2291        assert_eq!(allow_list.max_len(), Some(3));
2292        assert_row_id_mask_selects(&allow_list, &[10, 20, 30], &[0, 15, 25, 40]);
2293
2294        let block_list = RowIdMask::from_block(row_ids(&[10, 20, 30]));
2295        assert_eq!(block_list.max_len(), None);
2296        assert_row_id_mask_selects(&block_list, &[0, 15, 25, 40], &[10, 20, 30]);
2297    }
2298
2299    #[test]
2300    fn test_row_id_mask_selected_indices() {
2301        // Allow list
2302        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 40]));
2303        assert!(mask.selected_indices(std::iter::empty()).is_empty());
2304        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[1, 3]);
2305
2306        // Block list
2307        let mask = RowIdMask::from_block(row_ids(&[10, 20, 40]));
2308        assert!(mask.selected_indices(std::iter::empty()).is_empty());
2309        assert_eq!(mask.selected_indices([25, 20, 14, 10].iter()), &[0, 2]);
2310    }
2311
2312    #[test]
2313    fn test_row_id_mask_also_allow() {
2314        // Allow list
2315        let mask = RowIdMask::from_allowed(row_ids(&[10, 20]));
2316        let new_mask = mask.also_allow(row_ids(&[20, 30, 40]));
2317        assert_eq!(
2318            new_mask,
2319            RowIdMask::from_allowed(row_ids(&[10, 20, 30, 40]))
2320        );
2321
2322        // Block list
2323        let mask = RowIdMask::from_block(row_ids(&[10, 20, 30]));
2324        let new_mask = mask.also_allow(row_ids(&[20, 40]));
2325        assert_eq!(new_mask, RowIdMask::from_block(row_ids(&[10, 30])));
2326    }
2327
2328    #[test]
2329    fn test_row_id_mask_also_block() {
2330        // Allow list
2331        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 30]));
2332        let new_mask = mask.also_block(row_ids(&[20, 40]));
2333        assert_eq!(new_mask, RowIdMask::from_allowed(row_ids(&[10, 30])));
2334
2335        // Block list
2336        let mask = RowIdMask::from_block(row_ids(&[10, 20]));
2337        let new_mask = mask.also_block(row_ids(&[20, 30, 40]));
2338        assert_eq!(new_mask, RowIdMask::from_block(row_ids(&[10, 20, 30, 40])));
2339    }
2340
2341    #[test]
2342    fn test_row_id_mask_iter_ids() {
2343        // Allow list
2344        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 30]));
2345        let ids: Vec<u64> = mask.iter_ids().unwrap().collect();
2346        assert_eq!(ids, vec![10, 20, 30]);
2347
2348        // Empty allow list
2349        let mask = RowIdMask::allow_nothing();
2350        let iter = mask.iter_ids();
2351        assert!(iter.is_some());
2352        assert_eq!(iter.unwrap().count(), 0);
2353
2354        // Block list
2355        let mask = RowIdMask::from_block(row_ids(&[10, 20, 30]));
2356        assert!(mask.iter_ids().is_none());
2357    }
2358
2359    #[test]
2360    fn test_row_id_mask_default() {
2361        let mask = RowIdMask::default();
2362        // Default should be BlockList with empty set (all rows allowed)
2363        assert_row_id_mask_selects(&mask, &[0, 1, 100, 1000], &[]);
2364        assert_eq!(mask.max_len(), None);
2365    }
2366
2367    #[test]
2368    fn test_row_id_mask_ops() {
2369        let mask = RowIdMask::default();
2370        assert_row_id_mask_selects(&mask, &[1, 5, 100], &[]);
2371
2372        let block_list = mask.also_block(row_ids(&[0, 5, 15]));
2373        assert_row_id_mask_selects(&block_list, &[1, 100], &[5]);
2374
2375        let allow_list = RowIdMask::from_allowed(row_ids(&[0, 2, 5]));
2376        assert_row_id_mask_selects(&allow_list, &[5], &[1, 100]);
2377    }
2378
2379    #[test]
2380    fn test_row_id_mask_combined_ops() {
2381        // Test combining allow and block operations
2382        let mask = RowIdMask::from_allowed(row_ids(&[10, 20, 30, 40, 50]));
2383        let mask = mask.also_block(row_ids(&[20, 40]));
2384        assert_row_id_mask_selects(&mask, &[10, 30, 50], &[20, 40]);
2385
2386        let mask = mask.also_allow(row_ids(&[20, 60]));
2387        assert_row_id_mask_selects(&mask, &[10, 20, 30, 50, 60], &[40]);
2388    }
2389
2390    #[test]
2391    fn test_row_id_mask_with_large_values() {
2392        let large_ids = [u64::MAX - 10, u64::MAX - 5, u64::MAX - 1];
2393
2394        // Allow list with large values
2395        let mask = RowIdMask::from_allowed(row_ids(&large_ids));
2396        for &id in &large_ids {
2397            assert!(mask.selected(id));
2398        }
2399        assert!(!mask.selected(u64::MAX));
2400        assert!(!mask.selected(0));
2401
2402        // Block list with large values
2403        let mask = RowIdMask::from_block(row_ids(&large_ids));
2404        for &id in &large_ids {
2405            assert!(!mask.selected(id));
2406        }
2407        assert!(mask.selected(u64::MAX));
2408        assert!(mask.selected(0));
2409    }
2410
2411    proptest::proptest! {
2412        #[test]
2413        fn test_row_id_set_from_sorted_iter_proptest(
2414            mut row_ids in proptest::collection::vec(0..u64::MAX, 0..1000)
2415        ) {
2416            row_ids.sort();
2417            row_ids.dedup();
2418            let num_rows = row_ids.len();
2419            let set = RowIdSet::from_sorted_iter(row_ids.clone()).unwrap();
2420            prop_assert_eq!(set.len(), Some(num_rows as u64));
2421            for id in row_ids {
2422                prop_assert!(set.contains(id));
2423            }
2424        }
2425
2426        #[test]
2427        fn test_row_id_set_union_proptest(
2428            ids1 in proptest::collection::vec(0..u64::MAX, 0..500),
2429            ids2 in proptest::collection::vec(0..u64::MAX, 0..500),
2430        ) {
2431            let set1 = row_ids(&ids1);
2432            let set2 = row_ids(&ids2);
2433
2434            let result = set1.union(&set2);
2435
2436            // All ids from both sets should be in result
2437            for id in ids1.iter().chain(ids2.iter()) {
2438                prop_assert!(result.contains(*id));
2439            }
2440
2441            // Result size should be union size
2442            let expected_size = ids1.iter().chain(ids2.iter()).collect::<std::collections::HashSet<_>>().len();
2443            prop_assert_eq!(result.len(), Some(expected_size as u64));
2444        }
2445
2446        #[test]
2447        fn test_row_id_set_difference_proptest(
2448            ids1 in proptest::collection::vec(0..u64::MAX, 0..500),
2449            ids2 in proptest::collection::vec(0..u64::MAX, 0..500),
2450        ) {
2451            let set1 = row_ids(&ids1);
2452            let set2 = row_ids(&ids2);
2453
2454            let result = set1.difference(&set2);
2455
2456            // Items in ids1 but not in ids2 should be in result
2457            for id in &ids1 {
2458                if !ids2.contains(id) {
2459                    prop_assert!(result.contains(*id));
2460                } else {
2461                    prop_assert!(!result.contains(*id));
2462                }
2463            }
2464        }
2465
2466        #[test]
2467        fn test_row_id_mask_allow_block_proptest(
2468            allow_ids in proptest::collection::vec(0..10000u64, 0..100),
2469            block_ids in proptest::collection::vec(0..10000u64, 0..100),
2470            test_ids in proptest::collection::vec(0..10000u64, 0..50),
2471        ) {
2472            let mask = RowIdMask::from_allowed(row_ids(&allow_ids))
2473                .also_block(row_ids(&block_ids));
2474
2475            for id in test_ids {
2476                let expected = allow_ids.contains(&id) && !block_ids.contains(&id);
2477                prop_assert_eq!(mask.selected(id), expected);
2478            }
2479        }
2480    }
2481}