lance_table/rowids/
segment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::{Range, RangeInclusive};
5
6use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
7use deepsize::DeepSizeOf;
8use snafu::location;
9
/// Different ways to represent a sequence of distinct u64s.
///
/// This is designed to be especially efficient for sequences that are sorted,
/// but not meaningfully larger than a `Vec<u64>` in the worst case.
///
/// The representation is chosen based on the properties of the sequence:
///
/// ```text
///  Sorted?───►Yes ───►Contiguous?─► Yes─► Range
///    │                ▼
///    │                No
///    │                ▼
///    │              Dense?─────► Yes─► RangeWithBitmap/RangeWithHoles
///    │                ▼
///    │                No─────────────► SortedArray
///    ▼
///    No──────────────────────────────► Array
/// ```
///
/// "Dense" is decided based on the estimated byte size of each sorted
/// representation; the smallest estimate is chosen:
///
/// * `RangeWithHoles`: 24 bytes + 4 bytes * n_holes
/// * `RangeWithBitmap`: 24 bytes + ceil((max - min + 1) / 8) bytes
/// * `SortedArray` (assuming u16 packed): 24 bytes + 2 bytes * N
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum U64Segment {
    /// A contiguous sorted range of row ids.
    ///
    /// Total size: 16 bytes
    Range(Range<u64>),
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Total size: 24 bytes + n_holes * 4 bytes
    /// Smaller than the bitmap form when roughly: 32 * n_holes < max - min
    RangeWithHoles {
        /// Half-open range spanning the smallest to the largest value.
        range: Range<u64>,
        /// Values inside `range` that are missing. These are the values
        /// themselves, not offsets from the start of the range.
        /// This is sorted, so binary search can be used. It's typically
        /// relatively small.
        holes: EncodedU64Array,
    },
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Bitmap is 1 when the value is present, 0 when it's missing. Bit `i`
    /// describes the value `range.start + i`.
    ///
    /// Total size: 24 bytes + ceil((max - min + 1) / 8) bytes
    /// Smaller than a sorted array when roughly: max - min < 16 * len
    RangeWithBitmap { range: Range<u64>, bitmap: Bitmap },
    /// A sorted array of row ids, that is sparse.
    ///
    /// Total size: 24 bytes + 2 * n_values bytes (assuming u16 packed)
    SortedArray(EncodedU64Array),
    /// An array of row ids, that is not sorted.
    Array(EncodedU64Array),
}
65
66impl DeepSizeOf for U64Segment {
67    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
68        match self {
69            Self::Range(_) => 0,
70            Self::RangeWithHoles { holes, .. } => holes.deep_size_of_children(context),
71            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
72            Self::SortedArray(array) => array.deep_size_of_children(context),
73            Self::Array(array) => array.deep_size_of_children(context),
74        }
75    }
76}
77
/// Summary statistics for a sequence of u64 values.
#[derive(Debug)]
struct SegmentStats {
    /// Smallest value in the sequence.
    min: u64,
    /// Largest value in the sequence.
    max: u64,
    /// How many values the sequence contains.
    count: u64,
    /// Whether the values appear in sorted order.
    sorted: bool,
}

