lance_table/rowids/
segment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::{Range, RangeInclusive};
5
6use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
7use deepsize::DeepSizeOf;
8use snafu::location;
9
/// Different ways to represent a sequence of distinct u64s.
///
/// This is designed to be especially efficient for sequences that are sorted,
/// but not meaningfully larger than a Vec<u64> in the worst case.
///
/// The representation is chosen based on the properties of the sequence:
///
///  Sorted?───►Yes ───►Contiguous?─► Yes─► Range
///    │                ▼
///    │                No
///    │                ▼
///    │              Dense?─────► Yes─► RangeWithBitmap/RangeWithHoles
///    │                ▼
///    │                No─────────────► SortedArray
///    ▼
///    No──────────────────────────────► Array
///
/// "Dense" is decided based on the estimated byte size of the representation:
/// for a sorted, non-contiguous sequence the candidate with the smallest
/// estimate is used.
///
/// Estimated size of RangeWithHoles:
///     24 bytes + 4 bytes * n_holes
/// Estimated size of RangeWithBitmap:
///     24 bytes + ceil((max - min + 1) / 8) bytes
/// Estimated size of SortedArray for N values (assuming u16 packed):
///     24 bytes + 2 bytes * N
///
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum U64Segment {
    /// A contiguous sorted range of row ids.
    ///
    /// Total size: 16 bytes
    Range(Range<u64>),
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Total size: 24 bytes + n_holes * 4 bytes
    /// Use when: 32 * n_holes < max - min
    RangeWithHoles {
        /// Half-open range spanning the smallest to the largest value.
        range: Range<u64>,
        /// The values inside `range` that are missing from the segment.
        /// These are the missing values themselves (not offsets from the
        /// start of the range). This is sorted, so binary search can be
        /// used. It's typically relatively small.
        holes: EncodedU64Array,
    },
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Bitmap is 1 when the value is present, 0 when it's missing. Bit `i`
    /// corresponds to the value `range.start + i`.
    ///
    /// Total size: 24 bytes + ceil((max - min) / 8) bytes
    /// Use when: max - min < 16 * len (i.e. the bitmap is smaller than a
    /// u16-packed sorted array)
    RangeWithBitmap { range: Range<u64>, bitmap: Bitmap },
    /// A sorted array of row ids, that is sparse.
    ///
    /// Total size: 24 bytes + 2 * n_values bytes
    SortedArray(EncodedU64Array),
    /// An array of row ids, that is not sorted.
    Array(EncodedU64Array),
}
65
66impl DeepSizeOf for U64Segment {
67    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
68        match self {
69            Self::Range(_) => 0,
70            Self::RangeWithHoles { holes, .. } => holes.deep_size_of_children(context),
71            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
72            Self::SortedArray(array) => array.deep_size_of_children(context),
73            Self::Array(array) => array.deep_size_of_children(context),
74        }
75    }
76}
77
/// Statistics about a segment of u64s.
#[derive(Debug)]
struct SegmentStats {
    /// Min value in the segment.
    min: u64,
    /// Max value in the segment
    max: u64,
    /// Total number of values in the segment
    count: u64,
    /// Whether the segment is sorted
    sorted: bool,
}

