Skip to main content

lance_table/rowids/
segment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::{Range, RangeInclusive};
5
6use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
7use deepsize::DeepSizeOf;
8
/// Narrow an estimated serialized byte cost from `u128` to `usize`.
///
/// Costs too large for a `usize` (infeasible encodings) clamp to
/// [`usize::MAX`] instead of wrapping.
#[inline]
fn u128_byte_cost_to_usize(v: u128) -> usize {
    match usize::try_from(v) {
        Ok(cost) => cost,
        Err(_) => usize::MAX,
    }
}
15
/// Different ways to represent a sequence of distinct u64s.
///
/// This is designed to be especially efficient for sequences that are sorted,
/// but not meaningfully larger than a `Vec<u64>` in the worst case.
///
/// The representation is chosen based on the properties of the sequence:
///
///  Sorted?───►Yes ───►Contiguous?─► Yes─► Range
///    │                ▼
///    │                No
///    │                ▼
///    │              Dense?─────► Yes─► RangeWithBitmap/RangeWithHoles
///    │                ▼
///    │                No─────────────► SortedArray
///    ▼
///    No──────────────────────────────► Array
///
/// "Dense" is decided by comparing estimated serialized byte sizes and
/// picking the smallest. With `slots = max - min + 1` and `N` values:
///
/// * `RangeWithHoles`:  24 bytes + 4 bytes * (slots - N)
/// * `RangeWithBitmap`: 24 bytes + ceil(slots / 8) bytes
/// * `SortedArray`:     24 bytes + 2 bytes * N (assuming u16 packing)
///
/// A sorted sequence whose maximum is `u64::MAX` is always stored as a
/// `SortedArray`, because the range-backed variants keep an exclusive end
/// that `Range<u64>` cannot represent in that case.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum U64Segment {
    /// A contiguous sorted range of row ids.
    ///
    /// Total size: 16 bytes
    Range(Range<u64>),
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Estimated size: 24 bytes + n_holes * 4 bytes
    /// Preferred over the bitmap form when roughly 32 * n_holes < max - min
    RangeWithHoles {
        range: Range<u64>,
        /// The values inside `range` that are missing (the values
        /// themselves, not offsets from `range.start`).
        /// This is sorted, so binary search can be used. It's typically
        /// relatively small.
        holes: EncodedU64Array,
    },
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Bit `i` describes the value `range.start + i`: it is 1 when the value
    /// is present, 0 when it's missing.
    ///
    /// Estimated size: 24 bytes + ceil((max - min + 1) / 8) bytes
    /// Preferred over `SortedArray` when roughly max - min < 16 * len
    RangeWithBitmap { range: Range<u64>, bitmap: Bitmap },
    /// A sorted array of row ids, that is sparse.
    ///
    /// Estimated size: 24 bytes + 2 * n_values bytes
    SortedArray(EncodedU64Array),
    /// An array of row ids, that is not sorted.
    Array(EncodedU64Array),
}
71
72impl DeepSizeOf for U64Segment {
73    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
74        match self {
75            Self::Range(_) => 0,
76            Self::RangeWithHoles { holes, .. } => holes.deep_size_of_children(context),
77            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
78            Self::SortedArray(array) => array.deep_size_of_children(context),
79            Self::Array(array) => array.deep_size_of_children(context),
80        }
81    }
82}
83
/// Summary of a sequence of u64s, used to pick an encoding.
#[derive(Debug)]
struct SegmentStats {
    /// Smallest value seen.
    min: u64,
    /// Largest value seen.
    max: u64,
    /// How many values the sequence contains.
    count: u64,
    /// Whether the values were observed in ascending order.
    sorted: bool,
}

