lance_table/rowids/
segment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::{Range, RangeInclusive};
5
6use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
7use deepsize::DeepSizeOf;
8use snafu::location;
9
/// Different ways to represent a sequence of distinct u64s.
///
/// This is designed to be especially efficient for sequences that are sorted,
/// but not meaningfully larger than a `Vec<u64>` in the worst case.
///
/// The representation is chosen based on the properties of the sequence:
///
/// ```text
///  Sorted?───►Yes ───►Contiguous?─► Yes─► Range
///    │                ▼
///    │                No
///    │                ▼
///    │              Dense?─────► Yes─► RangeWithBitmap/RangeWithHoles
///    │                ▼
///    │                No─────────────► SortedArray
///    ▼
///    No──────────────────────────────► Array
/// ```
///
/// "Dense" is decided based on the estimated byte size of the representation;
/// for a sorted, non-contiguous sequence the smallest estimate wins.
///
/// Estimated size of RangeWithHoles for N values:
///     24 bytes + 4 bytes * n_holes
/// Estimated size of RangeWithBitmap for N values:
///     24 bytes + ceil((max - min + 1) / 8) bytes
/// Estimated size of SortedArray for N values (assuming u16 packed):
///     24 bytes + 2 bytes * N
///
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum U64Segment {
    /// A contiguous sorted range of row ids.
    ///
    /// Total size: 16 bytes
    Range(Range<u64>),
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Total size: 24 bytes + n_holes * 4 bytes
    /// Cheaper than a bitmap when: 32 * n_holes < max - min
    RangeWithHoles {
        range: Range<u64>,
        /// The values inside `range` that are missing from the segment
        /// (the values themselves, not offsets from the start of the range).
        /// This is sorted, so binary search can be used. It's typically
        /// relatively small.
        holes: EncodedU64Array,
    },
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Bitmap is 1 when the value is present, 0 when it's missing. Bit `i`
    /// describes the value `range.start + i`.
    ///
    /// Total size: 24 bytes + ceil((max - min + 1) / 8) bytes
    /// Cheaper than a sorted array when (roughly): max - min < 16 * len
    RangeWithBitmap { range: Range<u64>, bitmap: Bitmap },
    /// A sorted array of row ids, that is sparse.
    ///
    /// Total size: 24 bytes + 2 * n_values bytes
    SortedArray(EncodedU64Array),
    /// An array of row ids, that is not sorted.
    Array(EncodedU64Array),
}
65
66impl DeepSizeOf for U64Segment {
67    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
68        match self {
69            Self::Range(_) => 0,
70            Self::RangeWithHoles { holes, .. } => holes.deep_size_of_children(context),
71            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
72            Self::SortedArray(array) => array.deep_size_of_children(context),
73            Self::Array(array) => array.deep_size_of_children(context),
74        }
75    }
76}
77
/// Statistics about a segment of u64s.
#[derive(Debug)]
struct SegmentStats {
    /// Min value in the segment.
    min: u64,
    /// Max value in the segment
    max: u64,
    /// Total number of values in the segment
    count: u64,
    /// Whether the segment is sorted
    sorted: bool,
}