impl SegmentStats {
    /// Number of values in `min..=max` that are absent from the segment.
    ///
    /// Only meaningful for sorted segments of distinct values. An empty
    /// segment has no holes.
    fn n_holes(&self) -> u64 {
        debug_assert!(self.sorted);
        if self.count == 0 {
            0
        } else {
            // Equivalent to `(max - min + 1) - count`, but written without the
            // `+ 1` so it does not overflow when the span covers all of u64.
            (self.max - self.min) - (self.count - 1)
        }
    }
}
102
103impl U64Segment {
104    /// Return the values that are missing from the slice.
105    fn holes_in_slice<'a>(
106        range: RangeInclusive<u64>,
107        existing: impl IntoIterator<Item = u64> + 'a,
108    ) -> impl Iterator<Item = u64> + 'a {
109        let mut existing = existing.into_iter().peekable();
110        range.filter(move |val| {
111            if let Some(&existing_val) = existing.peek() {
112                if existing_val == *val {
113                    existing.next();
114                    return false;
115                }
116            }
117            true
118        })
119    }
120
121    fn compute_stats(values: impl IntoIterator<Item = u64>) -> SegmentStats {
122        let mut sorted = true;
123        let mut min = u64::MAX;
124        let mut max = 0;
125        let mut count = 0;
126
127        for val in values {
128            count += 1;
129            if val < min {
130                min = val;
131            }
132            if val > max {
133                max = val;
134            }
135            if sorted && count > 1 && val < max {
136                sorted = false;
137            }
138        }
139
140        if count == 0 {
141            min = 0;
142            max = 0;
143        }
144
145        SegmentStats {
146            min,
147            max,
148            count,
149            sorted,
150        }
151    }
152
153    fn sorted_sequence_sizes(stats: &SegmentStats) -> [usize; 3] {
154        let n_holes = stats.n_holes();
155        let total_slots = stats.max - stats.min + 1;
156
157        let range_with_holes = 24 + 4 * n_holes as usize;
158        let range_with_bitmap = 24 + (total_slots as f64 / 8.0).ceil() as usize;
159        let sorted_array = 24 + 2 * stats.count as usize;
160
161        [range_with_holes, range_with_bitmap, sorted_array]
162    }
163
164    fn from_stats_and_sequence(
165        stats: SegmentStats,
166        sequence: impl IntoIterator<Item = u64>,
167    ) -> Self {
168        if stats.sorted {
169            let n_holes = stats.n_holes();
170            if stats.count == 0 {
171                Self::Range(0..0)
172            } else if n_holes == 0 {
173                Self::Range(stats.min..(stats.max + 1))
174            } else {
175                let sizes = Self::sorted_sequence_sizes(&stats);
176                let min_size = sizes.iter().min().unwrap();
177                if min_size == &sizes[0] {
178                    let range = stats.min..(stats.max + 1);
179                    let mut holes =
180                        Self::holes_in_slice(stats.min..=stats.max, sequence).collect::<Vec<_>>();
181                    holes.sort_unstable();
182                    let holes = EncodedU64Array::from(holes);
183
184                    Self::RangeWithHoles { range, holes }
185                } else if min_size == &sizes[1] {
186                    let range = stats.min..(stats.max + 1);
187                    let mut bitmap = Bitmap::new_full((stats.max - stats.min) as usize + 1);
188
189                    for hole in Self::holes_in_slice(stats.min..=stats.max, sequence) {
190                        let offset = (hole - stats.min) as usize;
191                        bitmap.clear(offset);
192                    }
193
194                    Self::RangeWithBitmap { range, bitmap }
195                } else {
196                    // Must use array, but at least it's sorted
197                    Self::SortedArray(EncodedU64Array::from_iter(sequence))
198                }
199            }
200        } else {
201            // Must use array
202            Self::Array(EncodedU64Array::from_iter(sequence))
203        }
204    }
205
206    pub fn from_slice(slice: &[u64]) -> Self {
207        let stats = Self::compute_stats(slice.iter().copied());
208        Self::from_stats_and_sequence(stats, slice.iter().copied())
209    }
210}
211
212impl U64Segment {
213    pub fn iter(&self) -> Box<dyn DoubleEndedIterator<Item = u64> + '_> {
214        match self {
215            Self::Range(range) => Box::new(range.clone()),
216            Self::RangeWithHoles { range, holes } => {
217                Box::new((range.start..range.end).filter(move |&val| {
218                    // TODO: we could write a more optimal version of this
219                    // iterator, but would need special handling to make it
220                    // double ended.
221                    holes.binary_search(val).is_err()
222                }))
223            }
224            Self::RangeWithBitmap { range, bitmap } => {
225                Box::new((range.start..range.end).filter(|val| {
226                    let offset = (val - range.start) as usize;
227                    bitmap.get(offset)
228                }))
229            }
230            Self::SortedArray(array) => Box::new(array.iter()),
231            Self::Array(array) => Box::new(array.iter()),
232        }
233    }
234
235    pub fn len(&self) -> usize {
236        match self {
237            Self::Range(range) => (range.end - range.start) as usize,
238            Self::RangeWithHoles { range, holes } => {
239                let holes = holes.iter().count();
240                (range.end - range.start) as usize - holes
241            }
242            Self::RangeWithBitmap { range, bitmap } => {
243                let holes = bitmap.count_zeros();
244                (range.end - range.start) as usize - holes
245            }
246            Self::SortedArray(array) => array.len(),
247            Self::Array(array) => array.len(),
248        }
249    }
250
251    pub fn is_empty(&self) -> bool {
252        self.len() == 0
253    }
254
255    /// Get the min and max value of the segment, excluding tombstones.
256    pub fn range(&self) -> Option<RangeInclusive<u64>> {
257        match self {
258            Self::Range(range) if range.is_empty() => None,
259            Self::Range(range)
260            | Self::RangeWithBitmap { range, .. }
261            | Self::RangeWithHoles { range, .. } => Some(range.start..=(range.end - 1)),
262            Self::SortedArray(array) => {
263                // We can assume that the array is sorted.
264                let min_value = array.first().unwrap();
265                let max_value = array.last().unwrap();
266                Some(min_value..=max_value)
267            }
268            Self::Array(array) => {
269                let min_value = array.min().unwrap();
270                let max_value = array.max().unwrap();
271                Some(min_value..=max_value)
272            }
273        }
274    }
275
276    pub fn slice(&self, offset: usize, len: usize) -> Self {
277        if len == 0 {
278            return Self::Range(0..0);
279        }
280
281        let values: Vec<u64> = self.iter().skip(offset).take(len).collect();
282
283        // `from_slice` will compute stats and select the best representation.
284        Self::from_slice(&values)
285    }
286
287    pub fn position(&self, val: u64) -> Option<usize> {
288        match self {
289            Self::Range(range) => {
290                if range.contains(&val) {
291                    Some((val - range.start) as usize)
292                } else {
293                    None
294                }
295            }
296            Self::RangeWithHoles { range, holes } => {
297                if range.contains(&val) && holes.binary_search(val).is_err() {
298                    let offset = (val - range.start) as usize;
299                    let holes = holes.iter().take_while(|&hole| hole < val).count();
300                    Some(offset - holes)
301                } else {
302                    None
303                }
304            }
305            Self::RangeWithBitmap { range, bitmap } => {
306                if range.contains(&val) && bitmap.get((val - range.start) as usize) {
307                    let offset = (val - range.start) as usize;
308                    let num_zeros = bitmap.slice(0, offset).count_zeros();
309                    Some(offset - num_zeros)
310                } else {
311                    None
312                }
313            }
314            Self::SortedArray(array) => array.binary_search(val).ok(),
315            Self::Array(array) => array.iter().position(|v| v == val),
316        }
317    }
318
319    pub fn get(&self, i: usize) -> Option<u64> {
320        match self {
321            Self::Range(range) => match range.start.checked_add(i as u64) {
322                Some(val) if val < range.end => Some(val),
323                _ => None,
324            },
325            Self::RangeWithHoles { range, .. } => {
326                if i >= (range.end - range.start) as usize {
327                    return None;
328                }
329                self.iter().nth(i)
330            }
331            Self::RangeWithBitmap { range, .. } => {
332                if i >= (range.end - range.start) as usize {
333                    return None;
334                }
335                self.iter().nth(i)
336            }
337            Self::SortedArray(array) => array.get(i),
338            Self::Array(array) => array.get(i),
339        }
340    }
341
342    /// Check if a value is contained in the segment
343    pub fn contains(&self, val: u64) -> bool {
344        match self {
345            Self::Range(range) => range.contains(&val),
346            Self::RangeWithHoles { range, holes } => {
347                if !range.contains(&val) {
348                    return false;
349                }
350                // Check if the value is not in the holes
351                !holes.iter().any(|hole| hole == val)
352            }
353            Self::RangeWithBitmap { range, bitmap } => {
354                if !range.contains(&val) {
355                    return false;
356                }
357                // Check if the bitmap has the value set (not cleared)
358                let idx = (val - range.start) as usize;
359                bitmap.get(idx)
360            }
361            Self::SortedArray(array) => array.binary_search(val).is_ok(),
362            Self::Array(array) => array.iter().any(|v| v == val),
363        }
364    }
365
366    /// Produce a new segment that has [`val`] as the new highest value in the segment
367    pub fn with_new_high(self, val: u64) -> lance_core::Result<Self> {
368        // Check that the new value is higher than the current maximum
369        if let Some(range) = self.range() {
370            if val <= *range.end() {
371                return Err(lance_core::Error::invalid_input(
372                    format!(
373                        "New value {} must be higher than current maximum {}",
374                        val,
375                        range.end()
376                    ),
377                    location!(),
378                ));
379            }
380        }
381
382        Ok(match self {
383            Self::Range(range) => {
384                // Special case for empty range: create a range containing only the new value
385                if range.start == range.end {
386                    Self::Range(Range {
387                        start: val,
388                        end: val + 1,
389                    })
390                } else if val == range.end {
391                    Self::Range(Range {
392                        start: range.start,
393                        end: val + 1,
394                    })
395                } else {
396                    Self::RangeWithHoles {
397                        range: Range {
398                            start: range.start,
399                            end: val + 1,
400                        },
401                        holes: EncodedU64Array::U64((range.end..val).collect()),
402                    }
403                }
404            }
405            Self::RangeWithHoles { range, holes } => {
406                if val == range.end {
407                    Self::RangeWithHoles {
408                        range: Range {
409                            start: range.start,
410                            end: val + 1,
411                        },
412                        holes,
413                    }
414                } else {
415                    let mut new_holes: Vec<u64> = holes.iter().collect();
416                    new_holes.extend(range.end..val);
417                    Self::RangeWithHoles {
418                        range: Range {
419                            start: range.start,
420                            end: val + 1,
421                        },
422                        holes: EncodedU64Array::U64(new_holes),
423                    }
424                }
425            }
426            Self::RangeWithBitmap { range, bitmap } => {
427                let new_range = Range {
428                    start: range.start,
429                    end: val + 1,
430                };
431                let gap_size = (val - range.end) as usize;
432                let new_bitmap = bitmap
433                    .iter()
434                    .chain(std::iter::repeat_n(false, gap_size))
435                    .chain(std::iter::once(true))
436                    .collect::<Vec<bool>>();
437
438                Self::RangeWithBitmap {
439                    range: new_range,
440                    bitmap: Bitmap::from(new_bitmap.as_slice()),
441                }
442            }
443            Self::SortedArray(array) => match array {
444                EncodedU64Array::U64(mut vec) => {
445                    vec.push(val);
446                    Self::SortedArray(EncodedU64Array::U64(vec))
447                }
448                EncodedU64Array::U16 { base, offsets } => {
449                    if let Some(offset) = val.checked_sub(base) {
450                        if offset <= u16::MAX as u64 {
451                            let mut offsets = offsets;
452                            offsets.push(offset as u16);
453                            return Ok(Self::SortedArray(EncodedU64Array::U16 { base, offsets }));
454                        } else if offset <= u32::MAX as u64 {
455                            let mut u32_offsets: Vec<u32> =
456                                offsets.into_iter().map(|o| o as u32).collect();
457                            u32_offsets.push(offset as u32);
458                            return Ok(Self::SortedArray(EncodedU64Array::U32 {
459                                base,
460                                offsets: u32_offsets,
461                            }));
462                        }
463                    }
464                    let mut new_array: Vec<u64> =
465                        offsets.into_iter().map(|o| base + o as u64).collect();
466                    new_array.push(val);
467                    Self::SortedArray(EncodedU64Array::from(new_array))
468                }
469                EncodedU64Array::U32 { base, mut offsets } => {
470                    if let Some(offset) = val.checked_sub(base) {
471                        if offset <= u32::MAX as u64 {
472                            offsets.push(offset as u32);
473                            return Ok(Self::SortedArray(EncodedU64Array::U32 { base, offsets }));
474                        }
475                    }
476                    let mut new_array: Vec<u64> =
477                        offsets.into_iter().map(|o| base + o as u64).collect();
478                    new_array.push(val);
479                    Self::SortedArray(EncodedU64Array::from(new_array))
480                }
481            },
482            Self::Array(array) => match array {
483                EncodedU64Array::U64(mut vec) => {
484                    vec.push(val);
485                    Self::Array(EncodedU64Array::U64(vec))
486                }
487                EncodedU64Array::U16 { base, offsets } => {
488                    if let Some(offset) = val.checked_sub(base) {
489                        if offset <= u16::MAX as u64 {
490                            let mut offsets = offsets;
491                            offsets.push(offset as u16);
492                            return Ok(Self::Array(EncodedU64Array::U16 { base, offsets }));
493                        } else if offset <= u32::MAX as u64 {
494                            let mut u32_offsets: Vec<u32> =
495                                offsets.into_iter().map(|o| o as u32).collect();
496                            u32_offsets.push(offset as u32);
497                            return Ok(Self::Array(EncodedU64Array::U32 {
498                                base,
499                                offsets: u32_offsets,
500                            }));
501                        }
502                    }
503                    let mut new_array: Vec<u64> =
504                        offsets.into_iter().map(|o| base + o as u64).collect();
505                    new_array.push(val);
506                    Self::Array(EncodedU64Array::from(new_array))
507                }
508                EncodedU64Array::U32 { base, mut offsets } => {
509                    if let Some(offset) = val.checked_sub(base) {
510                        if offset <= u32::MAX as u64 {
511                            offsets.push(offset as u32);
512                            return Ok(Self::Array(EncodedU64Array::U32 { base, offsets }));
513                        }
514                    }
515                    let mut new_array: Vec<u64> =
516                        offsets.into_iter().map(|o| base + o as u64).collect();
517                    new_array.push(val);
518                    Self::Array(EncodedU64Array::from(new_array))
519                }
520            },
521        })
522    }
523
524    /// Delete a set of row ids from the segment.
525    /// The row ids are assumed to be in the segment. (within the range, not
526    /// already deleted.)
527    /// They are also assumed to be ordered by appearance in the segment.
528    pub fn delete(&self, vals: &[u64]) -> Self {
529        // TODO: can we enforce these assumptions? or make them safer?
530        debug_assert!(vals.iter().all(|&val| self.range().unwrap().contains(&val)));
531
532        let make_new_iter = || {
533            let mut vals_iter = vals.iter().copied().peekable();
534            self.iter().filter(move |val| {
535                if let Some(&next_val) = vals_iter.peek() {
536                    if next_val == *val {
537                        vals_iter.next();
538                        return false;
539                    }
540                }
541                true
542            })
543        };
544        let stats = Self::compute_stats(make_new_iter());
545        Self::from_stats_and_sequence(stats, make_new_iter())
546    }
547
548    pub fn mask(&mut self, positions: &[u32]) {
549        if positions.is_empty() {
550            return;
551        }
552        if positions.len() == self.len() {
553            *self = Self::Range(0..0);
554            return;
555        }
556        let count = (self.len() - positions.len()) as u64;
557        let sorted = match self {
558            Self::Range(_) => true,
559            Self::RangeWithHoles { .. } => true,
560            Self::RangeWithBitmap { .. } => true,
561            Self::SortedArray(_) => true,
562            Self::Array(_) => false,
563        };
564        // To get minimum, need to find the first value that is not masked.
565        let first_unmasked = (0..self.len())
566            .zip(positions.iter().cycle())
567            .find(|(sequential_i, i)| **i != *sequential_i as u32)
568            .map(|(sequential_i, _)| sequential_i)
569            .unwrap();
570        let min = self.get(first_unmasked).unwrap();
571
572        let last_unmasked = (0..self.len())
573            .rev()
574            .zip(positions.iter().rev().cycle())
575            .filter(|(sequential_i, i)| **i != *sequential_i as u32)
576            .map(|(sequential_i, _)| sequential_i)
577            .next()
578            .unwrap();
579        let max = self.get(last_unmasked).unwrap();
580
581        let stats = SegmentStats {
582            min,
583            max,
584            count,
585            sorted,
586        };
587
588        let mut positions = positions.iter().copied().peekable();
589        let sequence = self.iter().enumerate().filter_map(move |(i, val)| {
590            if let Some(next_pos) = positions.peek() {
591                if *next_pos == i as u32 {
592                    positions.next();
593                    return None;
594                }
595            }
596            Some(val)
597        });
598        *self = Self::from_stats_and_sequence(stats, sequence)
599    }
600}
601
602#[cfg(test)]
603mod test {
604    use super::*;
605
    /// Builds a segment from several kinds of sequences and checks both the
    /// chosen representation and the read accessors (`len`, `iter`, `range`,
    /// `get`, `position`).
    #[test]
    fn test_segments() {
        /// Assert that `values` encodes to `expected` and reads back unchanged.
        fn check_segment(values: &[u64], expected: &U64Segment) {
            let segment = U64Segment::from_slice(values);
            assert_eq!(segment, *expected);
            assert_eq!(values.len(), segment.len());

            let roundtripped = segment.iter().collect::<Vec<_>>();
            assert_eq!(roundtripped, values);

            // `range()` must agree with the true min/max of the input.
            let expected_min = values.iter().copied().min();
            let expected_max = values.iter().copied().max();
            match segment.range() {
                Some(range) => {
                    assert_eq!(range.start(), &expected_min.unwrap());
                    assert_eq!(range.end(), &expected_max.unwrap());
                }
                None => {
                    assert_eq!(expected_min, None);
                    assert_eq!(expected_max, None);
                }
            }

            // `get` and `position` must be inverses of each other.
            for (i, value) in values.iter().enumerate() {
                assert_eq!(segment.get(i), Some(*value), "i = {}", i);
                assert_eq!(segment.position(*value), Some(i), "i = {}", i);
            }

            check_segment_iter(&segment);
        }

        /// Assert that the iterator behaves consistently from both ends.
        fn check_segment_iter(segment: &U64Segment) {
            // Should be able to iterate forwards and backwards, and get the same thing.
            let forwards = segment.iter().collect::<Vec<_>>();
            let mut backwards = segment.iter().rev().collect::<Vec<_>>();
            backwards.reverse();
            assert_eq!(forwards, backwards);

            // Should be able to pull from both sides in lockstep.
            let mut expected = Vec::with_capacity(segment.len());
            let mut actual = Vec::with_capacity(segment.len());
            let mut iter = segment.iter();
            // Alternating forwards and backwards
            for i in 0..segment.len() {
                if i % 2 == 0 {
                    actual.push(iter.next().unwrap());
                    expected.push(segment.get(i / 2).unwrap());
                } else {
                    let i = segment.len() - 1 - i / 2;
                    actual.push(iter.next_back().unwrap());
                    expected.push(segment.get(i).unwrap());
                };
            }
            assert_eq!(expected, actual);
        }

        // Empty
        check_segment(&[], &U64Segment::Range(0..0));

        // Single value
        check_segment(&[42], &U64Segment::Range(42..43));

        // Contiguous range
        check_segment(
            &(100..200).collect::<Vec<_>>(),
            &U64Segment::Range(100..200),
        );

        // Range with a hole
        let values = (0..1000).filter(|&x| x != 100).collect::<Vec<_>>();
        check_segment(
            &values,
            &U64Segment::RangeWithHoles {
                range: 0..1000,
                holes: vec![100].into(),
            },
        );

        // Range with every other value missing
        let values = (0..1000).filter(|&x| x % 2 == 0).collect::<Vec<_>>();
        check_segment(
            &values,
            &U64Segment::RangeWithBitmap {
                range: 0..999,
                bitmap: Bitmap::from((0..999).map(|x| x % 2 == 0).collect::<Vec<_>>().as_slice()),
            },
        );

        // Sparse but sorted sequence
        check_segment(
            &[1, 7000, 24000],
            &U64Segment::SortedArray(vec![1, 7000, 24000].into()),
        );

        // Sparse unsorted sequence
        check_segment(
            &[7000, 1, 24000],
            &U64Segment::Array(vec![7000, 1, 24000].into()),
        );
    }
706
    /// Exercises `with_new_high` on every segment variant, both for a value
    /// that directly follows the current end and for one that leaves a gap.
    #[test]
    fn test_with_new_high() {
        // Test Range: contiguous sequence
        let segment = U64Segment::Range(10..20);

        // Test adding value that extends the range
        let result = segment.clone().with_new_high(20).unwrap();
        assert_eq!(result, U64Segment::Range(10..21));

        // Test adding value that creates holes
        let result = segment.with_new_high(25).unwrap();
        assert_eq!(
            result,
            U64Segment::RangeWithHoles {
                range: 10..26,
                holes: EncodedU64Array::U64(vec![20, 21, 22, 23, 24]),
            }
        );

        // Test RangeWithHoles: sequence with existing holes
        let segment = U64Segment::RangeWithHoles {
            range: 10..20,
            holes: EncodedU64Array::U64(vec![15, 17]),
        };

        // Test adding value that extends the range without new holes
        let result = segment.clone().with_new_high(20).unwrap();
        assert_eq!(
            result,
            U64Segment::RangeWithHoles {
                range: 10..21,
                holes: EncodedU64Array::U64(vec![15, 17]),
            }
        );

        // Test adding value that creates additional holes
        let result = segment.with_new_high(25).unwrap();
        assert_eq!(
            result,
            U64Segment::RangeWithHoles {
                range: 10..26,
                holes: EncodedU64Array::U64(vec![15, 17, 20, 21, 22, 23, 24]),
            }
        );

        // Test RangeWithBitmap: sequence with bitmap representation
        let mut bitmap = Bitmap::new_full(10);
        bitmap.clear(3); // Clear position 3 (value 13)
        bitmap.clear(7); // Clear position 7 (value 17)
        let segment = U64Segment::RangeWithBitmap {
            range: 10..20,
            bitmap,
        };

        // Test adding value that extends the range without new holes
        let result = segment.clone().with_new_high(20).unwrap();
        let expected_bitmap = {
            let mut b = Bitmap::new_full(11);
            b.clear(3); // Clear position 3 (value 13)
            b.clear(7); // Clear position 7 (value 17)
            b
        };
        assert_eq!(
            result,
            U64Segment::RangeWithBitmap {
                range: 10..21,
                bitmap: expected_bitmap,
            }
        );

        // Test adding value that creates additional holes
        let result = segment.with_new_high(25).unwrap();
        let expected_bitmap = {
            let mut b = Bitmap::new_full(16);
            b.clear(3); // Clear position 3 (value 13)
            b.clear(7); // Clear position 7 (value 17)
            // Clear positions 10-14 (values 20-24)
            for i in 10..15 {
                b.clear(i);
            }
            b
        };
        assert_eq!(
            result,
            U64Segment::RangeWithBitmap {
                range: 10..26,
                bitmap: expected_bitmap,
            }
        );

        // Test SortedArray: sparse sorted sequence
        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));

        let result = segment.with_new_high(15).unwrap();
        assert_eq!(
            result,
            U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10, 15]))
        );

        // Test Array: unsorted sequence
        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));

        let result = segment.with_new_high(15).unwrap();
        assert_eq!(
            result,
            U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1, 15]))
        );

        // Test edge cases
        // Empty segment: the result contains only the new value
        let segment = U64Segment::Range(0..0);
        let result = segment.with_new_high(5).unwrap();
        assert_eq!(result, U64Segment::Range(5..6));

        // Single value segment
        let segment = U64Segment::Range(42..43);
        let result = segment.with_new_high(50).unwrap();
        assert_eq!(
            result,
            U64Segment::RangeWithHoles {
                range: 42..51,
                holes: EncodedU64Array::U64(vec![43, 44, 45, 46, 47, 48, 49]),
            }
        );
    }
