Skip to main content

lance_table/rowids/
segment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::{Range, RangeInclusive};
5
6use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
7use deepsize::DeepSizeOf;
8
9/// Different ways to represent a sequence of distinct u64s.
10///
11/// This is designed to be especially efficient for sequences that are sorted,
12/// but not meaningfully larger than a `Vec<u64>` in the worst case.
13///
14/// The representation is chosen based on the properties of the sequence:
15///                                                           
16///  Sorted?───►Yes ───►Contiguous?─► Yes─► Range            
17///    │                ▼                                 
18///    │                No                                
19///    │                ▼                                 
20///    │              Dense?─────► Yes─► RangeWithBitmap/RangeWithHoles
21///    │                ▼                                 
22///    │                No─────────────► SortedArray      
23///    ▼                                                    
24///    No──────────────────────────────► Array            
25///
26/// "Dense" is decided based on the estimated byte size of the representation.
27///
28/// Size of RangeWithBitMap for N values:
29///     8 bytes + 8 bytes + ceil((max - min) / 8) bytes
30/// Size of SortedArray for N values (assuming u16 packed):
31///     8 bytes + 8 bytes + 8 bytes + 2 bytes * N
32///
33#[derive(Debug, PartialEq, Eq, Clone)]
34pub enum U64Segment {
35    /// A contiguous sorted range of row ids.
36    ///
37    /// Total size: 16 bytes
38    Range(Range<u64>),
39    /// A sorted range of row ids, that is mostly contiguous.
40    ///
41    /// Total size: 24 bytes + n_holes * 4 bytes
42    /// Use when: 32 * n_holes < max - min
43    RangeWithHoles {
44        range: Range<u64>,
45        /// Bitmap of offsets from the start of the range that are holes.
46        /// This is sorted, so binary search can be used. It's typically
47        /// relatively small.
48        holes: EncodedU64Array,
49    },
50    /// A sorted range of row ids, that is mostly contiguous.
51    ///
52    /// Bitmap is 1 when the value is present, 0 when it's missing.
53    ///
54    /// Total size: 24 bytes + ceil((max - min) / 8) bytes
55    /// Use when: max - min > 16 * len
56    RangeWithBitmap { range: Range<u64>, bitmap: Bitmap },
57    /// A sorted array of row ids, that is sparse.
58    ///
59    /// Total size: 24 bytes + 2 * n_values bytes
60    SortedArray(EncodedU64Array),
61    /// An array of row ids, that is not sorted.
62    Array(EncodedU64Array),
63}
64
65impl DeepSizeOf for U64Segment {
66    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
67        match self {
68            Self::Range(_) => 0,
69            Self::RangeWithHoles { holes, .. } => holes.deep_size_of_children(context),
70            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
71            Self::SortedArray(array) => array.deep_size_of_children(context),
72            Self::Array(array) => array.deep_size_of_children(context),
73        }
74    }
75}
76
77/// Statistics about a segment of u64s.
78#[derive(Debug)]
79struct SegmentStats {
80    /// Min value in the segment.
81    min: u64,
82    /// Max value in the segment
83    max: u64,
84    /// Total number of values in the segment
85    count: u64,
86    /// Whether the segment is sorted
87    sorted: bool,
88}
89
90impl SegmentStats {
91    fn n_holes(&self) -> u64 {
92        debug_assert!(self.sorted);
93        if self.count == 0 {
94            0
95        } else {
96            let total_slots = self.max - self.min + 1;
97            total_slots - self.count
98        }
99    }
100}
101
102impl U64Segment {
103    /// Return the values that are missing from the slice.
104    fn holes_in_slice<'a>(
105        range: RangeInclusive<u64>,
106        existing: impl IntoIterator<Item = u64> + 'a,
107    ) -> impl Iterator<Item = u64> + 'a {
108        let mut existing = existing.into_iter().peekable();
109        range.filter(move |val| {
110            if let Some(&existing_val) = existing.peek()
111                && existing_val == *val
112            {
113                existing.next();
114                return false;
115            }
116            true
117        })
118    }
119
120    fn compute_stats(values: impl IntoIterator<Item = u64>) -> SegmentStats {
121        let mut sorted = true;
122        let mut min = u64::MAX;
123        let mut max = 0;
124        let mut count = 0;
125
126        for val in values {
127            count += 1;
128            if val < min {
129                min = val;
130            }
131            if val > max {
132                max = val;
133            }
134            if sorted && count > 1 && val < max {
135                sorted = false;
136            }
137        }
138
139        if count == 0 {
140            min = 0;
141            max = 0;
142        }
143
144        SegmentStats {
145            min,
146            max,
147            count,
148            sorted,
149        }
150    }
151
152    fn sorted_sequence_sizes(stats: &SegmentStats) -> [usize; 3] {
153        let n_holes = stats.n_holes();
154        let total_slots = stats.max - stats.min + 1;
155
156        let range_with_holes = 24 + 4 * n_holes as usize;
157        let range_with_bitmap = 24 + (total_slots as f64 / 8.0).ceil() as usize;
158        let sorted_array = 24 + 2 * stats.count as usize;
159
160        [range_with_holes, range_with_bitmap, sorted_array]
161    }
162
163    fn from_stats_and_sequence(
164        stats: SegmentStats,
165        sequence: impl IntoIterator<Item = u64>,
166    ) -> Self {
167        if stats.sorted {
168            let n_holes = stats.n_holes();
169            if stats.count == 0 {
170                Self::Range(0..0)
171            } else if n_holes == 0 {
172                Self::Range(stats.min..(stats.max + 1))
173            } else {
174                let sizes = Self::sorted_sequence_sizes(&stats);
175                let min_size = sizes.iter().min().unwrap();
176                if min_size == &sizes[0] {
177                    let range = stats.min..(stats.max + 1);
178                    let mut holes =
179                        Self::holes_in_slice(stats.min..=stats.max, sequence).collect::<Vec<_>>();
180                    holes.sort_unstable();
181                    let holes = EncodedU64Array::from(holes);
182
183                    Self::RangeWithHoles { range, holes }
184                } else if min_size == &sizes[1] {
185                    let range = stats.min..(stats.max + 1);
186                    let mut bitmap = Bitmap::new_full((stats.max - stats.min) as usize + 1);
187
188                    for hole in Self::holes_in_slice(stats.min..=stats.max, sequence) {
189                        let offset = (hole - stats.min) as usize;
190                        bitmap.clear(offset);
191                    }
192
193                    Self::RangeWithBitmap { range, bitmap }
194                } else {
195                    // Must use array, but at least it's sorted
196                    Self::SortedArray(EncodedU64Array::from_iter(sequence))
197                }
198            }
199        } else {
200            // Must use array
201            Self::Array(EncodedU64Array::from_iter(sequence))
202        }
203    }
204
205    pub fn from_slice(slice: &[u64]) -> Self {
206        Self::from_iter(slice.iter().copied())
207    }
208}
209
210impl FromIterator<u64> for U64Segment {
211    fn from_iter<T: IntoIterator<Item = u64>>(iter: T) -> Self {
212        let values: Vec<u64> = iter.into_iter().collect();
213        let stats = Self::compute_stats(values.iter().copied());
214        Self::from_stats_and_sequence(stats, values)
215    }
216}
217
218impl U64Segment {
219    pub fn iter(&self) -> Box<dyn DoubleEndedIterator<Item = u64> + '_> {
220        match self {
221            Self::Range(range) => Box::new(range.clone()),
222            Self::RangeWithHoles { range, holes } => {
223                Box::new((range.start..range.end).filter(move |&val| {
224                    // TODO: we could write a more optimal version of this
225                    // iterator, but would need special handling to make it
226                    // double ended.
227                    holes.binary_search(val).is_err()
228                }))
229            }
230            Self::RangeWithBitmap { range, bitmap } => {
231                Box::new((range.start..range.end).filter(|val| {
232                    let offset = (val - range.start) as usize;
233                    bitmap.get(offset)
234                }))
235            }
236            Self::SortedArray(array) => Box::new(array.iter()),
237            Self::Array(array) => Box::new(array.iter()),
238        }
239    }
240
241    pub fn len(&self) -> usize {
242        match self {
243            Self::Range(range) => (range.end - range.start) as usize,
244            Self::RangeWithHoles { range, holes } => {
245                let holes = holes.iter().count();
246                (range.end - range.start) as usize - holes
247            }
248            Self::RangeWithBitmap { range, bitmap } => {
249                let holes = bitmap.count_zeros();
250                (range.end - range.start) as usize - holes
251            }
252            Self::SortedArray(array) => array.len(),
253            Self::Array(array) => array.len(),
254        }
255    }
256
257    pub fn is_empty(&self) -> bool {
258        self.len() == 0
259    }
260
261    /// Get the min and max value of the segment, excluding tombstones.
262    pub fn range(&self) -> Option<RangeInclusive<u64>> {
263        match self {
264            Self::Range(range) if range.is_empty() => None,
265            Self::Range(range)
266            | Self::RangeWithBitmap { range, .. }
267            | Self::RangeWithHoles { range, .. } => Some(range.start..=(range.end - 1)),
268            Self::SortedArray(array) => {
269                // We can assume that the array is sorted.
270                let min_value = array.first().unwrap();
271                let max_value = array.last().unwrap();
272                Some(min_value..=max_value)
273            }
274            Self::Array(array) => {
275                let min_value = array.min().unwrap();
276                let max_value = array.max().unwrap();
277                Some(min_value..=max_value)
278            }
279        }
280    }
281
282    pub fn slice(&self, offset: usize, len: usize) -> Self {
283        if len == 0 {
284            return Self::Range(0..0);
285        }
286
287        let values: Vec<u64> = self.iter().skip(offset).take(len).collect();
288
289        // `from_slice` will compute stats and select the best representation.
290        Self::from_slice(&values)
291    }
292
293    pub fn position(&self, val: u64) -> Option<usize> {
294        match self {
295            Self::Range(range) => {
296                if range.contains(&val) {
297                    Some((val - range.start) as usize)
298                } else {
299                    None
300                }
301            }
302            Self::RangeWithHoles { range, holes } => {
303                if range.contains(&val) && holes.binary_search(val).is_err() {
304                    let offset = (val - range.start) as usize;
305                    let holes = holes.iter().take_while(|&hole| hole < val).count();
306                    Some(offset - holes)
307                } else {
308                    None
309                }
310            }
311            Self::RangeWithBitmap { range, bitmap } => {
312                if range.contains(&val) && bitmap.get((val - range.start) as usize) {
313                    let offset = (val - range.start) as usize;
314                    let num_zeros = bitmap.slice(0, offset).count_zeros();
315                    Some(offset - num_zeros)
316                } else {
317                    None
318                }
319            }
320            Self::SortedArray(array) => array.binary_search(val).ok(),
321            Self::Array(array) => array.iter().position(|v| v == val),
322        }
323    }
324
325    pub fn get(&self, i: usize) -> Option<u64> {
326        match self {
327            Self::Range(range) => match range.start.checked_add(i as u64) {
328                Some(val) if val < range.end => Some(val),
329                _ => None,
330            },
331            Self::RangeWithHoles { range, .. } => {
332                if i >= (range.end - range.start) as usize {
333                    return None;
334                }
335                self.iter().nth(i)
336            }
337            Self::RangeWithBitmap { range, .. } => {
338                if i >= (range.end - range.start) as usize {
339                    return None;
340                }
341                self.iter().nth(i)
342            }
343            Self::SortedArray(array) => array.get(i),
344            Self::Array(array) => array.get(i),
345        }
346    }
347
348    /// Check if a value is contained in the segment
349    pub fn contains(&self, val: u64) -> bool {
350        match self {
351            Self::Range(range) => range.contains(&val),
352            Self::RangeWithHoles { range, holes } => {
353                if !range.contains(&val) {
354                    return false;
355                }
356                // Check if the value is not in the holes
357                !holes.iter().any(|hole| hole == val)
358            }
359            Self::RangeWithBitmap { range, bitmap } => {
360                if !range.contains(&val) {
361                    return false;
362                }
363                // Check if the bitmap has the value set (not cleared)
364                let idx = (val - range.start) as usize;
365                bitmap.get(idx)
366            }
367            Self::SortedArray(array) => array.binary_search(val).is_ok(),
368            Self::Array(array) => array.iter().any(|v| v == val),
369        }
370    }
371
372    /// Produce a new segment that has `val` as the new highest value in the segment
373    pub fn with_new_high(self, val: u64) -> lance_core::Result<Self> {
374        // Check that the new value is higher than the current maximum
375        if let Some(range) = self.range()
376            && val <= *range.end()
377        {
378            return Err(lance_core::Error::invalid_input(format!(
379                "New value {} must be higher than current maximum {}",
380                val,
381                range.end()
382            )));
383        }
384
385        Ok(match self {
386            Self::Range(range) => {
387                // Special case for empty range: create a range containing only the new value
388                if range.start == range.end {
389                    Self::Range(Range {
390                        start: val,
391                        end: val + 1,
392                    })
393                } else if val == range.end {
394                    Self::Range(Range {
395                        start: range.start,
396                        end: val + 1,
397                    })
398                } else {
399                    Self::RangeWithHoles {
400                        range: Range {
401                            start: range.start,
402                            end: val + 1,
403                        },
404                        holes: EncodedU64Array::U64((range.end..val).collect()),
405                    }
406                }
407            }
408            Self::RangeWithHoles { range, holes } => {
409                if val == range.end {
410                    Self::RangeWithHoles {
411                        range: Range {
412                            start: range.start,
413                            end: val + 1,
414                        },
415                        holes,
416                    }
417                } else {
418                    let mut new_holes: Vec<u64> = holes.iter().collect();
419                    new_holes.extend(range.end..val);
420                    Self::RangeWithHoles {
421                        range: Range {
422                            start: range.start,
423                            end: val + 1,
424                        },
425                        holes: EncodedU64Array::U64(new_holes),
426                    }
427                }
428            }
429            Self::RangeWithBitmap { range, bitmap } => {
430                let new_range = Range {
431                    start: range.start,
432                    end: val + 1,
433                };
434                let gap_size = (val - range.end) as usize;
435                let new_bitmap = bitmap
436                    .iter()
437                    .chain(std::iter::repeat_n(false, gap_size))
438                    .chain(std::iter::once(true))
439                    .collect::<Vec<bool>>();
440
441                Self::RangeWithBitmap {
442                    range: new_range,
443                    bitmap: Bitmap::from(new_bitmap.as_slice()),
444                }
445            }
446            Self::SortedArray(array) => match array {
447                EncodedU64Array::U64(mut vec) => {
448                    vec.push(val);
449                    Self::SortedArray(EncodedU64Array::U64(vec))
450                }
451                EncodedU64Array::U16 { base, offsets } => {
452                    if let Some(offset) = val.checked_sub(base) {
453                        if offset <= u16::MAX as u64 {
454                            let mut offsets = offsets;
455                            offsets.push(offset as u16);
456                            return Ok(Self::SortedArray(EncodedU64Array::U16 { base, offsets }));
457                        } else if offset <= u32::MAX as u64 {
458                            let mut u32_offsets: Vec<u32> =
459                                offsets.into_iter().map(|o| o as u32).collect();
460                            u32_offsets.push(offset as u32);
461                            return Ok(Self::SortedArray(EncodedU64Array::U32 {
462                                base,
463                                offsets: u32_offsets,
464                            }));
465                        }
466                    }
467                    let mut new_array: Vec<u64> =
468                        offsets.into_iter().map(|o| base + o as u64).collect();
469                    new_array.push(val);
470                    Self::SortedArray(EncodedU64Array::from(new_array))
471                }
472                EncodedU64Array::U32 { base, mut offsets } => {
473                    if let Some(offset) = val.checked_sub(base)
474                        && offset <= u32::MAX as u64
475                    {
476                        offsets.push(offset as u32);
477                        return Ok(Self::SortedArray(EncodedU64Array::U32 { base, offsets }));
478                    }
479                    let mut new_array: Vec<u64> =
480                        offsets.into_iter().map(|o| base + o as u64).collect();
481                    new_array.push(val);
482                    Self::SortedArray(EncodedU64Array::from(new_array))
483                }
484            },
485            Self::Array(array) => match array {
486                EncodedU64Array::U64(mut vec) => {
487                    vec.push(val);
488                    Self::Array(EncodedU64Array::U64(vec))
489                }
490                EncodedU64Array::U16 { base, offsets } => {
491                    if let Some(offset) = val.checked_sub(base) {
492                        if offset <= u16::MAX as u64 {
493                            let mut offsets = offsets;
494                            offsets.push(offset as u16);
495                            return Ok(Self::Array(EncodedU64Array::U16 { base, offsets }));
496                        } else if offset <= u32::MAX as u64 {
497                            let mut u32_offsets: Vec<u32> =
498                                offsets.into_iter().map(|o| o as u32).collect();
499                            u32_offsets.push(offset as u32);
500                            return Ok(Self::Array(EncodedU64Array::U32 {
501                                base,
502                                offsets: u32_offsets,
503                            }));
504                        }
505                    }
506                    let mut new_array: Vec<u64> =
507                        offsets.into_iter().map(|o| base + o as u64).collect();
508                    new_array.push(val);
509                    Self::Array(EncodedU64Array::from(new_array))
510                }
511                EncodedU64Array::U32 { base, mut offsets } => {
512                    if let Some(offset) = val.checked_sub(base)
513                        && offset <= u32::MAX as u64
514                    {
515                        offsets.push(offset as u32);
516                        return Ok(Self::Array(EncodedU64Array::U32 { base, offsets }));
517                    }
518                    let mut new_array: Vec<u64> =
519                        offsets.into_iter().map(|o| base + o as u64).collect();
520                    new_array.push(val);
521                    Self::Array(EncodedU64Array::from(new_array))
522                }
523            },
524        })
525    }
526
527    /// Delete a set of row ids from the segment.
528    /// The row ids are assumed to be in the segment. (within the range, not
529    /// already deleted.)
530    /// They are also assumed to be ordered by appearance in the segment.
531    pub fn delete(&self, vals: &[u64]) -> Self {
532        // TODO: can we enforce these assumptions? or make them safer?
533        debug_assert!(vals.iter().all(|&val| self.range().unwrap().contains(&val)));
534
535        let make_new_iter = || {
536            let mut vals_iter = vals.iter().copied().peekable();
537            self.iter().filter(move |val| {
538                if let Some(&next_val) = vals_iter.peek()
539                    && next_val == *val
540                {
541                    vals_iter.next();
542                    return false;
543                }
544                true
545            })
546        };
547        let stats = Self::compute_stats(make_new_iter());
548        Self::from_stats_and_sequence(stats, make_new_iter())
549    }
550
551    pub fn mask(&mut self, positions: &[u32]) {
552        if positions.is_empty() {
553            return;
554        }
555        if positions.len() == self.len() {
556            *self = Self::Range(0..0);
557            return;
558        }
559        let count = (self.len() - positions.len()) as u64;
560        let sorted = match self {
561            Self::Range(_) => true,
562            Self::RangeWithHoles { .. } => true,
563            Self::RangeWithBitmap { .. } => true,
564            Self::SortedArray(_) => true,
565            Self::Array(_) => false,
566        };
567        // To get minimum, need to find the first value that is not masked.
568        let first_unmasked = (0..self.len())
569            .zip(positions.iter().cycle())
570            .find(|(sequential_i, i)| **i != *sequential_i as u32)
571            .map(|(sequential_i, _)| sequential_i)
572            .unwrap();
573        let min = self.get(first_unmasked).unwrap();
574
575        let last_unmasked = (0..self.len())
576            .rev()
577            .zip(positions.iter().rev().cycle())
578            .filter(|(sequential_i, i)| **i != *sequential_i as u32)
579            .map(|(sequential_i, _)| sequential_i)
580            .next()
581            .unwrap();
582        let max = self.get(last_unmasked).unwrap();
583
584        let stats = SegmentStats {
585            min,
586            max,
587            count,
588            sorted,
589        };
590
591        let mut positions = positions.iter().copied().peekable();
592        let sequence = self.iter().enumerate().filter_map(move |(i, val)| {
593            if let Some(next_pos) = positions.peek()
594                && *next_pos == i as u32
595            {
596                positions.next();
597                return None;
598            }
599            Some(val)
600        });
601        *self = Self::from_stats_and_sequence(stats, sequence)
602    }
603}
604
605#[cfg(test)]
606mod test {
607    use super::*;
608
609    #[test]
610    fn test_segments() {
611        fn check_segment(values: &[u64], expected: &U64Segment) {
612            let segment = U64Segment::from_slice(values);
613            assert_eq!(segment, *expected);
614            assert_eq!(values.len(), segment.len());
615
616            let roundtripped = segment.iter().collect::<Vec<_>>();
617            assert_eq!(roundtripped, values);
618
619            let expected_min = values.iter().copied().min();
620            let expected_max = values.iter().copied().max();
621            match segment.range() {
622                Some(range) => {
623                    assert_eq!(range.start(), &expected_min.unwrap());
624                    assert_eq!(range.end(), &expected_max.unwrap());
625                }
626                None => {
627                    assert_eq!(expected_min, None);
628                    assert_eq!(expected_max, None);
629                }
630            }
631
632            for (i, value) in values.iter().enumerate() {
633                assert_eq!(segment.get(i), Some(*value), "i = {}", i);
634                assert_eq!(segment.position(*value), Some(i), "i = {}", i);
635            }
636
637            check_segment_iter(&segment);
638        }
639
640        fn check_segment_iter(segment: &U64Segment) {
641            // Should be able to iterate forwards and backwards, and get the same thing.
642            let forwards = segment.iter().collect::<Vec<_>>();
643            let mut backwards = segment.iter().rev().collect::<Vec<_>>();
644            backwards.reverse();
645            assert_eq!(forwards, backwards);
646
647            // Should be able to pull from both sides in lockstep.
648            let mut expected = Vec::with_capacity(segment.len());
649            let mut actual = Vec::with_capacity(segment.len());
650            let mut iter = segment.iter();
651            // Alternating forwards and backwards
652            for i in 0..segment.len() {
653                if i % 2 == 0 {
654                    actual.push(iter.next().unwrap());
655                    expected.push(segment.get(i / 2).unwrap());
656                } else {
657                    let i = segment.len() - 1 - i / 2;
658                    actual.push(iter.next_back().unwrap());
659                    expected.push(segment.get(i).unwrap());
660                };
661            }
662            assert_eq!(expected, actual);
663        }
664
665        // Empty
666        check_segment(&[], &U64Segment::Range(0..0));
667
668        // Single value
669        check_segment(&[42], &U64Segment::Range(42..43));
670
671        // Contiguous range
672        check_segment(
673            &(100..200).collect::<Vec<_>>(),
674            &U64Segment::Range(100..200),
675        );
676
677        // Range with a hole
678        let values = (0..1000).filter(|&x| x != 100).collect::<Vec<_>>();
679        check_segment(
680            &values,
681            &U64Segment::RangeWithHoles {
682                range: 0..1000,
683                holes: vec![100].into(),
684            },
685        );
686
687        // Range with every other value missing
688        let values = (0..1000).filter(|&x| x % 2 == 0).collect::<Vec<_>>();
689        check_segment(
690            &values,
691            &U64Segment::RangeWithBitmap {
692                range: 0..999,
693                bitmap: Bitmap::from((0..999).map(|x| x % 2 == 0).collect::<Vec<_>>().as_slice()),
694            },
695        );
696
697        // Sparse but sorted sequence
698        check_segment(
699            &[1, 7000, 24000],
700            &U64Segment::SortedArray(vec![1, 7000, 24000].into()),
701        );
702
703        // Sparse unsorted sequence
704        check_segment(
705            &[7000, 1, 24000],
706            &U64Segment::Array(vec![7000, 1, 24000].into()),
707        );
708    }
709
710    #[test]
711    fn test_with_new_high() {
712        // Test Range: contiguous sequence
713        let segment = U64Segment::Range(10..20);
714
715        // Test adding value that extends the range
716        let result = segment.clone().with_new_high(20).unwrap();
717        assert_eq!(result, U64Segment::Range(10..21));
718
719        // Test adding value that creates holes
720        let result = segment.with_new_high(25).unwrap();
721        assert_eq!(
722            result,
723            U64Segment::RangeWithHoles {
724                range: 10..26,
725                holes: EncodedU64Array::U64(vec![20, 21, 22, 23, 24]),
726            }
727        );
728
729        // Test RangeWithHoles: sequence with existing holes
730        let segment = U64Segment::RangeWithHoles {
731            range: 10..20,
732            holes: EncodedU64Array::U64(vec![15, 17]),
733        };
734
735        // Test adding value that extends the range without new holes
736        let result = segment.clone().with_new_high(20).unwrap();
737        assert_eq!(
738            result,
739            U64Segment::RangeWithHoles {
740                range: 10..21,
741                holes: EncodedU64Array::U64(vec![15, 17]),
742            }
743        );
744
745        // Test adding value that creates additional holes
746        let result = segment.with_new_high(25).unwrap();
747        assert_eq!(
748            result,
749            U64Segment::RangeWithHoles {
750                range: 10..26,
751                holes: EncodedU64Array::U64(vec![15, 17, 20, 21, 22, 23, 24]),
752            }
753        );
754
755        // Test RangeWithBitmap: sequence with bitmap representation
756        let mut bitmap = Bitmap::new_full(10);
757        bitmap.clear(3); // Clear position 3 (value 13)
758        bitmap.clear(7); // Clear position 7 (value 17)
759        let segment = U64Segment::RangeWithBitmap {
760            range: 10..20,
761            bitmap,
762        };
763
764        // Test adding value that extends the range without new holes
765        let result = segment.clone().with_new_high(20).unwrap();
766        let expected_bitmap = {
767            let mut b = Bitmap::new_full(11);
768            b.clear(3); // Clear position 3 (value 13)
769            b.clear(7); // Clear position 7 (value 17)
770            b
771        };
772        assert_eq!(
773            result,
774            U64Segment::RangeWithBitmap {
775                range: 10..21,
776                bitmap: expected_bitmap,
777            }
778        );
779
780        // Test adding value that creates additional holes
781        let result = segment.with_new_high(25).unwrap();
782        let expected_bitmap = {
783            let mut b = Bitmap::new_full(16);
784            b.clear(3); // Clear position 3 (value 13)
785            b.clear(7); // Clear position 7 (value 17)
786            // Clear positions 10-14 (values 20-24)
787            for i in 10..15 {
788                b.clear(i);
789            }
790            b
791        };
792        assert_eq!(
793            result,
794            U64Segment::RangeWithBitmap {
795                range: 10..26,
796                bitmap: expected_bitmap,
797            }
798        );
799
800        // Test SortedArray: sparse sorted sequence
801        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
802
803        let result = segment.with_new_high(15).unwrap();
804        assert_eq!(
805            result,
806            U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10, 15]))
807        );
808
809        // Test Array: unsorted sequence
810        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
811
812        let result = segment.with_new_high(15).unwrap();
813        assert_eq!(
814            result,
815            U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1, 15]))
816        );
817
818        // Test edge cases
819        // Empty segment
820        let segment = U64Segment::Range(0..0);
821        let result = segment.with_new_high(5).unwrap();
822        assert_eq!(result, U64Segment::Range(5..6));
823
824        // Single value segment
825        let segment = U64Segment::Range(42..43);
826        let result = segment.with_new_high(50).unwrap();
827        assert_eq!(
828            result,
829            U64Segment::RangeWithHoles {
830                range: 42..51,
831                holes: EncodedU64Array::U64(vec![43, 44, 45, 46, 47, 48, 49]),
832            }
833        );
834    }
835
836    #[test]
837    fn test_with_new_high_assertion() {
838        let segment = U64Segment::Range(10..20);
839        // This should return an error because 15 is not higher than the current maximum 19
840        let result = segment.with_new_high(15);
841        assert!(result.is_err());
842        let error = result.unwrap_err();
843        assert!(
844            error
845                .to_string()
846                .contains("New value 15 must be higher than current maximum 19")
847        );
848    }
849
850    #[test]
851    fn test_with_new_high_assertion_equal() {
852        let segment = U64Segment::Range(1..6);
853        // This should return an error because 5 is not higher than the current maximum 5
854        let result = segment.with_new_high(5);
855        assert!(result.is_err());
856        let error = result.unwrap_err();
857        assert!(
858            error
859                .to_string()
860                .contains("New value 5 must be higher than current maximum 5")
861        );
862    }
863
864    #[test]
865    fn test_contains() {
866        // Test Range: contiguous sequence
867        let segment = U64Segment::Range(10..20);
868        assert!(segment.contains(10), "Should contain 10");
869        assert!(segment.contains(15), "Should contain 15");
870        assert!(segment.contains(19), "Should contain 19");
871        assert!(!segment.contains(9), "Should not contain 9");
872        assert!(!segment.contains(20), "Should not contain 20");
873        assert!(!segment.contains(25), "Should not contain 25");
874
875        // Test RangeWithHoles: sequence with holes
876        let segment = U64Segment::RangeWithHoles {
877            range: 10..20,
878            holes: EncodedU64Array::U64(vec![15, 17]),
879        };
880        assert!(segment.contains(10), "Should contain 10");
881        assert!(segment.contains(14), "Should contain 14");
882        assert!(!segment.contains(15), "Should not contain 15 (hole)");
883        assert!(segment.contains(16), "Should contain 16");
884        assert!(!segment.contains(17), "Should not contain 17 (hole)");
885        assert!(segment.contains(18), "Should contain 18");
886        assert!(
887            !segment.contains(20),
888            "Should not contain 20 (out of range)"
889        );
890
891        // Test RangeWithBitmap: sequence with bitmap
892        let mut bitmap = Bitmap::new_full(10);
893        bitmap.clear(3); // Clear position 3 (value 13)
894        bitmap.clear(7); // Clear position 7 (value 17)
895        let segment = U64Segment::RangeWithBitmap {
896            range: 10..20,
897            bitmap,
898        };
899        assert!(segment.contains(10), "Should contain 10");
900        assert!(segment.contains(12), "Should contain 12");
901        assert!(
902            !segment.contains(13),
903            "Should not contain 13 (cleared in bitmap)"
904        );
905        assert!(segment.contains(16), "Should contain 16");
906        assert!(
907            !segment.contains(17),
908            "Should not contain 17 (cleared in bitmap)"
909        );
910        assert!(segment.contains(19), "Should contain 19");
911        assert!(
912            !segment.contains(20),
913            "Should not contain 20 (out of range)"
914        );
915
916        // Test SortedArray: sparse sorted sequence
917        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
918        assert!(segment.contains(1), "Should contain 1");
919        assert!(segment.contains(5), "Should contain 5");
920        assert!(segment.contains(10), "Should contain 10");
921        assert!(!segment.contains(0), "Should not contain 0");
922        assert!(!segment.contains(3), "Should not contain 3");
923        assert!(!segment.contains(15), "Should not contain 15");
924
925        // Test Array: unsorted sequence
926        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
927        assert!(segment.contains(1), "Should contain 1");
928        assert!(segment.contains(5), "Should contain 5");
929        assert!(segment.contains(10), "Should contain 10");
930        assert!(!segment.contains(0), "Should not contain 0");
931        assert!(!segment.contains(3), "Should not contain 3");
932        assert!(!segment.contains(15), "Should not contain 15");
933
934        // Test empty segment
935        let segment = U64Segment::Range(0..0);
936        assert!(
937            !segment.contains(0),
938            "Empty segment should not contain anything"
939        );
940        assert!(
941            !segment.contains(5),
942            "Empty segment should not contain anything"
943        );
944    }
945}