impl SegmentStats {
    /// Number of values in `min..=max` that are not part of the segment.
    ///
    /// Only meaningful for sorted segments of distinct values (checked with a
    /// `debug_assert!` on `sorted`). An empty segment has no holes.
    fn n_holes(&self) -> u64 {
        debug_assert!(self.sorted);
        if self.count == 0 {
            return 0;
        }
        // Mathematically `(max - min + 1) - count`, reassociated so that a
        // span covering the whole u64 domain does not overflow in the `+ 1`.
        (self.max - self.min) - (self.count - 1)
    }
}
102
103impl U64Segment {
104    /// Return the values that are missing from the slice.
105    fn holes_in_slice<'a>(
106        range: RangeInclusive<u64>,
107        existing: impl IntoIterator<Item = u64> + 'a,
108    ) -> impl Iterator<Item = u64> + 'a {
109        let mut existing = existing.into_iter().peekable();
110        range.filter(move |val| {
111            if let Some(&existing_val) = existing.peek() {
112                if existing_val == *val {
113                    existing.next();
114                    return false;
115                }
116            }
117            true
118        })
119    }
120
121    fn compute_stats(values: impl IntoIterator<Item = u64>) -> SegmentStats {
122        let mut sorted = true;
123        let mut min = u64::MAX;
124        let mut max = 0;
125        let mut count = 0;
126
127        for val in values {
128            count += 1;
129            if val < min {
130                min = val;
131            }
132            if val > max {
133                max = val;
134            }
135            if sorted && count > 1 && val < max {
136                sorted = false;
137            }
138        }
139
140        if count == 0 {
141            min = 0;
142            max = 0;
143        }
144
145        SegmentStats {
146            min,
147            max,
148            count,
149            sorted,
150        }
151    }
152
153    fn sorted_sequence_sizes(stats: &SegmentStats) -> [usize; 3] {
154        let n_holes = stats.n_holes();
155        let total_slots = stats.max - stats.min + 1;
156
157        let range_with_holes = 24 + 4 * n_holes as usize;
158        let range_with_bitmap = 24 + (total_slots as f64 / 8.0).ceil() as usize;
159        let sorted_array = 24 + 2 * stats.count as usize;
160
161        [range_with_holes, range_with_bitmap, sorted_array]
162    }
163
164    fn from_stats_and_sequence(
165        stats: SegmentStats,
166        sequence: impl IntoIterator<Item = u64>,
167    ) -> Self {
168        if stats.sorted {
169            let n_holes = stats.n_holes();
170            if stats.count == 0 {
171                Self::Range(0..0)
172            } else if n_holes == 0 {
173                Self::Range(stats.min..(stats.max + 1))
174            } else {
175                let sizes = Self::sorted_sequence_sizes(&stats);
176                let min_size = sizes.iter().min().unwrap();
177                if min_size == &sizes[0] {
178                    let range = stats.min..(stats.max + 1);
179                    let mut holes =
180                        Self::holes_in_slice(stats.min..=stats.max, sequence).collect::<Vec<_>>();
181                    holes.sort_unstable();
182                    let holes = EncodedU64Array::from(holes);
183
184                    Self::RangeWithHoles { range, holes }
185                } else if min_size == &sizes[1] {
186                    let range = stats.min..(stats.max + 1);
187                    let mut bitmap = Bitmap::new_full((stats.max - stats.min) as usize + 1);
188
189                    for hole in Self::holes_in_slice(stats.min..=stats.max, sequence) {
190                        let offset = (hole - stats.min) as usize;
191                        bitmap.clear(offset);
192                    }
193
194                    Self::RangeWithBitmap { range, bitmap }
195                } else {
196                    // Must use array, but at least it's sorted
197                    Self::SortedArray(EncodedU64Array::from_iter(sequence))
198                }
199            }
200        } else {
201            // Must use array
202            Self::Array(EncodedU64Array::from_iter(sequence))
203        }
204    }
205
206    pub fn from_slice(slice: &[u64]) -> Self {
207        let stats = Self::compute_stats(slice.iter().copied());
208        Self::from_stats_and_sequence(stats, slice.iter().copied())
209    }
210}
211
212impl U64Segment {
213    pub fn iter(&self) -> Box<dyn DoubleEndedIterator<Item = u64> + '_> {
214        match self {
215            Self::Range(range) => Box::new(range.clone()),
216            Self::RangeWithHoles { range, holes } => {
217                Box::new((range.start..range.end).filter(move |&val| {
218                    // TODO: we could write a more optimal version of this
219                    // iterator, but would need special handling to make it
220                    // double ended.
221                    holes.binary_search(val).is_err()
222                }))
223            }
224            Self::RangeWithBitmap { range, bitmap } => {
225                Box::new((range.start..range.end).filter(|val| {
226                    let offset = (val - range.start) as usize;
227                    bitmap.get(offset)
228                }))
229            }
230            Self::SortedArray(array) => Box::new(array.iter()),
231            Self::Array(array) => Box::new(array.iter()),
232        }
233    }
234
235    pub fn len(&self) -> usize {
236        match self {
237            Self::Range(range) => (range.end - range.start) as usize,
238            Self::RangeWithHoles { range, holes } => {
239                let holes = holes.iter().count();
240                (range.end - range.start) as usize - holes
241            }
242            Self::RangeWithBitmap { range, bitmap } => {
243                let holes = bitmap.count_zeros();
244                (range.end - range.start) as usize - holes
245            }
246            Self::SortedArray(array) => array.len(),
247            Self::Array(array) => array.len(),
248        }
249    }
250
251    pub fn is_empty(&self) -> bool {
252        self.len() == 0
253    }
254
255    /// Get the min and max value of the segment, excluding tombstones.
256    pub fn range(&self) -> Option<RangeInclusive<u64>> {
257        match self {
258            Self::Range(range) if range.is_empty() => None,
259            Self::Range(range)
260            | Self::RangeWithBitmap { range, .. }
261            | Self::RangeWithHoles { range, .. } => Some(range.start..=(range.end - 1)),
262            Self::SortedArray(array) => {
263                // We can assume that the array is sorted.
264                let min_value = array.first().unwrap();
265                let max_value = array.last().unwrap();
266                Some(min_value..=max_value)
267            }
268            Self::Array(array) => {
269                let min_value = array.min().unwrap();
270                let max_value = array.max().unwrap();
271                Some(min_value..=max_value)
272            }
273        }
274    }
275
276    pub fn slice(&self, offset: usize, len: usize) -> Self {
277        match self {
278            Self::Range(range) => {
279                let start = range.start + offset as u64;
280                Self::Range(start..(start + len as u64))
281            }
282            Self::RangeWithHoles { range, holes } => {
283                let start = range.start + offset as u64;
284                let end = start + len as u64;
285
286                let start = holes.binary_search(start).unwrap_or_else(|x| x) as u64;
287                let end = holes.binary_search(end).unwrap_or_else(|x| x) as u64;
288                let holes_len = end - start;
289
290                if holes_len == 0 {
291                    Self::Range(start..end)
292                } else {
293                    let holes = holes.slice(start as usize, holes_len as usize);
294                    Self::RangeWithHoles {
295                        range: start..end,
296                        holes,
297                    }
298                }
299            }
300            Self::RangeWithBitmap { range, bitmap } => {
301                let start = range.start + offset as u64;
302                let end = start + len as u64;
303
304                let bitmap = bitmap.slice(offset, len);
305                if bitmap.count_ones() == len {
306                    // Bitmap no longer serves a purpose
307                    Self::Range(start..end)
308                    // TODO: could also have a case where we switch back to RangeWithHoles
309                } else {
310                    Self::RangeWithBitmap {
311                        range: start..end,
312                        bitmap: bitmap.into(),
313                    }
314                }
315            }
316            Self::SortedArray(array) => Self::SortedArray(array.slice(offset, len)),
317            Self::Array(array) => Self::Array(array.slice(offset, len)),
318        }
319    }
320
321    pub fn position(&self, val: u64) -> Option<usize> {
322        match self {
323            Self::Range(range) => {
324                if range.contains(&val) {
325                    Some((val - range.start) as usize)
326                } else {
327                    None
328                }
329            }
330            Self::RangeWithHoles { range, holes } => {
331                if range.contains(&val) && holes.binary_search(val).is_err() {
332                    let offset = (val - range.start) as usize;
333                    let holes = holes.iter().take_while(|&hole| hole < val).count();
334                    Some(offset - holes)
335                } else {
336                    None
337                }
338            }
339            Self::RangeWithBitmap { range, bitmap } => {
340                if range.contains(&val) && bitmap.get((val - range.start) as usize) {
341                    let offset = (val - range.start) as usize;
342                    let num_zeros = bitmap.slice(0, offset).count_zeros();
343                    Some(offset - num_zeros)
344                } else {
345                    None
346                }
347            }
348            Self::SortedArray(array) => array.binary_search(val).ok(),
349            Self::Array(array) => array.iter().position(|v| v == val),
350        }
351    }
352
353    pub fn get(&self, i: usize) -> Option<u64> {
354        match self {
355            Self::Range(range) => match range.start.checked_add(i as u64) {
356                Some(val) if val < range.end => Some(val),
357                _ => None,
358            },
359            Self::RangeWithHoles { range, .. } => {
360                if i >= (range.end - range.start) as usize {
361                    return None;
362                }
363                self.iter().nth(i)
364            }
365            Self::RangeWithBitmap { range, .. } => {
366                if i >= (range.end - range.start) as usize {
367                    return None;
368                }
369                self.iter().nth(i)
370            }
371            Self::SortedArray(array) => array.get(i),
372            Self::Array(array) => array.get(i),
373        }
374    }
375
376    /// Check if a value is contained in the segment
377    pub fn contains(&self, val: u64) -> bool {
378        match self {
379            Self::Range(range) => range.contains(&val),
380            Self::RangeWithHoles { range, holes } => {
381                if !range.contains(&val) {
382                    return false;
383                }
384                // Check if the value is not in the holes
385                !holes.iter().any(|hole| hole == val)
386            }
387            Self::RangeWithBitmap { range, bitmap } => {
388                if !range.contains(&val) {
389                    return false;
390                }
391                // Check if the bitmap has the value set (not cleared)
392                let idx = (val - range.start) as usize;
393                bitmap.get(idx)
394            }
395            Self::SortedArray(array) => array.binary_search(val).is_ok(),
396            Self::Array(array) => array.iter().any(|v| v == val),
397        }
398    }
399
400    /// Produce a new segment that has [`val`] as the new highest value in the segment
401    pub fn with_new_high(self, val: u64) -> lance_core::Result<Self> {
402        // Check that the new value is higher than the current maximum
403        if let Some(range) = self.range() {
404            if val <= *range.end() {
405                return Err(lance_core::Error::invalid_input(
406                    format!(
407                        "New value {} must be higher than current maximum {}",
408                        val,
409                        range.end()
410                    ),
411                    location!(),
412                ));
413            }
414        }
415
416        Ok(match self {
417            Self::Range(range) => {
418                // Special case for empty range: create a range containing only the new value
419                if range.start == range.end {
420                    Self::Range(Range {
421                        start: val,
422                        end: val + 1,
423                    })
424                } else if val == range.end {
425                    Self::Range(Range {
426                        start: range.start,
427                        end: val + 1,
428                    })
429                } else {
430                    Self::RangeWithHoles {
431                        range: Range {
432                            start: range.start,
433                            end: val + 1,
434                        },
435                        holes: EncodedU64Array::U64((range.end..val).collect()),
436                    }
437                }
438            }
439            Self::RangeWithHoles { range, holes } => {
440                if val == range.end {
441                    Self::RangeWithHoles {
442                        range: Range {
443                            start: range.start,
444                            end: val + 1,
445                        },
446                        holes,
447                    }
448                } else {
449                    let mut new_holes: Vec<u64> = holes.iter().collect();
450                    new_holes.extend(range.end..val);
451                    Self::RangeWithHoles {
452                        range: Range {
453                            start: range.start,
454                            end: val + 1,
455                        },
456                        holes: EncodedU64Array::U64(new_holes),
457                    }
458                }
459            }
460            Self::RangeWithBitmap { range, bitmap } => {
461                let new_range = Range {
462                    start: range.start,
463                    end: val + 1,
464                };
465                let gap_size = (val - range.end) as usize;
466                let new_bitmap = bitmap
467                    .iter()
468                    .chain(std::iter::repeat_n(false, gap_size))
469                    .chain(std::iter::once(true))
470                    .collect::<Vec<bool>>();
471
472                Self::RangeWithBitmap {
473                    range: new_range,
474                    bitmap: Bitmap::from(new_bitmap.as_slice()),
475                }
476            }
477            Self::SortedArray(array) => match array {
478                EncodedU64Array::U64(mut vec) => {
479                    vec.push(val);
480                    Self::SortedArray(EncodedU64Array::U64(vec))
481                }
482                EncodedU64Array::U16 { base, offsets } => {
483                    if let Some(offset) = val.checked_sub(base) {
484                        if offset <= u16::MAX as u64 {
485                            let mut offsets = offsets;
486                            offsets.push(offset as u16);
487                            return Ok(Self::SortedArray(EncodedU64Array::U16 { base, offsets }));
488                        } else if offset <= u32::MAX as u64 {
489                            let mut u32_offsets: Vec<u32> =
490                                offsets.into_iter().map(|o| o as u32).collect();
491                            u32_offsets.push(offset as u32);
492                            return Ok(Self::SortedArray(EncodedU64Array::U32 {
493                                base,
494                                offsets: u32_offsets,
495                            }));
496                        }
497                    }
498                    let mut new_array: Vec<u64> =
499                        offsets.into_iter().map(|o| base + o as u64).collect();
500                    new_array.push(val);
501                    Self::SortedArray(EncodedU64Array::from(new_array))
502                }
503                EncodedU64Array::U32 { base, mut offsets } => {
504                    if let Some(offset) = val.checked_sub(base) {
505                        if offset <= u32::MAX as u64 {
506                            offsets.push(offset as u32);
507                            return Ok(Self::SortedArray(EncodedU64Array::U32 { base, offsets }));
508                        }
509                    }
510                    let mut new_array: Vec<u64> =
511                        offsets.into_iter().map(|o| base + o as u64).collect();
512                    new_array.push(val);
513                    Self::SortedArray(EncodedU64Array::from(new_array))
514                }
515            },
516            Self::Array(array) => match array {
517                EncodedU64Array::U64(mut vec) => {
518                    vec.push(val);
519                    Self::Array(EncodedU64Array::U64(vec))
520                }
521                EncodedU64Array::U16 { base, offsets } => {
522                    if let Some(offset) = val.checked_sub(base) {
523                        if offset <= u16::MAX as u64 {
524                            let mut offsets = offsets;
525                            offsets.push(offset as u16);
526                            return Ok(Self::Array(EncodedU64Array::U16 { base, offsets }));
527                        } else if offset <= u32::MAX as u64 {
528                            let mut u32_offsets: Vec<u32> =
529                                offsets.into_iter().map(|o| o as u32).collect();
530                            u32_offsets.push(offset as u32);
531                            return Ok(Self::Array(EncodedU64Array::U32 {
532                                base,
533                                offsets: u32_offsets,
534                            }));
535                        }
536                    }
537                    let mut new_array: Vec<u64> =
538                        offsets.into_iter().map(|o| base + o as u64).collect();
539                    new_array.push(val);
540                    Self::Array(EncodedU64Array::from(new_array))
541                }
542                EncodedU64Array::U32 { base, mut offsets } => {
543                    if let Some(offset) = val.checked_sub(base) {
544                        if offset <= u32::MAX as u64 {
545                            offsets.push(offset as u32);
546                            return Ok(Self::Array(EncodedU64Array::U32 { base, offsets }));
547                        }
548                    }
549                    let mut new_array: Vec<u64> =
550                        offsets.into_iter().map(|o| base + o as u64).collect();
551                    new_array.push(val);
552                    Self::Array(EncodedU64Array::from(new_array))
553                }
554            },
555        })
556    }
557
558    /// Delete a set of row ids from the segment.
559    /// The row ids are assumed to be in the segment. (within the range, not
560    /// already deleted.)
561    /// They are also assumed to be ordered by appearance in the segment.
562    pub fn delete(&self, vals: &[u64]) -> Self {
563        // TODO: can we enforce these assumptions? or make them safer?
564        debug_assert!(vals.iter().all(|&val| self.range().unwrap().contains(&val)));
565
566        let make_new_iter = || {
567            let mut vals_iter = vals.iter().copied().peekable();
568            self.iter().filter(move |val| {
569                if let Some(&next_val) = vals_iter.peek() {
570                    if next_val == *val {
571                        vals_iter.next();
572                        return false;
573                    }
574                }
575                true
576            })
577        };
578        let stats = Self::compute_stats(make_new_iter());
579        Self::from_stats_and_sequence(stats, make_new_iter())
580    }
581
582    pub fn mask(&mut self, positions: &[u32]) {
583        if positions.is_empty() {
584            return;
585        }
586        if positions.len() == self.len() {
587            *self = Self::Range(0..0);
588            return;
589        }
590        let count = (self.len() - positions.len()) as u64;
591        let sorted = match self {
592            Self::Range(_) => true,
593            Self::RangeWithHoles { .. } => true,
594            Self::RangeWithBitmap { .. } => true,
595            Self::SortedArray(_) => true,
596            Self::Array(_) => false,
597        };
598        // To get minimum, need to find the first value that is not masked.
599        let first_unmasked = (0..self.len())
600            .zip(positions.iter().cycle())
601            .find(|(sequential_i, i)| **i != *sequential_i as u32)
602            .map(|(sequential_i, _)| sequential_i)
603            .unwrap();
604        let min = self.get(first_unmasked).unwrap();
605
606        let last_unmasked = (0..self.len())
607            .rev()
608            .zip(positions.iter().rev().cycle())
609            .filter(|(sequential_i, i)| **i != *sequential_i as u32)
610            .map(|(sequential_i, _)| sequential_i)
611            .next()
612            .unwrap();
613        let max = self.get(last_unmasked).unwrap();
614
615        let stats = SegmentStats {
616            min,
617            max,
618            count,
619            sorted,
620        };
621
622        let mut positions = positions.iter().copied().peekable();
623        let sequence = self.iter().enumerate().filter_map(move |(i, val)| {
624            if let Some(next_pos) = positions.peek() {
625                if *next_pos == i as u32 {
626                    positions.next();
627                    return None;
628                }
629            }
630            Some(val)
631        });
632        *self = Self::from_stats_and_sequence(stats, sequence)
633    }
634}
635
636#[cfg(test)]
637mod test {
638    use super::*;
639
    /// Builds a segment from several value shapes via `from_slice` and checks
    /// the chosen representation, `len`, `range`, `get`, `position`, and
    /// forward/backward iteration against the input values.
    #[test]
    fn test_segments() {
        // Asserts that `values` encode to exactly `expected` and that every
        // accessor agrees with the original slice.
        fn check_segment(values: &[u64], expected: &U64Segment) {
            let segment = U64Segment::from_slice(values);
            assert_eq!(segment, *expected);
            assert_eq!(values.len(), segment.len());

            let roundtripped = segment.iter().collect::<Vec<_>>();
            assert_eq!(roundtripped, values);

            // `range()` must report the true min/max, or `None` when empty.
            let expected_min = values.iter().copied().min();
            let expected_max = values.iter().copied().max();
            match segment.range() {
                Some(range) => {
                    assert_eq!(range.start(), &expected_min.unwrap());
                    assert_eq!(range.end(), &expected_max.unwrap());
                }
                None => {
                    assert_eq!(expected_min, None);
                    assert_eq!(expected_max, None);
                }
            }

            // `get` and `position` must be inverses of each other.
            for (i, value) in values.iter().enumerate() {
                assert_eq!(segment.get(i), Some(*value), "i = {}", i);
                assert_eq!(segment.position(*value), Some(i), "i = {}", i);
            }

            check_segment_iter(&segment);
        }

        fn check_segment_iter(segment: &U64Segment) {
            // Should be able to iterate forwards and backwards, and get the same thing.
            let forwards = segment.iter().collect::<Vec<_>>();
            let mut backwards = segment.iter().rev().collect::<Vec<_>>();
            backwards.reverse();
            assert_eq!(forwards, backwards);

            // Should be able to pull from both sides in lockstep.
            let mut expected = Vec::with_capacity(segment.len());
            let mut actual = Vec::with_capacity(segment.len());
            let mut iter = segment.iter();
            // Alternating forwards and backwards
            for i in 0..segment.len() {
                if i % 2 == 0 {
                    actual.push(iter.next().unwrap());
                    expected.push(segment.get(i / 2).unwrap());
                } else {
                    let i = segment.len() - 1 - i / 2;
                    actual.push(iter.next_back().unwrap());
                    expected.push(segment.get(i).unwrap());
                };
            }
            assert_eq!(expected, actual);
        }

        // Empty
        check_segment(&[], &U64Segment::Range(0..0));

        // Single value
        check_segment(&[42], &U64Segment::Range(42..43));

        // Contiguous range
        check_segment(
            &(100..200).collect::<Vec<_>>(),
            &U64Segment::Range(100..200),
        );

        // Range with a hole
        let values = (0..1000).filter(|&x| x != 100).collect::<Vec<_>>();
        check_segment(
            &values,
            &U64Segment::RangeWithHoles {
                range: 0..1000,
                holes: vec![100].into(),
            },
        );

        // Range with every other value missing
        // (the largest value is 998, hence the range 0..999)
        let values = (0..1000).filter(|&x| x % 2 == 0).collect::<Vec<_>>();
        check_segment(
            &values,
            &U64Segment::RangeWithBitmap {
                range: 0..999,
                bitmap: Bitmap::from((0..999).map(|x| x % 2 == 0).collect::<Vec<_>>().as_slice()),
            },
        );

        // Sparse but sorted sequence
        check_segment(
            &[1, 7000, 24000],
            &U64Segment::SortedArray(vec![1, 7000, 24000].into()),
        );

        // Sparse unsorted sequence
        check_segment(
            &[7000, 1, 24000],
            &U64Segment::Array(vec![7000, 1, 24000].into()),
        );
    }
740
741    #[test]
742    fn test_with_new_high() {
743        // Test Range: contiguous sequence
744        let segment = U64Segment::Range(10..20);
745
746        // Test adding value that extends the range
747        let result = segment.clone().with_new_high(20).unwrap();
748        assert_eq!(result, U64Segment::Range(10..21));
749
750        // Test adding value that creates holes
751        let result = segment.with_new_high(25).unwrap();
752        assert_eq!(
753            result,
754            U64Segment::RangeWithHoles {
755                range: 10..26,
756                holes: EncodedU64Array::U64(vec![20, 21, 22, 23, 24]),
757            }
758        );
759
760        // Test RangeWithHoles: sequence with existing holes
761        let segment = U64Segment::RangeWithHoles {
762            range: 10..20,
763            holes: EncodedU64Array::U64(vec![15, 17]),
764        };
765
766        // Test adding value that extends the range without new holes
767        let result = segment.clone().with_new_high(20).unwrap();
768        assert_eq!(
769            result,
770            U64Segment::RangeWithHoles {
771                range: 10..21,
772                holes: EncodedU64Array::U64(vec![15, 17]),
773            }
774        );
775
776        // Test adding value that creates additional holes
777        let result = segment.with_new_high(25).unwrap();
778        assert_eq!(
779            result,
780            U64Segment::RangeWithHoles {
781                range: 10..26,
782                holes: EncodedU64Array::U64(vec![15, 17, 20, 21, 22, 23, 24]),
783            }
784        );
785
786        // Test RangeWithBitmap: sequence with bitmap representation
787        let mut bitmap = Bitmap::new_full(10);
788        bitmap.clear(3); // Clear position 3 (value 13)
789        bitmap.clear(7); // Clear position 7 (value 17)
790        let segment = U64Segment::RangeWithBitmap {
791            range: 10..20,
792            bitmap,
793        };
794
795        // Test adding value that extends the range without new holes
796        let result = segment.clone().with_new_high(20).unwrap();
797        let expected_bitmap = {
798            let mut b = Bitmap::new_full(11);
799            b.clear(3); // Clear position 3 (value 13)
800            b.clear(7); // Clear position 7 (value 17)
801            b
802        };
803        assert_eq!(
804            result,
805            U64Segment::RangeWithBitmap {
806                range: 10..21,
807                bitmap: expected_bitmap,
808            }
809        );
810
811        // Test adding value that creates additional holes
812        let result = segment.with_new_high(25).unwrap();
813        let expected_bitmap = {
814            let mut b = Bitmap::new_full(16);
815            b.clear(3); // Clear position 3 (value 13)
816            b.clear(7); // Clear position 7 (value 17)
817                        // Clear positions 10-14 (values 20-24)
818            for i in 10..15 {
819                b.clear(i);
820            }
821            b
822        };
823        assert_eq!(
824            result,
825            U64Segment::RangeWithBitmap {
826                range: 10..26,
827                bitmap: expected_bitmap,
828            }
829        );
830
831        // Test SortedArray: sparse sorted sequence
832        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
833
834        let result = segment.with_new_high(15).unwrap();
835        assert_eq!(
836            result,
837            U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10, 15]))
838        );
839
840        // Test Array: unsorted sequence
841        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
842
843        let result = segment.with_new_high(15).unwrap();
844        assert_eq!(
845            result,
846            U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1, 15]))
847        );
848
849        // Test edge cases
850        // Empty segment
851        let segment = U64Segment::Range(0..0);
852        let result = segment.with_new_high(5).unwrap();
853        assert_eq!(result, U64Segment::Range(5..6));
854
855        // Single value segment
856        let segment = U64Segment::Range(42..43);
857        let result = segment.with_new_high(50).unwrap();
858        assert_eq!(
859            result,
860            U64Segment::RangeWithHoles {
861                range: 42..51,
862                holes: EncodedU64Array::U64(vec![43, 44, 45, 46, 47, 48, 49]),
863            }
864        );
865    }
866
867    #[test]
868    fn test_with_new_high_assertion() {
869        let segment = U64Segment::Range(10..20);
870        // This should return an error because 15 is not higher than the current maximum 19
871        let result = segment.with_new_high(15);
872        assert!(result.is_err());
873        let error = result.unwrap_err();
874        assert!(error
875            .to_string()
876            .contains("New value 15 must be higher than current maximum 19"));
877    }
878
879    #[test]
880    fn test_with_new_high_assertion_equal() {
881        let segment = U64Segment::Range(1..6);
882        // This should return an error because 5 is not higher than the current maximum 5
883        let result = segment.with_new_high(5);
884        assert!(result.is_err());
885        let error = result.unwrap_err();
886        assert!(error
887            .to_string()
888            .contains("New value 5 must be higher than current maximum 5"));
889    }
890
891    #[test]
892    fn test_contains() {
893        // Test Range: contiguous sequence
894        let segment = U64Segment::Range(10..20);
895        assert!(segment.contains(10), "Should contain 10");
896        assert!(segment.contains(15), "Should contain 15");
897        assert!(segment.contains(19), "Should contain 19");
898        assert!(!segment.contains(9), "Should not contain 9");
899        assert!(!segment.contains(20), "Should not contain 20");
900        assert!(!segment.contains(25), "Should not contain 25");
901
902        // Test RangeWithHoles: sequence with holes
903        let segment = U64Segment::RangeWithHoles {
904            range: 10..20,
905            holes: EncodedU64Array::U64(vec![15, 17]),
906        };
907        assert!(segment.contains(10), "Should contain 10");
908        assert!(segment.contains(14), "Should contain 14");
909        assert!(!segment.contains(15), "Should not contain 15 (hole)");
910        assert!(segment.contains(16), "Should contain 16");
911        assert!(!segment.contains(17), "Should not contain 17 (hole)");
912        assert!(segment.contains(18), "Should contain 18");
913        assert!(
914            !segment.contains(20),
915            "Should not contain 20 (out of range)"
916        );
917
918        // Test RangeWithBitmap: sequence with bitmap
919        let mut bitmap = Bitmap::new_full(10);
920        bitmap.clear(3); // Clear position 3 (value 13)
921        bitmap.clear(7); // Clear position 7 (value 17)
922        let segment = U64Segment::RangeWithBitmap {
923            range: 10..20,
924            bitmap,
925        };
926        assert!(segment.contains(10), "Should contain 10");
927        assert!(segment.contains(12), "Should contain 12");
928        assert!(
929            !segment.contains(13),
930            "Should not contain 13 (cleared in bitmap)"
931        );
932        assert!(segment.contains(16), "Should contain 16");
933        assert!(
934            !segment.contains(17),
935            "Should not contain 17 (cleared in bitmap)"
936        );
937        assert!(segment.contains(19), "Should contain 19");
938        assert!(
939            !segment.contains(20),
940            "Should not contain 20 (out of range)"
941        );
942
943        // Test SortedArray: sparse sorted sequence
944        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
945        assert!(segment.contains(1), "Should contain 1");
946        assert!(segment.contains(5), "Should contain 5");
947        assert!(segment.contains(10), "Should contain 10");
948        assert!(!segment.contains(0), "Should not contain 0");
949        assert!(!segment.contains(3), "Should not contain 3");
950        assert!(!segment.contains(15), "Should not contain 15");
951
952        // Test Array: unsorted sequence
953        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
954        assert!(segment.contains(1), "Should contain 1");
955        assert!(segment.contains(5), "Should contain 5");
956        assert!(segment.contains(10), "Should contain 10");
957        assert!(!segment.contains(0), "Should not contain 0");
958        assert!(!segment.contains(3), "Should not contain 3");
959        assert!(!segment.contains(15), "Should not contain 15");
960
961        // Test empty segment
962        let segment = U64Segment::Range(0..0);
963        assert!(
964            !segment.contains(0),
965            "Empty segment should not contain anything"
966        );
967        assert!(
968            !segment.contains(5),
969            "Empty segment should not contain anything"
970        );
971    }
972}