impl SegmentStats {
    /// Number of values in `min..=max` that do not appear in the sequence.
    ///
    /// Only meaningful for sorted sequences (checked in debug builds). An
    /// empty sequence has no holes.
    fn n_holes(&self) -> u64 {
        debug_assert!(self.sorted);
        match self.count {
            0 => 0,
            present => (self.max - self.min + 1) - present,
        }
    }
}
102
103impl U64Segment {
104    /// Return the values that are missing from the slice.
105    fn holes_in_slice<'a>(
106        range: RangeInclusive<u64>,
107        existing: impl IntoIterator<Item = u64> + 'a,
108    ) -> impl Iterator<Item = u64> + 'a {
109        let mut existing = existing.into_iter().peekable();
110        range.filter(move |val| {
111            if let Some(&existing_val) = existing.peek() {
112                if existing_val == *val {
113                    existing.next();
114                    return false;
115                }
116            }
117            true
118        })
119    }
120
121    fn compute_stats(values: impl IntoIterator<Item = u64>) -> SegmentStats {
122        let mut sorted = true;
123        let mut min = u64::MAX;
124        let mut max = 0;
125        let mut count = 0;
126
127        for val in values {
128            count += 1;
129            if val < min {
130                min = val;
131            }
132            if val > max {
133                max = val;
134            }
135            if sorted && count > 1 && val < max {
136                sorted = false;
137            }
138        }
139
140        if count == 0 {
141            min = 0;
142            max = 0;
143        }
144
145        SegmentStats {
146            min,
147            max,
148            count,
149            sorted,
150        }
151    }
152
153    fn sorted_sequence_sizes(stats: &SegmentStats) -> [usize; 3] {
154        let n_holes = stats.n_holes();
155        let total_slots = stats.max - stats.min + 1;
156
157        let range_with_holes = 24 + 4 * n_holes as usize;
158        let range_with_bitmap = 24 + (total_slots as f64 / 8.0).ceil() as usize;
159        let sorted_array = 24 + 2 * stats.count as usize;
160
161        [range_with_holes, range_with_bitmap, sorted_array]
162    }
163
164    fn from_stats_and_sequence(
165        stats: SegmentStats,
166        sequence: impl IntoIterator<Item = u64>,
167    ) -> Self {
168        if stats.sorted {
169            let n_holes = stats.n_holes();
170            if stats.count == 0 {
171                Self::Range(0..0)
172            } else if n_holes == 0 {
173                Self::Range(stats.min..(stats.max + 1))
174            } else {
175                let sizes = Self::sorted_sequence_sizes(&stats);
176                let min_size = sizes.iter().min().unwrap();
177                if min_size == &sizes[0] {
178                    let range = stats.min..(stats.max + 1);
179                    let mut holes =
180                        Self::holes_in_slice(stats.min..=stats.max, sequence).collect::<Vec<_>>();
181                    holes.sort_unstable();
182                    let holes = EncodedU64Array::from(holes);
183
184                    Self::RangeWithHoles { range, holes }
185                } else if min_size == &sizes[1] {
186                    let range = stats.min..(stats.max + 1);
187                    let mut bitmap = Bitmap::new_full((stats.max - stats.min) as usize + 1);
188
189                    for hole in Self::holes_in_slice(stats.min..=stats.max, sequence) {
190                        let offset = (hole - stats.min) as usize;
191                        bitmap.clear(offset);
192                    }
193
194                    Self::RangeWithBitmap { range, bitmap }
195                } else {
196                    // Must use array, but at least it's sorted
197                    Self::SortedArray(EncodedU64Array::from_iter(sequence))
198                }
199            }
200        } else {
201            // Must use array
202            Self::Array(EncodedU64Array::from_iter(sequence))
203        }
204    }
205
206    pub fn from_slice(slice: &[u64]) -> Self {
207        Self::from_iter(slice.iter().copied())
208    }
209}
210
211impl FromIterator<u64> for U64Segment {
212    fn from_iter<T: IntoIterator<Item = u64>>(iter: T) -> Self {
213        let values: Vec<u64> = iter.into_iter().collect();
214        let stats = Self::compute_stats(values.iter().copied());
215        Self::from_stats_and_sequence(stats, values)
216    }
217}
218
219impl U64Segment {
220    pub fn iter(&self) -> Box<dyn DoubleEndedIterator<Item = u64> + '_> {
221        match self {
222            Self::Range(range) => Box::new(range.clone()),
223            Self::RangeWithHoles { range, holes } => {
224                Box::new((range.start..range.end).filter(move |&val| {
225                    // TODO: we could write a more optimal version of this
226                    // iterator, but would need special handling to make it
227                    // double ended.
228                    holes.binary_search(val).is_err()
229                }))
230            }
231            Self::RangeWithBitmap { range, bitmap } => {
232                Box::new((range.start..range.end).filter(|val| {
233                    let offset = (val - range.start) as usize;
234                    bitmap.get(offset)
235                }))
236            }
237            Self::SortedArray(array) => Box::new(array.iter()),
238            Self::Array(array) => Box::new(array.iter()),
239        }
240    }
241
242    pub fn len(&self) -> usize {
243        match self {
244            Self::Range(range) => (range.end - range.start) as usize,
245            Self::RangeWithHoles { range, holes } => {
246                let holes = holes.iter().count();
247                (range.end - range.start) as usize - holes
248            }
249            Self::RangeWithBitmap { range, bitmap } => {
250                let holes = bitmap.count_zeros();
251                (range.end - range.start) as usize - holes
252            }
253            Self::SortedArray(array) => array.len(),
254            Self::Array(array) => array.len(),
255        }
256    }
257
258    pub fn is_empty(&self) -> bool {
259        self.len() == 0
260    }
261
262    /// Get the min and max value of the segment, excluding tombstones.
263    pub fn range(&self) -> Option<RangeInclusive<u64>> {
264        match self {
265            Self::Range(range) if range.is_empty() => None,
266            Self::Range(range)
267            | Self::RangeWithBitmap { range, .. }
268            | Self::RangeWithHoles { range, .. } => Some(range.start..=(range.end - 1)),
269            Self::SortedArray(array) => {
270                // We can assume that the array is sorted.
271                let min_value = array.first().unwrap();
272                let max_value = array.last().unwrap();
273                Some(min_value..=max_value)
274            }
275            Self::Array(array) => {
276                let min_value = array.min().unwrap();
277                let max_value = array.max().unwrap();
278                Some(min_value..=max_value)
279            }
280        }
281    }
282
283    pub fn slice(&self, offset: usize, len: usize) -> Self {
284        if len == 0 {
285            return Self::Range(0..0);
286        }
287
288        let values: Vec<u64> = self.iter().skip(offset).take(len).collect();
289
290        // `from_slice` will compute stats and select the best representation.
291        Self::from_slice(&values)
292    }
293
294    pub fn position(&self, val: u64) -> Option<usize> {
295        match self {
296            Self::Range(range) => {
297                if range.contains(&val) {
298                    Some((val - range.start) as usize)
299                } else {
300                    None
301                }
302            }
303            Self::RangeWithHoles { range, holes } => {
304                if range.contains(&val) && holes.binary_search(val).is_err() {
305                    let offset = (val - range.start) as usize;
306                    let holes = holes.iter().take_while(|&hole| hole < val).count();
307                    Some(offset - holes)
308                } else {
309                    None
310                }
311            }
312            Self::RangeWithBitmap { range, bitmap } => {
313                if range.contains(&val) && bitmap.get((val - range.start) as usize) {
314                    let offset = (val - range.start) as usize;
315                    let num_zeros = bitmap.slice(0, offset).count_zeros();
316                    Some(offset - num_zeros)
317                } else {
318                    None
319                }
320            }
321            Self::SortedArray(array) => array.binary_search(val).ok(),
322            Self::Array(array) => array.iter().position(|v| v == val),
323        }
324    }
325
326    pub fn get(&self, i: usize) -> Option<u64> {
327        match self {
328            Self::Range(range) => match range.start.checked_add(i as u64) {
329                Some(val) if val < range.end => Some(val),
330                _ => None,
331            },
332            Self::RangeWithHoles { range, .. } => {
333                if i >= (range.end - range.start) as usize {
334                    return None;
335                }
336                self.iter().nth(i)
337            }
338            Self::RangeWithBitmap { range, .. } => {
339                if i >= (range.end - range.start) as usize {
340                    return None;
341                }
342                self.iter().nth(i)
343            }
344            Self::SortedArray(array) => array.get(i),
345            Self::Array(array) => array.get(i),
346        }
347    }
348
349    /// Check if a value is contained in the segment
350    pub fn contains(&self, val: u64) -> bool {
351        match self {
352            Self::Range(range) => range.contains(&val),
353            Self::RangeWithHoles { range, holes } => {
354                if !range.contains(&val) {
355                    return false;
356                }
357                // Check if the value is not in the holes
358                !holes.iter().any(|hole| hole == val)
359            }
360            Self::RangeWithBitmap { range, bitmap } => {
361                if !range.contains(&val) {
362                    return false;
363                }
364                // Check if the bitmap has the value set (not cleared)
365                let idx = (val - range.start) as usize;
366                bitmap.get(idx)
367            }
368            Self::SortedArray(array) => array.binary_search(val).is_ok(),
369            Self::Array(array) => array.iter().any(|v| v == val),
370        }
371    }
372
373    /// Produce a new segment that has [`val`] as the new highest value in the segment
374    pub fn with_new_high(self, val: u64) -> lance_core::Result<Self> {
375        // Check that the new value is higher than the current maximum
376        if let Some(range) = self.range() {
377            if val <= *range.end() {
378                return Err(lance_core::Error::invalid_input(
379                    format!(
380                        "New value {} must be higher than current maximum {}",
381                        val,
382                        range.end()
383                    ),
384                    location!(),
385                ));
386            }
387        }
388
389        Ok(match self {
390            Self::Range(range) => {
391                // Special case for empty range: create a range containing only the new value
392                if range.start == range.end {
393                    Self::Range(Range {
394                        start: val,
395                        end: val + 1,
396                    })
397                } else if val == range.end {
398                    Self::Range(Range {
399                        start: range.start,
400                        end: val + 1,
401                    })
402                } else {
403                    Self::RangeWithHoles {
404                        range: Range {
405                            start: range.start,
406                            end: val + 1,
407                        },
408                        holes: EncodedU64Array::U64((range.end..val).collect()),
409                    }
410                }
411            }
412            Self::RangeWithHoles { range, holes } => {
413                if val == range.end {
414                    Self::RangeWithHoles {
415                        range: Range {
416                            start: range.start,
417                            end: val + 1,
418                        },
419                        holes,
420                    }
421                } else {
422                    let mut new_holes: Vec<u64> = holes.iter().collect();
423                    new_holes.extend(range.end..val);
424                    Self::RangeWithHoles {
425                        range: Range {
426                            start: range.start,
427                            end: val + 1,
428                        },
429                        holes: EncodedU64Array::U64(new_holes),
430                    }
431                }
432            }
433            Self::RangeWithBitmap { range, bitmap } => {
434                let new_range = Range {
435                    start: range.start,
436                    end: val + 1,
437                };
438                let gap_size = (val - range.end) as usize;
439                let new_bitmap = bitmap
440                    .iter()
441                    .chain(std::iter::repeat_n(false, gap_size))
442                    .chain(std::iter::once(true))
443                    .collect::<Vec<bool>>();
444
445                Self::RangeWithBitmap {
446                    range: new_range,
447                    bitmap: Bitmap::from(new_bitmap.as_slice()),
448                }
449            }
450            Self::SortedArray(array) => match array {
451                EncodedU64Array::U64(mut vec) => {
452                    vec.push(val);
453                    Self::SortedArray(EncodedU64Array::U64(vec))
454                }
455                EncodedU64Array::U16 { base, offsets } => {
456                    if let Some(offset) = val.checked_sub(base) {
457                        if offset <= u16::MAX as u64 {
458                            let mut offsets = offsets;
459                            offsets.push(offset as u16);
460                            return Ok(Self::SortedArray(EncodedU64Array::U16 { base, offsets }));
461                        } else if offset <= u32::MAX as u64 {
462                            let mut u32_offsets: Vec<u32> =
463                                offsets.into_iter().map(|o| o as u32).collect();
464                            u32_offsets.push(offset as u32);
465                            return Ok(Self::SortedArray(EncodedU64Array::U32 {
466                                base,
467                                offsets: u32_offsets,
468                            }));
469                        }
470                    }
471                    let mut new_array: Vec<u64> =
472                        offsets.into_iter().map(|o| base + o as u64).collect();
473                    new_array.push(val);
474                    Self::SortedArray(EncodedU64Array::from(new_array))
475                }
476                EncodedU64Array::U32 { base, mut offsets } => {
477                    if let Some(offset) = val.checked_sub(base) {
478                        if offset <= u32::MAX as u64 {
479                            offsets.push(offset as u32);
480                            return Ok(Self::SortedArray(EncodedU64Array::U32 { base, offsets }));
481                        }
482                    }
483                    let mut new_array: Vec<u64> =
484                        offsets.into_iter().map(|o| base + o as u64).collect();
485                    new_array.push(val);
486                    Self::SortedArray(EncodedU64Array::from(new_array))
487                }
488            },
489            Self::Array(array) => match array {
490                EncodedU64Array::U64(mut vec) => {
491                    vec.push(val);
492                    Self::Array(EncodedU64Array::U64(vec))
493                }
494                EncodedU64Array::U16 { base, offsets } => {
495                    if let Some(offset) = val.checked_sub(base) {
496                        if offset <= u16::MAX as u64 {
497                            let mut offsets = offsets;
498                            offsets.push(offset as u16);
499                            return Ok(Self::Array(EncodedU64Array::U16 { base, offsets }));
500                        } else if offset <= u32::MAX as u64 {
501                            let mut u32_offsets: Vec<u32> =
502                                offsets.into_iter().map(|o| o as u32).collect();
503                            u32_offsets.push(offset as u32);
504                            return Ok(Self::Array(EncodedU64Array::U32 {
505                                base,
506                                offsets: u32_offsets,
507                            }));
508                        }
509                    }
510                    let mut new_array: Vec<u64> =
511                        offsets.into_iter().map(|o| base + o as u64).collect();
512                    new_array.push(val);
513                    Self::Array(EncodedU64Array::from(new_array))
514                }
515                EncodedU64Array::U32 { base, mut offsets } => {
516                    if let Some(offset) = val.checked_sub(base) {
517                        if offset <= u32::MAX as u64 {
518                            offsets.push(offset as u32);
519                            return Ok(Self::Array(EncodedU64Array::U32 { base, offsets }));
520                        }
521                    }
522                    let mut new_array: Vec<u64> =
523                        offsets.into_iter().map(|o| base + o as u64).collect();
524                    new_array.push(val);
525                    Self::Array(EncodedU64Array::from(new_array))
526                }
527            },
528        })
529    }
530
531    /// Delete a set of row ids from the segment.
532    /// The row ids are assumed to be in the segment. (within the range, not
533    /// already deleted.)
534    /// They are also assumed to be ordered by appearance in the segment.
535    pub fn delete(&self, vals: &[u64]) -> Self {
536        // TODO: can we enforce these assumptions? or make them safer?
537        debug_assert!(vals.iter().all(|&val| self.range().unwrap().contains(&val)));
538
539        let make_new_iter = || {
540            let mut vals_iter = vals.iter().copied().peekable();
541            self.iter().filter(move |val| {
542                if let Some(&next_val) = vals_iter.peek() {
543                    if next_val == *val {
544                        vals_iter.next();
545                        return false;
546                    }
547                }
548                true
549            })
550        };
551        let stats = Self::compute_stats(make_new_iter());
552        Self::from_stats_and_sequence(stats, make_new_iter())
553    }
554
555    pub fn mask(&mut self, positions: &[u32]) {
556        if positions.is_empty() {
557            return;
558        }
559        if positions.len() == self.len() {
560            *self = Self::Range(0..0);
561            return;
562        }
563        let count = (self.len() - positions.len()) as u64;
564        let sorted = match self {
565            Self::Range(_) => true,
566            Self::RangeWithHoles { .. } => true,
567            Self::RangeWithBitmap { .. } => true,
568            Self::SortedArray(_) => true,
569            Self::Array(_) => false,
570        };
571        // To get minimum, need to find the first value that is not masked.
572        let first_unmasked = (0..self.len())
573            .zip(positions.iter().cycle())
574            .find(|(sequential_i, i)| **i != *sequential_i as u32)
575            .map(|(sequential_i, _)| sequential_i)
576            .unwrap();
577        let min = self.get(first_unmasked).unwrap();
578
579        let last_unmasked = (0..self.len())
580            .rev()
581            .zip(positions.iter().rev().cycle())
582            .filter(|(sequential_i, i)| **i != *sequential_i as u32)
583            .map(|(sequential_i, _)| sequential_i)
584            .next()
585            .unwrap();
586        let max = self.get(last_unmasked).unwrap();
587
588        let stats = SegmentStats {
589            min,
590            max,
591            count,
592            sorted,
593        };
594
595        let mut positions = positions.iter().copied().peekable();
596        let sequence = self.iter().enumerate().filter_map(move |(i, val)| {
597            if let Some(next_pos) = positions.peek() {
598                if *next_pos == i as u32 {
599                    positions.next();
600                    return None;
601                }
602            }
603            Some(val)
604        });
605        *self = Self::from_stats_and_sequence(stats, sequence)
606    }
607}
608
609#[cfg(test)]
610mod test {
611    use super::*;
612
613    #[test]
614    fn test_segments() {
615        fn check_segment(values: &[u64], expected: &U64Segment) {
616            let segment = U64Segment::from_slice(values);
617            assert_eq!(segment, *expected);
618            assert_eq!(values.len(), segment.len());
619
620            let roundtripped = segment.iter().collect::<Vec<_>>();
621            assert_eq!(roundtripped, values);
622
623            let expected_min = values.iter().copied().min();
624            let expected_max = values.iter().copied().max();
625            match segment.range() {
626                Some(range) => {
627                    assert_eq!(range.start(), &expected_min.unwrap());
628                    assert_eq!(range.end(), &expected_max.unwrap());
629                }
630                None => {
631                    assert_eq!(expected_min, None);
632                    assert_eq!(expected_max, None);
633                }
634            }
635
636            for (i, value) in values.iter().enumerate() {
637                assert_eq!(segment.get(i), Some(*value), "i = {}", i);
638                assert_eq!(segment.position(*value), Some(i), "i = {}", i);
639            }
640
641            check_segment_iter(&segment);
642        }
643
644        fn check_segment_iter(segment: &U64Segment) {
645            // Should be able to iterate forwards and backwards, and get the same thing.
646            let forwards = segment.iter().collect::<Vec<_>>();
647            let mut backwards = segment.iter().rev().collect::<Vec<_>>();
648            backwards.reverse();
649            assert_eq!(forwards, backwards);
650
651            // Should be able to pull from both sides in lockstep.
652            let mut expected = Vec::with_capacity(segment.len());
653            let mut actual = Vec::with_capacity(segment.len());
654            let mut iter = segment.iter();
655            // Alternating forwards and backwards
656            for i in 0..segment.len() {
657                if i % 2 == 0 {
658                    actual.push(iter.next().unwrap());
659                    expected.push(segment.get(i / 2).unwrap());
660                } else {
661                    let i = segment.len() - 1 - i / 2;
662                    actual.push(iter.next_back().unwrap());
663                    expected.push(segment.get(i).unwrap());
664                };
665            }
666            assert_eq!(expected, actual);
667        }
668
669        // Empty
670        check_segment(&[], &U64Segment::Range(0..0));
671
672        // Single value
673        check_segment(&[42], &U64Segment::Range(42..43));
674
675        // Contiguous range
676        check_segment(
677            &(100..200).collect::<Vec<_>>(),
678            &U64Segment::Range(100..200),
679        );
680
681        // Range with a hole
682        let values = (0..1000).filter(|&x| x != 100).collect::<Vec<_>>();
683        check_segment(
684            &values,
685            &U64Segment::RangeWithHoles {
686                range: 0..1000,
687                holes: vec![100].into(),
688            },
689        );
690
691        // Range with every other value missing
692        let values = (0..1000).filter(|&x| x % 2 == 0).collect::<Vec<_>>();
693        check_segment(
694            &values,
695            &U64Segment::RangeWithBitmap {
696                range: 0..999,
697                bitmap: Bitmap::from((0..999).map(|x| x % 2 == 0).collect::<Vec<_>>().as_slice()),
698            },
699        );
700
701        // Sparse but sorted sequence
702        check_segment(
703            &[1, 7000, 24000],
704            &U64Segment::SortedArray(vec![1, 7000, 24000].into()),
705        );
706
707        // Sparse unsorted sequence
708        check_segment(
709            &[7000, 1, 24000],
710            &U64Segment::Array(vec![7000, 1, 24000].into()),
711        );
712    }
713
714    #[test]
715    fn test_with_new_high() {
716        // Test Range: contiguous sequence
717        let segment = U64Segment::Range(10..20);
718
719        // Test adding value that extends the range
720        let result = segment.clone().with_new_high(20).unwrap();
721        assert_eq!(result, U64Segment::Range(10..21));
722
723        // Test adding value that creates holes
724        let result = segment.with_new_high(25).unwrap();
725        assert_eq!(
726            result,
727            U64Segment::RangeWithHoles {
728                range: 10..26,
729                holes: EncodedU64Array::U64(vec![20, 21, 22, 23, 24]),
730            }
731        );
732
733        // Test RangeWithHoles: sequence with existing holes
734        let segment = U64Segment::RangeWithHoles {
735            range: 10..20,
736            holes: EncodedU64Array::U64(vec![15, 17]),
737        };
738
739        // Test adding value that extends the range without new holes
740        let result = segment.clone().with_new_high(20).unwrap();
741        assert_eq!(
742            result,
743            U64Segment::RangeWithHoles {
744                range: 10..21,
745                holes: EncodedU64Array::U64(vec![15, 17]),
746            }
747        );
748
749        // Test adding value that creates additional holes
750        let result = segment.with_new_high(25).unwrap();
751        assert_eq!(
752            result,
753            U64Segment::RangeWithHoles {
754                range: 10..26,
755                holes: EncodedU64Array::U64(vec![15, 17, 20, 21, 22, 23, 24]),
756            }
757        );
758
759        // Test RangeWithBitmap: sequence with bitmap representation
760        let mut bitmap = Bitmap::new_full(10);
761        bitmap.clear(3); // Clear position 3 (value 13)
762        bitmap.clear(7); // Clear position 7 (value 17)
763        let segment = U64Segment::RangeWithBitmap {
764            range: 10..20,
765            bitmap,
766        };
767
768        // Test adding value that extends the range without new holes
769        let result = segment.clone().with_new_high(20).unwrap();
770        let expected_bitmap = {
771            let mut b = Bitmap::new_full(11);
772            b.clear(3); // Clear position 3 (value 13)
773            b.clear(7); // Clear position 7 (value 17)
774            b
775        };
776        assert_eq!(
777            result,
778            U64Segment::RangeWithBitmap {
779                range: 10..21,
780                bitmap: expected_bitmap,
781            }
782        );
783
784        // Test adding value that creates additional holes
785        let result = segment.with_new_high(25).unwrap();
786        let expected_bitmap = {
787            let mut b = Bitmap::new_full(16);
788            b.clear(3); // Clear position 3 (value 13)
789            b.clear(7); // Clear position 7 (value 17)
790                        // Clear positions 10-14 (values 20-24)
791            for i in 10..15 {
792                b.clear(i);
793            }
794            b
795        };
796        assert_eq!(
797            result,
798            U64Segment::RangeWithBitmap {
799                range: 10..26,
800                bitmap: expected_bitmap,
801            }
802        );
803
804        // Test SortedArray: sparse sorted sequence
805        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
806
807        let result = segment.with_new_high(15).unwrap();
808        assert_eq!(
809            result,
810            U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10, 15]))
811        );
812
813        // Test Array: unsorted sequence
814        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
815
816        let result = segment.with_new_high(15).unwrap();
817        assert_eq!(
818            result,
819            U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1, 15]))
820        );
821
822        // Test edge cases
823        // Empty segment
824        let segment = U64Segment::Range(0..0);
825        let result = segment.with_new_high(5).unwrap();
826        assert_eq!(result, U64Segment::Range(5..6));
827
828        // Single value segment
829        let segment = U64Segment::Range(42..43);
830        let result = segment.with_new_high(50).unwrap();
831        assert_eq!(
832            result,
833            U64Segment::RangeWithHoles {
834                range: 42..51,
835                holes: EncodedU64Array::U64(vec![43, 44, 45, 46, 47, 48, 49]),
836            }
837        );
838    }
839
840    #[test]
841    fn test_with_new_high_assertion() {
842        let segment = U64Segment::Range(10..20);
843        // This should return an error because 15 is not higher than the current maximum 19
844        let result = segment.with_new_high(15);
845        assert!(result.is_err());
846        let error = result.unwrap_err();
847        assert!(error
848            .to_string()
849            .contains("New value 15 must be higher than current maximum 19"));
850    }
851
852    #[test]
853    fn test_with_new_high_assertion_equal() {
854        let segment = U64Segment::Range(1..6);
855        // This should return an error because 5 is not higher than the current maximum 5
856        let result = segment.with_new_high(5);
857        assert!(result.is_err());
858        let error = result.unwrap_err();
859        assert!(error
860            .to_string()
861            .contains("New value 5 must be higher than current maximum 5"));
862    }
863
864    #[test]
865    fn test_contains() {
866        // Test Range: contiguous sequence
867        let segment = U64Segment::Range(10..20);
868        assert!(segment.contains(10), "Should contain 10");
869        assert!(segment.contains(15), "Should contain 15");
870        assert!(segment.contains(19), "Should contain 19");
871        assert!(!segment.contains(9), "Should not contain 9");
872        assert!(!segment.contains(20), "Should not contain 20");
873        assert!(!segment.contains(25), "Should not contain 25");
874
875        // Test RangeWithHoles: sequence with holes
876        let segment = U64Segment::RangeWithHoles {
877            range: 10..20,
878            holes: EncodedU64Array::U64(vec![15, 17]),
879        };
880        assert!(segment.contains(10), "Should contain 10");
881        assert!(segment.contains(14), "Should contain 14");
882        assert!(!segment.contains(15), "Should not contain 15 (hole)");
883        assert!(segment.contains(16), "Should contain 16");
884        assert!(!segment.contains(17), "Should not contain 17 (hole)");
885        assert!(segment.contains(18), "Should contain 18");
886        assert!(
887            !segment.contains(20),
888            "Should not contain 20 (out of range)"
889        );
890
891        // Test RangeWithBitmap: sequence with bitmap
892        let mut bitmap = Bitmap::new_full(10);
893        bitmap.clear(3); // Clear position 3 (value 13)
894        bitmap.clear(7); // Clear position 7 (value 17)
895        let segment = U64Segment::RangeWithBitmap {
896            range: 10..20,
897            bitmap,
898        };
899        assert!(segment.contains(10), "Should contain 10");
900        assert!(segment.contains(12), "Should contain 12");
901        assert!(
902            !segment.contains(13),
903            "Should not contain 13 (cleared in bitmap)"
904        );
905        assert!(segment.contains(16), "Should contain 16");
906        assert!(
907            !segment.contains(17),
908            "Should not contain 17 (cleared in bitmap)"
909        );
910        assert!(segment.contains(19), "Should contain 19");
911        assert!(
912            !segment.contains(20),
913            "Should not contain 20 (out of range)"
914        );
915
916        // Test SortedArray: sparse sorted sequence
917        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
918        assert!(segment.contains(1), "Should contain 1");
919        assert!(segment.contains(5), "Should contain 5");
920        assert!(segment.contains(10), "Should contain 10");
921        assert!(!segment.contains(0), "Should not contain 0");
922        assert!(!segment.contains(3), "Should not contain 3");
923        assert!(!segment.contains(15), "Should not contain 15");
924
925        // Test Array: unsorted sequence
926        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
927        assert!(segment.contains(1), "Should contain 1");
928        assert!(segment.contains(5), "Should contain 5");
929        assert!(segment.contains(10), "Should contain 10");
930        assert!(!segment.contains(0), "Should not contain 0");
931        assert!(!segment.contains(3), "Should not contain 3");
932        assert!(!segment.contains(15), "Should not contain 15");
933
934        // Test empty segment
935        let segment = U64Segment::Range(0..0);
936        assert!(
937            !segment.contains(0),
938            "Empty segment should not contain anything"
939        );
940        assert!(
941            !segment.contains(5),
942            "Empty segment should not contain anything"
943        );
944    }
945}