impl SegmentStats {
    /// Count the values of `[min, max]` that are absent from the sequence.
    ///
    /// The result is a `u128` because the slot count `max - min + 1` reaches
    /// `2^64` for `min = 0, max = u64::MAX`, one more than `u64` can hold.
    fn n_holes(&self) -> u128 {
        debug_assert!(self.sorted);
        match self.count {
            // An empty sequence has no span, hence no holes.
            0 => 0,
            present => {
                let slots = u128::from(self.max) - u128::from(self.min) + 1;
                slots - u128::from(present)
            }
        }
    }
}
112
113impl U64Segment {
114    /// Return the values that are missing from the slice.
115    fn holes_in_slice<'a>(
116        range: RangeInclusive<u64>,
117        existing: impl IntoIterator<Item = u64> + 'a,
118    ) -> impl Iterator<Item = u64> + 'a {
119        let mut existing = existing.into_iter().peekable();
120        range.filter(move |val| {
121            if let Some(&existing_val) = existing.peek()
122                && existing_val == *val
123            {
124                existing.next();
125                return false;
126            }
127            true
128        })
129    }
130
131    fn compute_stats(values: impl IntoIterator<Item = u64>) -> SegmentStats {
132        let mut sorted = true;
133        let mut min = u64::MAX;
134        let mut max = 0;
135        let mut count = 0;
136
137        for val in values {
138            count += 1;
139            if val < min {
140                min = val;
141            }
142            if val > max {
143                max = val;
144            }
145            if sorted && count > 1 && val < max {
146                sorted = false;
147            }
148        }
149
150        if count == 0 {
151            min = 0;
152            max = 0;
153        }
154
155        SegmentStats {
156            min,
157            max,
158            count,
159            sorted,
160        }
161    }
162
163    /// Estimate the serialized byte size of each sorted encoding variant.
164    ///
165    /// All arithmetic is performed in `u128` to avoid overflow when the range
166    /// span `max - min + 1` approaches or exceeds `2^64`. Infeasible sizes
167    /// saturate to `usize::MAX` so they always lose the `min()` comparison.
168    fn sorted_sequence_sizes(stats: &SegmentStats) -> [usize; 3] {
169        let n_holes = stats.n_holes();
170        let total_slots = stats.max as u128 - stats.min as u128 + 1;
171
172        let range_with_holes = 24u128.saturating_add(4u128.saturating_mul(n_holes));
173        let range_with_bitmap = 24u128.saturating_add(total_slots.div_ceil(8));
174        let sorted_array = 24u128.saturating_add(2u128.saturating_mul(stats.count as u128));
175
176        [
177            u128_byte_cost_to_usize(range_with_holes),
178            u128_byte_cost_to_usize(range_with_bitmap),
179            u128_byte_cost_to_usize(sorted_array),
180        ]
181    }
182
183    fn from_stats_and_sequence(
184        stats: SegmentStats,
185        sequence: impl IntoIterator<Item = u64>,
186    ) -> Self {
187        if stats.sorted {
188            let n_holes = stats.n_holes();
189            // Range-backed encodings store an exclusive end as `Range<u64>`,
190            // which cannot represent `u64::MAX + 1`. Compute the end once and
191            // gate all range-backed branches on its representability.
192            let exclusive_end = stats.max.checked_add(1);
193            if stats.count == 0 {
194                Self::Range(0..0)
195            } else if n_holes == 0 && exclusive_end.is_some() {
196                Self::Range(stats.min..exclusive_end.unwrap())
197            } else if let Some(end) = exclusive_end {
198                let sizes = Self::sorted_sequence_sizes(&stats);
199                let min_size = sizes.iter().min().unwrap();
200                if min_size == &sizes[0] {
201                    let range = stats.min..end;
202                    let mut holes =
203                        Self::holes_in_slice(stats.min..=stats.max, sequence).collect::<Vec<_>>();
204                    holes.sort_unstable();
205                    let holes = EncodedU64Array::from(holes);
206                    Self::RangeWithHoles { range, holes }
207                } else if min_size == &sizes[1] {
208                    let range = stats.min..end;
209                    let mut bitmap = Bitmap::new_full((stats.max - stats.min) as usize + 1);
210                    for hole in Self::holes_in_slice(stats.min..=stats.max, sequence) {
211                        let offset = (hole - stats.min) as usize;
212                        bitmap.clear(offset);
213                    }
214                    Self::RangeWithBitmap { range, bitmap }
215                } else {
216                    Self::SortedArray(EncodedU64Array::from_iter(sequence))
217                }
218            } else {
219                // max == u64::MAX: exclusive end is unrepresentable in Range<u64>,
220                // so no range-backed encoding can be used.
221                Self::SortedArray(EncodedU64Array::from_iter(sequence))
222            }
223        } else {
224            Self::Array(EncodedU64Array::from_iter(sequence))
225        }
226    }
227
228    pub fn from_slice(slice: &[u64]) -> Self {
229        Self::from_iter(slice.iter().copied())
230    }
231}
232
233impl FromIterator<u64> for U64Segment {
234    fn from_iter<T: IntoIterator<Item = u64>>(iter: T) -> Self {
235        let values: Vec<u64> = iter.into_iter().collect();
236        let stats = Self::compute_stats(values.iter().copied());
237        Self::from_stats_and_sequence(stats, values)
238    }
239}
240
241impl U64Segment {
242    pub fn iter(&self) -> Box<dyn DoubleEndedIterator<Item = u64> + '_> {
243        match self {
244            Self::Range(range) => Box::new(range.clone()),
245            Self::RangeWithHoles { range, holes } => {
246                Box::new((range.start..range.end).filter(move |&val| {
247                    // TODO: we could write a more optimal version of this
248                    // iterator, but would need special handling to make it
249                    // double ended.
250                    holes.binary_search(val).is_err()
251                }))
252            }
253            Self::RangeWithBitmap { range, bitmap } => {
254                Box::new((range.start..range.end).filter(|val| {
255                    let offset = (val - range.start) as usize;
256                    bitmap.get(offset)
257                }))
258            }
259            Self::SortedArray(array) => Box::new(array.iter()),
260            Self::Array(array) => Box::new(array.iter()),
261        }
262    }
263
264    pub fn len(&self) -> usize {
265        match self {
266            Self::Range(range) => (range.end - range.start) as usize,
267            Self::RangeWithHoles { range, holes } => {
268                let holes = holes.iter().count();
269                (range.end - range.start) as usize - holes
270            }
271            Self::RangeWithBitmap { range, bitmap } => {
272                let holes = bitmap.count_zeros();
273                (range.end - range.start) as usize - holes
274            }
275            Self::SortedArray(array) => array.len(),
276            Self::Array(array) => array.len(),
277        }
278    }
279
280    pub fn is_empty(&self) -> bool {
281        self.len() == 0
282    }
283
284    /// Get the min and max value of the segment, excluding tombstones.
285    pub fn range(&self) -> Option<RangeInclusive<u64>> {
286        match self {
287            Self::Range(range) if range.is_empty() => None,
288            Self::Range(range)
289            | Self::RangeWithBitmap { range, .. }
290            | Self::RangeWithHoles { range, .. } => Some(range.start..=(range.end - 1)),
291            Self::SortedArray(array) => {
292                // We can assume that the array is sorted.
293                let min_value = array.first().unwrap();
294                let max_value = array.last().unwrap();
295                Some(min_value..=max_value)
296            }
297            Self::Array(array) => {
298                let min_value = array.min().unwrap();
299                let max_value = array.max().unwrap();
300                Some(min_value..=max_value)
301            }
302        }
303    }
304
305    pub fn slice(&self, offset: usize, len: usize) -> Self {
306        if len == 0 {
307            return Self::Range(0..0);
308        }
309
310        let values: Vec<u64> = self.iter().skip(offset).take(len).collect();
311
312        // `from_slice` will compute stats and select the best representation.
313        Self::from_slice(&values)
314    }
315
316    pub fn position(&self, val: u64) -> Option<usize> {
317        match self {
318            Self::Range(range) => {
319                if range.contains(&val) {
320                    Some((val - range.start) as usize)
321                } else {
322                    None
323                }
324            }
325            Self::RangeWithHoles { range, holes } => {
326                if range.contains(&val) && holes.binary_search(val).is_err() {
327                    let offset = (val - range.start) as usize;
328                    let holes = holes.iter().take_while(|&hole| hole < val).count();
329                    Some(offset - holes)
330                } else {
331                    None
332                }
333            }
334            Self::RangeWithBitmap { range, bitmap } => {
335                if range.contains(&val) && bitmap.get((val - range.start) as usize) {
336                    let offset = (val - range.start) as usize;
337                    let num_zeros = bitmap.slice(0, offset).count_zeros();
338                    Some(offset - num_zeros)
339                } else {
340                    None
341                }
342            }
343            Self::SortedArray(array) => array.binary_search(val).ok(),
344            Self::Array(array) => array.iter().position(|v| v == val),
345        }
346    }
347
348    pub fn get(&self, i: usize) -> Option<u64> {
349        match self {
350            Self::Range(range) => match range.start.checked_add(i as u64) {
351                Some(val) if val < range.end => Some(val),
352                _ => None,
353            },
354            Self::RangeWithHoles { range, .. } => {
355                if i >= (range.end - range.start) as usize {
356                    return None;
357                }
358                self.iter().nth(i)
359            }
360            Self::RangeWithBitmap { range, .. } => {
361                if i >= (range.end - range.start) as usize {
362                    return None;
363                }
364                self.iter().nth(i)
365            }
366            Self::SortedArray(array) => array.get(i),
367            Self::Array(array) => array.get(i),
368        }
369    }
370
371    /// Check if a value is contained in the segment
372    pub fn contains(&self, val: u64) -> bool {
373        match self {
374            Self::Range(range) => range.contains(&val),
375            Self::RangeWithHoles { range, holes } => {
376                if !range.contains(&val) {
377                    return false;
378                }
379                // Check if the value is not in the holes
380                !holes.iter().any(|hole| hole == val)
381            }
382            Self::RangeWithBitmap { range, bitmap } => {
383                if !range.contains(&val) {
384                    return false;
385                }
386                // Check if the bitmap has the value set (not cleared)
387                let idx = (val - range.start) as usize;
388                bitmap.get(idx)
389            }
390            Self::SortedArray(array) => array.binary_search(val).is_ok(),
391            Self::Array(array) => array.iter().any(|v| v == val),
392        }
393    }
394
395    /// Produce a new segment that has `val` as the new highest value in the segment
396    pub fn with_new_high(self, val: u64) -> lance_core::Result<Self> {
397        // Check that the new value is higher than the current maximum
398        if let Some(range) = self.range()
399            && val <= *range.end()
400        {
401            return Err(lance_core::Error::invalid_input(format!(
402                "New value {} must be higher than current maximum {}",
403                val,
404                range.end()
405            )));
406        }
407
408        Ok(match self {
409            Self::Range(range) => {
410                // Special case for empty range: create a range containing only the new value
411                if range.start == range.end {
412                    Self::Range(Range {
413                        start: val,
414                        end: val + 1,
415                    })
416                } else if val == range.end {
417                    Self::Range(Range {
418                        start: range.start,
419                        end: val + 1,
420                    })
421                } else {
422                    Self::RangeWithHoles {
423                        range: Range {
424                            start: range.start,
425                            end: val + 1,
426                        },
427                        holes: EncodedU64Array::U64((range.end..val).collect()),
428                    }
429                }
430            }
431            Self::RangeWithHoles { range, holes } => {
432                if val == range.end {
433                    Self::RangeWithHoles {
434                        range: Range {
435                            start: range.start,
436                            end: val + 1,
437                        },
438                        holes,
439                    }
440                } else {
441                    let mut new_holes: Vec<u64> = holes.iter().collect();
442                    new_holes.extend(range.end..val);
443                    Self::RangeWithHoles {
444                        range: Range {
445                            start: range.start,
446                            end: val + 1,
447                        },
448                        holes: EncodedU64Array::U64(new_holes),
449                    }
450                }
451            }
452            Self::RangeWithBitmap { range, bitmap } => {
453                let new_range = Range {
454                    start: range.start,
455                    end: val + 1,
456                };
457                let gap_size = (val - range.end) as usize;
458                let new_bitmap = bitmap
459                    .iter()
460                    .chain(std::iter::repeat_n(false, gap_size))
461                    .chain(std::iter::once(true))
462                    .collect::<Vec<bool>>();
463
464                Self::RangeWithBitmap {
465                    range: new_range,
466                    bitmap: Bitmap::from(new_bitmap.as_slice()),
467                }
468            }
469            Self::SortedArray(array) => match array {
470                EncodedU64Array::U64(mut vec) => {
471                    vec.push(val);
472                    Self::SortedArray(EncodedU64Array::U64(vec))
473                }
474                EncodedU64Array::U16 { base, offsets } => {
475                    if let Some(offset) = val.checked_sub(base) {
476                        if offset <= u16::MAX as u64 {
477                            let mut offsets = offsets;
478                            offsets.push(offset as u16);
479                            return Ok(Self::SortedArray(EncodedU64Array::U16 { base, offsets }));
480                        } else if offset <= u32::MAX as u64 {
481                            let mut u32_offsets: Vec<u32> =
482                                offsets.into_iter().map(|o| o as u32).collect();
483                            u32_offsets.push(offset as u32);
484                            return Ok(Self::SortedArray(EncodedU64Array::U32 {
485                                base,
486                                offsets: u32_offsets,
487                            }));
488                        }
489                    }
490                    let mut new_array: Vec<u64> =
491                        offsets.into_iter().map(|o| base + o as u64).collect();
492                    new_array.push(val);
493                    Self::SortedArray(EncodedU64Array::from(new_array))
494                }
495                EncodedU64Array::U32 { base, mut offsets } => {
496                    if let Some(offset) = val.checked_sub(base)
497                        && offset <= u32::MAX as u64
498                    {
499                        offsets.push(offset as u32);
500                        return Ok(Self::SortedArray(EncodedU64Array::U32 { base, offsets }));
501                    }
502                    let mut new_array: Vec<u64> =
503                        offsets.into_iter().map(|o| base + o as u64).collect();
504                    new_array.push(val);
505                    Self::SortedArray(EncodedU64Array::from(new_array))
506                }
507            },
508            Self::Array(array) => match array {
509                EncodedU64Array::U64(mut vec) => {
510                    vec.push(val);
511                    Self::Array(EncodedU64Array::U64(vec))
512                }
513                EncodedU64Array::U16 { base, offsets } => {
514                    if let Some(offset) = val.checked_sub(base) {
515                        if offset <= u16::MAX as u64 {
516                            let mut offsets = offsets;
517                            offsets.push(offset as u16);
518                            return Ok(Self::Array(EncodedU64Array::U16 { base, offsets }));
519                        } else if offset <= u32::MAX as u64 {
520                            let mut u32_offsets: Vec<u32> =
521                                offsets.into_iter().map(|o| o as u32).collect();
522                            u32_offsets.push(offset as u32);
523                            return Ok(Self::Array(EncodedU64Array::U32 {
524                                base,
525                                offsets: u32_offsets,
526                            }));
527                        }
528                    }
529                    let mut new_array: Vec<u64> =
530                        offsets.into_iter().map(|o| base + o as u64).collect();
531                    new_array.push(val);
532                    Self::Array(EncodedU64Array::from(new_array))
533                }
534                EncodedU64Array::U32 { base, mut offsets } => {
535                    if let Some(offset) = val.checked_sub(base)
536                        && offset <= u32::MAX as u64
537                    {
538                        offsets.push(offset as u32);
539                        return Ok(Self::Array(EncodedU64Array::U32 { base, offsets }));
540                    }
541                    let mut new_array: Vec<u64> =
542                        offsets.into_iter().map(|o| base + o as u64).collect();
543                    new_array.push(val);
544                    Self::Array(EncodedU64Array::from(new_array))
545                }
546            },
547        })
548    }
549
550    /// Delete a set of row ids from the segment.
551    /// The row ids are assumed to be in the segment. (within the range, not
552    /// already deleted.)
553    /// They are also assumed to be ordered by appearance in the segment.
554    pub fn delete(&self, vals: &[u64]) -> Self {
555        // TODO: can we enforce these assumptions? or make them safer?
556        debug_assert!(vals.iter().all(|&val| self.range().unwrap().contains(&val)));
557
558        let make_new_iter = || {
559            let mut vals_iter = vals.iter().copied().peekable();
560            self.iter().filter(move |val| {
561                if let Some(&next_val) = vals_iter.peek()
562                    && next_val == *val
563                {
564                    vals_iter.next();
565                    return false;
566                }
567                true
568            })
569        };
570        let stats = Self::compute_stats(make_new_iter());
571        Self::from_stats_and_sequence(stats, make_new_iter())
572    }
573
574    pub fn mask(&mut self, positions: &[u32]) {
575        if positions.is_empty() {
576            return;
577        }
578        if positions.len() == self.len() {
579            *self = Self::Range(0..0);
580            return;
581        }
582        let count = (self.len() - positions.len()) as u64;
583        let sorted = match self {
584            Self::Range(_) => true,
585            Self::RangeWithHoles { .. } => true,
586            Self::RangeWithBitmap { .. } => true,
587            Self::SortedArray(_) => true,
588            Self::Array(_) => false,
589        };
590        // To get minimum, need to find the first value that is not masked.
591        let first_unmasked = (0..self.len())
592            .zip(positions.iter().cycle())
593            .find(|(sequential_i, i)| **i != *sequential_i as u32)
594            .map(|(sequential_i, _)| sequential_i)
595            .unwrap();
596        let min = self.get(first_unmasked).unwrap();
597
598        let last_unmasked = (0..self.len())
599            .rev()
600            .zip(positions.iter().rev().cycle())
601            .filter(|(sequential_i, i)| **i != *sequential_i as u32)
602            .map(|(sequential_i, _)| sequential_i)
603            .next()
604            .unwrap();
605        let max = self.get(last_unmasked).unwrap();
606
607        let stats = SegmentStats {
608            min,
609            max,
610            count,
611            sorted,
612        };
613
614        let mut positions = positions.iter().copied().peekable();
615        let sequence = self.iter().enumerate().filter_map(move |(i, val)| {
616            if let Some(next_pos) = positions.peek()
617                && *next_pos == i as u32
618            {
619                positions.next();
620                return None;
621            }
622            Some(val)
623        });
624        *self = Self::from_stats_and_sequence(stats, sequence)
625    }
626}
627
628#[cfg(test)]
629mod test {
630    use super::*;
631
632    #[test]
633    fn test_segments() {
634        fn check_segment(values: &[u64], expected: &U64Segment) {
635            let segment = U64Segment::from_slice(values);
636            assert_eq!(segment, *expected);
637            assert_eq!(values.len(), segment.len());
638
639            let roundtripped = segment.iter().collect::<Vec<_>>();
640            assert_eq!(roundtripped, values);
641
642            let expected_min = values.iter().copied().min();
643            let expected_max = values.iter().copied().max();
644            match segment.range() {
645                Some(range) => {
646                    assert_eq!(range.start(), &expected_min.unwrap());
647                    assert_eq!(range.end(), &expected_max.unwrap());
648                }
649                None => {
650                    assert_eq!(expected_min, None);
651                    assert_eq!(expected_max, None);
652                }
653            }
654
655            for (i, value) in values.iter().enumerate() {
656                assert_eq!(segment.get(i), Some(*value), "i = {}", i);
657                assert_eq!(segment.position(*value), Some(i), "i = {}", i);
658            }
659
660            check_segment_iter(&segment);
661        }
662
663        fn check_segment_iter(segment: &U64Segment) {
664            // Should be able to iterate forwards and backwards, and get the same thing.
665            let forwards = segment.iter().collect::<Vec<_>>();
666            let mut backwards = segment.iter().rev().collect::<Vec<_>>();
667            backwards.reverse();
668            assert_eq!(forwards, backwards);
669
670            // Should be able to pull from both sides in lockstep.
671            let mut expected = Vec::with_capacity(segment.len());
672            let mut actual = Vec::with_capacity(segment.len());
673            let mut iter = segment.iter();
674            // Alternating forwards and backwards
675            for i in 0..segment.len() {
676                if i % 2 == 0 {
677                    actual.push(iter.next().unwrap());
678                    expected.push(segment.get(i / 2).unwrap());
679                } else {
680                    let i = segment.len() - 1 - i / 2;
681                    actual.push(iter.next_back().unwrap());
682                    expected.push(segment.get(i).unwrap());
683                };
684            }
685            assert_eq!(expected, actual);
686        }
687
688        // Empty
689        check_segment(&[], &U64Segment::Range(0..0));
690
691        // Single value
692        check_segment(&[42], &U64Segment::Range(42..43));
693
694        // Contiguous range
695        check_segment(
696            &(100..200).collect::<Vec<_>>(),
697            &U64Segment::Range(100..200),
698        );
699
700        // Range with a hole
701        let values = (0..1000).filter(|&x| x != 100).collect::<Vec<_>>();
702        check_segment(
703            &values,
704            &U64Segment::RangeWithHoles {
705                range: 0..1000,
706                holes: vec![100].into(),
707            },
708        );
709
710        // Range with every other value missing
711        let values = (0..1000).filter(|&x| x % 2 == 0).collect::<Vec<_>>();
712        check_segment(
713            &values,
714            &U64Segment::RangeWithBitmap {
715                range: 0..999,
716                bitmap: Bitmap::from((0..999).map(|x| x % 2 == 0).collect::<Vec<_>>().as_slice()),
717            },
718        );
719
720        // Sparse but sorted sequence
721        check_segment(
722            &[1, 7000, 24000],
723            &U64Segment::SortedArray(vec![1, 7000, 24000].into()),
724        );
725
726        // Sparse unsorted sequence
727        check_segment(
728            &[7000, 1, 24000],
729            &U64Segment::Array(vec![7000, 1, 24000].into()),
730        );
731    }
732
// Checks which encoding `U64Segment::from_slice` picks at the edges of the
// `u64` domain, plus two regression cases for ordinary dense inputs.
//
// Failure messages print the segment itself (`U64Segment` derives `Debug`)
// rather than `std::mem::discriminant`, whose `Debug` output is only an opaque
// `Discriminant(N)`. All inputs here are tiny, so the output stays short.
#[test]
fn test_segment_overflow_boundary() {
    // Asserts that `segment` yields exactly `values`, in the same order.
    fn assert_round_trip(segment: &U64Segment, values: &[u64]) {
        assert_eq!(segment.len(), values.len());
        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
    }

    // Inputs that must be encoded as `SortedArray`, each with a short label
    // used in the failure message.
    let sorted_array_cases: Vec<(Vec<u64>, &str)> = vec![
        // Sparse range spanning i64::MAX — the original overflow reproducer.
        // n_holes ≈ 2^63, which overflows `4 * n_holes as usize` without u128
        // arithmetic.
        (
            vec![0, 1, 2, 100, i64::MAX as u64],
            "sparse range spanning i64::MAX",
        ),
        // Two values at the u64 extremes — triggers the n_holes() total_slots
        // overflow (u64::MAX - 0 + 1 wraps to 0 without u128).
        (vec![0, u64::MAX], "full u64 span"),
        // Small dense set near u64::MAX — cost estimation prefers a
        // range-backed encoding, but Range<u64> cannot represent u64::MAX + 1
        // as the exclusive end, so it must fall back to SortedArray.
        (
            vec![u64::MAX - 3, u64::MAX - 1, u64::MAX],
            "dense set near u64::MAX (exclusive end unrepresentable)",
        ),
        // Single value at u64::MAX — contiguous with n_holes == 0, but the
        // exclusive end u64::MAX + 1 overflows.
        (vec![u64::MAX], "single u64::MAX"),
    ];
    for (values, what) in sorted_array_cases {
        let segment = U64Segment::from_slice(&values);
        assert!(
            matches!(segment, U64Segment::SortedArray(_)),
            "{} should be SortedArray, got {:?}",
            what,
            segment
        );
        assert_round_trip(&segment, &values);
    }

    // Contiguous range ending just below u64::MAX — the exclusive end is
    // representable, so Range encoding should still be used.
    let values: Vec<u64> = vec![u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
    let segment = U64Segment::from_slice(&values);
    assert_eq!(segment, U64Segment::Range((u64::MAX - 3)..u64::MAX));
    assert_round_trip(&segment, &values);

    // Regression: normal dense range with few holes still picks RangeWithHoles.
    // Needs total_slots > 32 * n_holes for RangeWithHoles to beat RangeWithBitmap.
    let values: Vec<u64> = (100..1100).filter(|&x| x != 500).collect();
    let segment = U64Segment::from_slice(&values);
    assert_eq!(
        segment,
        U64Segment::RangeWithHoles {
            range: 100..1100,
            holes: vec![500].into(),
        }
    );
    assert_round_trip(&segment, &values);

    // Regression: small dense range with a hole picks RangeWithBitmap.
    let values: Vec<u64> = vec![100, 101, 102, 103, 105];
    let segment = U64Segment::from_slice(&values);
    assert!(
        matches!(segment, U64Segment::RangeWithBitmap { .. }),
        "small dense range with hole should be RangeWithBitmap, got {:?}",
        segment
    );
    assert_round_trip(&segment, &values);
}
817
// Values that fit in `usize` pass through unchanged; `u128::MAX` is expected
// to come back as `usize::MAX`.
#[test]
fn test_u128_byte_cost_to_usize() {
    // (input cost, expected converted cost)
    let cases: [(u128, usize); 4] = [
        (0, 0),
        (42, 42),
        (usize::MAX as u128, usize::MAX),
        (u128::MAX, usize::MAX),
    ];
    for &(cost, expected) in cases.iter() {
        assert_eq!(super::u128_byte_cost_to_usize(cost), expected);
    }
}
828
// Five sorted values spread over [0, i64::MAX]: the first size estimate (the
// range-with-holes cost, going by this test's name) must be `usize::MAX`, and
// the third estimate must be strictly smaller.
#[test]
fn test_sorted_sequence_sizes_sparse_span_saturates_range_with_holes_cost() {
    let sparse = super::SegmentStats {
        sorted: true,
        count: 5,
        min: 0,
        max: i64::MAX as u64,
    };
    let estimates = U64Segment::sorted_sequence_sizes(&sparse);
    let first = estimates[0];
    let third = estimates[2];
    assert_eq!(first, usize::MAX);
    assert!(third < first);
}
841
// Stats describing a nearly full [0, u64::MAX] span with a single hole
// (count = u64::MAX, so n_holes = 1). The SortedArray cost, 24 + 2 * u64::MAX,
// does not fit in `usize` on 64-bit, so the third estimate must be `usize::MAX`.
#[test]
fn test_sorted_sequence_sizes_sorted_array_cost_saturates() {
    let nearly_full = super::SegmentStats {
        sorted: true,
        count: u64::MAX,
        min: 0,
        max: u64::MAX,
    };
    let estimates = U64Segment::sorted_sequence_sizes(&nearly_full);
    let sorted_array_cost = estimates[2];
    assert_eq!(sorted_array_cost, usize::MAX);
}
855
// Synthetic stats covering the full [0, u64::MAX] slot space; exercises the
// `range_with_bitmap` cost path.
//
// Gated to 64-bit targets: the assertions below require the bitmap cost to fit
// in `usize`, which only holds when `usize` is 64 bits wide. On narrower
// targets the cost would presumably saturate to `usize::MAX` and both
// assertions would fail for reasons unrelated to the code under test.
#[cfg(target_pointer_width = "64")]
#[test]
fn test_sorted_sequence_sizes_full_span_bitmap_cost() {
    let stats = super::SegmentStats {
        min: 0,
        max: u64::MAX,
        count: 1,
        sorted: true,
    };
    let sizes = U64Segment::sorted_sequence_sizes(&stats);
    // The bitmap estimate must be cheaper than the first estimate...
    assert!(sizes[1] < sizes[0]);
    // ...and must not itself have saturated.
    assert!(sizes[1] < usize::MAX);
}
870
// Walks `with_new_high` through every `U64Segment` variant, checking both the
// "next consecutive value" case and the "value further out" case where the
// variant distinguishes them.
#[test]
fn test_with_new_high() {
    // Range: a contiguous sequence.
    {
        let base = U64Segment::Range(10..20);

        // The next consecutive value simply widens the range.
        let widened = base.clone().with_new_high(20).unwrap();
        assert_eq!(widened, U64Segment::Range(10..21));

        // A value further out leaves the skipped values behind as holes.
        let gapped = base.with_new_high(25).unwrap();
        let expected = U64Segment::RangeWithHoles {
            range: 10..26,
            holes: EncodedU64Array::U64((20..25).collect()),
        };
        assert_eq!(gapped, expected);
    }

    // RangeWithHoles: a sequence that already has holes.
    {
        let base = U64Segment::RangeWithHoles {
            range: 10..20,
            holes: EncodedU64Array::U64(vec![15, 17]),
        };

        // The next consecutive value widens the range; holes are unchanged.
        let widened = base.clone().with_new_high(20).unwrap();
        let expected = U64Segment::RangeWithHoles {
            range: 10..21,
            holes: EncodedU64Array::U64(vec![15, 17]),
        };
        assert_eq!(widened, expected);

        // A value further out appends the skipped values to the holes.
        let gapped = base.with_new_high(25).unwrap();
        let expected = U64Segment::RangeWithHoles {
            range: 10..26,
            holes: EncodedU64Array::U64(vec![15, 17, 20, 21, 22, 23, 24]),
        };
        assert_eq!(gapped, expected);
    }

    // RangeWithBitmap: a sequence backed by a bitmap.
    {
        // Positions 3 and 7 (values 13 and 17) are cleared.
        let mut present = Bitmap::new_full(10);
        for bit in [3, 7] {
            present.clear(bit);
        }
        let base = U64Segment::RangeWithBitmap {
            range: 10..20,
            bitmap: present,
        };

        // The next consecutive value grows the bitmap by one set position.
        let widened = base.clone().with_new_high(20).unwrap();
        let mut widened_bits = Bitmap::new_full(11);
        for bit in [3, 7] {
            widened_bits.clear(bit);
        }
        let expected = U64Segment::RangeWithBitmap {
            range: 10..21,
            bitmap: widened_bits,
        };
        assert_eq!(widened, expected);

        // A value further out: positions 10-14 (values 20-24) are cleared too.
        let gapped = base.with_new_high(25).unwrap();
        let mut gapped_bits = Bitmap::new_full(16);
        for bit in [3, 7, 10, 11, 12, 13, 14] {
            gapped_bits.clear(bit);
        }
        let expected = U64Segment::RangeWithBitmap {
            range: 10..26,
            bitmap: gapped_bits,
        };
        assert_eq!(gapped, expected);
    }

    // SortedArray: a sparse sorted sequence gets the value appended.
    {
        let base = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
        let appended = base.with_new_high(15).unwrap();
        let expected = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10, 15]));
        assert_eq!(appended, expected);
    }

    // Array: an unsorted sequence gets the value appended.
    {
        let base = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
        let appended = base.with_new_high(15).unwrap();
        let expected = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1, 15]));
        assert_eq!(appended, expected);
    }

    // Edge case: an empty segment becomes a one-element range.
    {
        let empty = U64Segment::Range(0..0);
        let seeded = empty.with_new_high(5).unwrap();
        assert_eq!(seeded, U64Segment::Range(5..6));
    }

    // Edge case: a single-value segment followed by a gap.
    {
        let single = U64Segment::Range(42..43);
        let gapped = single.with_new_high(50).unwrap();
        let expected = U64Segment::RangeWithHoles {
            range: 42..51,
            holes: EncodedU64Array::U64((43..50).collect()),
        };
        assert_eq!(gapped, expected);
    }
}
996
// `with_new_high` must reject a value below the current maximum.
#[test]
fn test_with_new_high_assertion() {
    // 15 is not higher than the current maximum (19), so an error is expected.
    let outcome = U64Segment::Range(10..20).with_new_high(15);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("New value 15 must be higher than current maximum 19"));
}
1010
// `with_new_high` must reject a value equal to the current maximum.
#[test]
fn test_with_new_high_assertion_equal() {
    // 5 is not higher than the current maximum (5), so an error is expected.
    let outcome = U64Segment::Range(1..6).with_new_high(5);
    assert!(outcome.is_err());

    let message = outcome.unwrap_err().to_string();
    assert!(message.contains("New value 5 must be higher than current maximum 5"));
}
1024
// Membership checks for every `U64Segment` variant, written as tables of
// `(value, expected membership, failure message)` probes.
#[test]
fn test_contains() {
    // Runs each probe against `segment`, failing with the probe's own message.
    fn probe(segment: &U64Segment, cases: &[(u64, bool, &str)]) {
        for &(value, expected, message) in cases {
            assert!(segment.contains(value) == expected, "{}", message);
        }
    }

    // Range: contiguous sequence.
    let contiguous = U64Segment::Range(10..20);
    probe(
        &contiguous,
        &[
            (10, true, "Should contain 10"),
            (15, true, "Should contain 15"),
            (19, true, "Should contain 19"),
            (9, false, "Should not contain 9"),
            (20, false, "Should not contain 20"),
            (25, false, "Should not contain 25"),
        ],
    );

    // RangeWithHoles: sequence with holes at 15 and 17.
    let with_holes = U64Segment::RangeWithHoles {
        range: 10..20,
        holes: EncodedU64Array::U64(vec![15, 17]),
    };
    probe(
        &with_holes,
        &[
            (10, true, "Should contain 10"),
            (14, true, "Should contain 14"),
            (15, false, "Should not contain 15 (hole)"),
            (16, true, "Should contain 16"),
            (17, false, "Should not contain 17 (hole)"),
            (18, true, "Should contain 18"),
            (20, false, "Should not contain 20 (out of range)"),
        ],
    );

    // RangeWithBitmap: positions 3 and 7 (values 13 and 17) are cleared.
    let mut present = Bitmap::new_full(10);
    for bit in [3, 7] {
        present.clear(bit);
    }
    let with_bitmap = U64Segment::RangeWithBitmap {
        range: 10..20,
        bitmap: present,
    };
    probe(
        &with_bitmap,
        &[
            (10, true, "Should contain 10"),
            (12, true, "Should contain 12"),
            (13, false, "Should not contain 13 (cleared in bitmap)"),
            (16, true, "Should contain 16"),
            (17, false, "Should not contain 17 (cleared in bitmap)"),
            (19, true, "Should contain 19"),
            (20, false, "Should not contain 20 (out of range)"),
        ],
    );

    // SortedArray: sparse sorted sequence.
    let sorted = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
    probe(
        &sorted,
        &[
            (1, true, "Should contain 1"),
            (5, true, "Should contain 5"),
            (10, true, "Should contain 10"),
            (0, false, "Should not contain 0"),
            (3, false, "Should not contain 3"),
            (15, false, "Should not contain 15"),
        ],
    );

    // Array: unsorted sequence.
    let unsorted = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
    probe(
        &unsorted,
        &[
            (1, true, "Should contain 1"),
            (5, true, "Should contain 5"),
            (10, true, "Should contain 10"),
            (0, false, "Should not contain 0"),
            (3, false, "Should not contain 3"),
            (15, false, "Should not contain 15"),
        ],
    );

    // Empty segment: nothing is a member.
    let empty = U64Segment::Range(0..0);
    probe(
        &empty,
        &[
            (0, false, "Empty segment should not contain anything"),
            (5, false, "Empty segment should not contain anything"),
        ],
    );
}
1106}