832
833    #[test]
834    fn test_with_new_high_assertion() {
835        let segment = U64Segment::Range(10..20);
836        // This should return an error because 15 is not higher than the current maximum 19
837        let result = segment.with_new_high(15);
838        assert!(result.is_err());
839        let error = result.unwrap_err();
840        assert!(error
841            .to_string()
842            .contains("New value 15 must be higher than current maximum 19"));
843    }
844
845    #[test]
846    fn test_with_new_high_assertion_equal() {
847        let segment = U64Segment::Range(1..6);
848        // This should return an error because 5 is not higher than the current maximum 5
849        let result = segment.with_new_high(5);
850        assert!(result.is_err());
851        let error = result.unwrap_err();
852        assert!(error
853            .to_string()
854            .contains("New value 5 must be higher than current maximum 5"));
855    }
856
857    #[test]
858    fn test_contains() {
859        // Test Range: contiguous sequence
860        let segment = U64Segment::Range(10..20);
861        assert!(segment.contains(10), "Should contain 10");
862        assert!(segment.contains(15), "Should contain 15");
863        assert!(segment.contains(19), "Should contain 19");
864        assert!(!segment.contains(9), "Should not contain 9");
865        assert!(!segment.contains(20), "Should not contain 20");
866        assert!(!segment.contains(25), "Should not contain 25");
867
868        // Test RangeWithHoles: sequence with holes
869        let segment = U64Segment::RangeWithHoles {
870            range: 10..20,
871            holes: EncodedU64Array::U64(vec![15, 17]),
872        };
873        assert!(segment.contains(10), "Should contain 10");
874        assert!(segment.contains(14), "Should contain 14");
875        assert!(!segment.contains(15), "Should not contain 15 (hole)");
876        assert!(segment.contains(16), "Should contain 16");
877        assert!(!segment.contains(17), "Should not contain 17 (hole)");
878        assert!(segment.contains(18), "Should contain 18");
879        assert!(
880            !segment.contains(20),
881            "Should not contain 20 (out of range)"
882        );
883
884        // Test RangeWithBitmap: sequence with bitmap
885        let mut bitmap = Bitmap::new_full(10);
886        bitmap.clear(3); // Clear position 3 (value 13)
887        bitmap.clear(7); // Clear position 7 (value 17)
888        let segment = U64Segment::RangeWithBitmap {
889            range: 10..20,
890            bitmap,
891        };
892        assert!(segment.contains(10), "Should contain 10");
893        assert!(segment.contains(12), "Should contain 12");
894        assert!(
895            !segment.contains(13),
896            "Should not contain 13 (cleared in bitmap)"
897        );
898        assert!(segment.contains(16), "Should contain 16");
899        assert!(
900            !segment.contains(17),
901            "Should not contain 17 (cleared in bitmap)"
902        );
903        assert!(segment.contains(19), "Should contain 19");
904        assert!(
905            !segment.contains(20),
906            "Should not contain 20 (out of range)"
907        );
908
909        // Test SortedArray: sparse sorted sequence
910        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
911        assert!(segment.contains(1), "Should contain 1");
912        assert!(segment.contains(5), "Should contain 5");
913        assert!(segment.contains(10), "Should contain 10");
914        assert!(!segment.contains(0), "Should not contain 0");
915        assert!(!segment.contains(3), "Should not contain 3");
916        assert!(!segment.contains(15), "Should not contain 15");
917
918        // Test Array: unsorted sequence
919        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
920        assert!(segment.contains(1), "Should contain 1");
921        assert!(segment.contains(5), "Should contain 5");
922        assert!(segment.contains(10), "Should contain 10");
923        assert!(!segment.contains(0), "Should not contain 0");
924        assert!(!segment.contains(3), "Should not contain 3");
925        assert!(!segment.contains(15), "Should not contain 15");
926
927        // Test empty segment
928        let segment = U64Segment::Range(0..0);
929        assert!(
930            !segment.contains(0),
931            "Empty segment should not contain anything"
932        );
933        assert!(
934            !segment.contains(5),
935            "Empty segment should not contain anything"
936        );
937    }
938}