Skip to main content

lance_table/rowids/
segment.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::{Range, RangeInclusive};
5
6use super::{bitmap::Bitmap, encoded_array::EncodedU64Array};
7use deepsize::DeepSizeOf;
8
/// Narrows an estimated serialized size from `u128` to `usize`.
///
/// Estimates that do not fit in a `usize` are clamped to [`usize::MAX`], so an
/// infeasible encoding can never win a "smallest size" comparison.
#[inline]
fn u128_byte_cost_to_usize(v: u128) -> usize {
    match usize::try_from(v) {
        Ok(bytes) => bytes,
        Err(_) => usize::MAX,
    }
}
15
/// Different ways to represent a sequence of distinct u64s.
///
/// This is designed to be especially efficient for sequences that are sorted,
/// but not meaningfully larger than a `Vec<u64>` in the worst case.
///
/// The representation is chosen based on the properties of the sequence:
///
/// ```text
///  Sorted?───►Yes ───►Contiguous?─► Yes─► Range
///    │                ▼
///    │                No
///    │                ▼
///    │              Dense?─────► Yes─► RangeWithBitmap/RangeWithHoles
///    │                ▼
///    │                No─────────────► SortedArray
///    ▼
///    No──────────────────────────────► Array
/// ```
///
/// "Dense" is decided based on the estimated byte size of the representation:
/// for a sorted, non-contiguous sequence, `from_stats_and_sequence` picks
/// whichever of the three estimates from `sorted_sequence_sizes` is smallest
/// (ties prefer `RangeWithHoles`, then `RangeWithBitmap`).
///
/// Estimated sizes used for that comparison:
///     RangeWithHoles:  24 bytes + 4 bytes * n_holes
///     RangeWithBitmap: 24 bytes + ceil((max - min + 1) / 8) bytes
///     SortedArray:     24 bytes + 2 bytes * N (assuming u16 packed)
///
/// Range-backed variants store an exclusive end in a `Range<u64>`, so a sorted
/// sequence whose maximum is `u64::MAX` is always stored as `SortedArray`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum U64Segment {
    /// A contiguous sorted range of row ids.
    ///
    /// Total size: 16 bytes
    Range(Range<u64>),
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Estimated size: 24 bytes + n_holes * 4 bytes
    /// Used when this estimate is the smallest of the sorted encodings.
    RangeWithHoles {
        range: Range<u64>,
        /// The values inside `range` that are missing (holes).
        ///
        /// These are stored as absolute values, not as offsets from
        /// `range.start`: lookups search this array for the row id itself.
        /// The array is sorted, so binary search can be used. It's typically
        /// relatively small.
        holes: EncodedU64Array,
    },
    /// A sorted range of row ids, that is mostly contiguous.
    ///
    /// Bitmap is 1 when the value is present, 0 when it's missing. Bit `k`
    /// corresponds to the value `range.start + k`.
    ///
    /// Estimated size: 24 bytes + ceil((max - min + 1) / 8) bytes
    /// Used when this estimate is the smallest of the sorted encodings.
    RangeWithBitmap { range: Range<u64>, bitmap: Bitmap },
    /// A sorted array of row ids, that is sparse.
    ///
    /// Estimated size: 24 bytes + 2 * n_values bytes
    SortedArray(EncodedU64Array),
    /// An array of row ids, that is not sorted.
    Array(EncodedU64Array),
}
71
72impl DeepSizeOf for U64Segment {
73    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
74        match self {
75            Self::Range(_) => 0,
76            Self::RangeWithHoles { holes, .. } => holes.deep_size_of_children(context),
77            Self::RangeWithBitmap { bitmap, .. } => bitmap.deep_size_of_children(context),
78            Self::SortedArray(array) => array.deep_size_of_children(context),
79            Self::Array(array) => array.deep_size_of_children(context),
80        }
81    }
82}
83
/// Summary statistics gathered over a sequence of u64 values.
#[derive(Debug)]
struct SegmentStats {
    /// Smallest value in the sequence.
    min: u64,
    /// Largest value in the sequence.
    max: u64,
    /// How many values the sequence holds.
    count: u64,
    /// Whether the sequence is in sorted order.
    sorted: bool,
}

impl SegmentStats {
    /// Counts the values of the inclusive span `[min, max]` that are absent
    /// from the sequence (the "holes").
    ///
    /// The result is a `u128` because the span can contain up to `2^64`
    /// slots (`min = 0`, `max = u64::MAX`), one more than a `u64` can hold.
    /// An empty sequence has no holes.
    fn n_holes(&self) -> u128 {
        debug_assert!(self.sorted);
        if self.count == 0 {
            return 0;
        }
        let low = u128::from(self.min);
        let high = u128::from(self.max);
        let present = u128::from(self.count);
        high - low + 1 - present
    }
}
112
113impl U64Segment {
114    /// Return the values that are missing from the slice.
115    fn holes_in_slice<'a>(
116        range: RangeInclusive<u64>,
117        existing: impl IntoIterator<Item = u64> + 'a,
118    ) -> impl Iterator<Item = u64> + 'a {
119        let mut existing = existing.into_iter().peekable();
120        range.filter(move |val| {
121            if let Some(&existing_val) = existing.peek()
122                && existing_val == *val
123            {
124                existing.next();
125                return false;
126            }
127            true
128        })
129    }
130
131    fn compute_stats(values: impl IntoIterator<Item = u64>) -> SegmentStats {
132        let mut sorted = true;
133        let mut min = u64::MAX;
134        let mut max = 0;
135        let mut count = 0;
136
137        for val in values {
138            count += 1;
139            if val < min {
140                min = val;
141            }
142            if val > max {
143                max = val;
144            }
145            if sorted && count > 1 && val < max {
146                sorted = false;
147            }
148        }
149
150        if count == 0 {
151            min = 0;
152            max = 0;
153        }
154
155        SegmentStats {
156            min,
157            max,
158            count,
159            sorted,
160        }
161    }
162
163    /// Estimate the serialized byte size of each sorted encoding variant.
164    ///
165    /// All arithmetic is performed in `u128` to avoid overflow when the range
166    /// span `max - min + 1` approaches or exceeds `2^64`. Infeasible sizes
167    /// saturate to `usize::MAX` so they always lose the `min()` comparison.
168    fn sorted_sequence_sizes(stats: &SegmentStats) -> [usize; 3] {
169        let n_holes = stats.n_holes();
170        let total_slots = stats.max as u128 - stats.min as u128 + 1;
171
172        let range_with_holes = 24u128.saturating_add(4u128.saturating_mul(n_holes));
173        let range_with_bitmap = 24u128.saturating_add(total_slots.div_ceil(8));
174        let sorted_array = 24u128.saturating_add(2u128.saturating_mul(stats.count as u128));
175
176        [
177            u128_byte_cost_to_usize(range_with_holes),
178            u128_byte_cost_to_usize(range_with_bitmap),
179            u128_byte_cost_to_usize(sorted_array),
180        ]
181    }
182
183    fn from_stats_and_sequence(
184        stats: SegmentStats,
185        sequence: impl IntoIterator<Item = u64>,
186    ) -> Self {
187        if stats.sorted {
188            let n_holes = stats.n_holes();
189            // Range-backed encodings store an exclusive end as `Range<u64>`,
190            // which cannot represent `u64::MAX + 1`. Compute the end once and
191            // gate all range-backed branches on its representability.
192            let exclusive_end = stats.max.checked_add(1);
193            if stats.count == 0 {
194                Self::Range(0..0)
195            } else if n_holes == 0 && exclusive_end.is_some() {
196                Self::Range(stats.min..exclusive_end.unwrap())
197            } else if let Some(end) = exclusive_end {
198                let sizes = Self::sorted_sequence_sizes(&stats);
199                let min_size = sizes.iter().min().unwrap();
200                if min_size == &sizes[0] {
201                    let range = stats.min..end;
202                    let mut holes =
203                        Self::holes_in_slice(stats.min..=stats.max, sequence).collect::<Vec<_>>();
204                    holes.sort_unstable();
205                    let holes = EncodedU64Array::from(holes);
206                    Self::RangeWithHoles { range, holes }
207                } else if min_size == &sizes[1] {
208                    let range = stats.min..end;
209                    let mut bitmap = Bitmap::new_full((stats.max - stats.min) as usize + 1);
210                    for hole in Self::holes_in_slice(stats.min..=stats.max, sequence) {
211                        let offset = (hole - stats.min) as usize;
212                        bitmap.clear(offset);
213                    }
214                    Self::RangeWithBitmap { range, bitmap }
215                } else {
216                    Self::SortedArray(EncodedU64Array::from_iter(sequence))
217                }
218            } else {
219                // max == u64::MAX: exclusive end is unrepresentable in Range<u64>,
220                // so no range-backed encoding can be used.
221                Self::SortedArray(EncodedU64Array::from_iter(sequence))
222            }
223        } else {
224            Self::Array(EncodedU64Array::from_iter(sequence))
225        }
226    }
227
228    pub fn from_slice(slice: &[u64]) -> Self {
229        Self::from_iter(slice.iter().copied())
230    }
231}
232
233impl FromIterator<u64> for U64Segment {
234    fn from_iter<T: IntoIterator<Item = u64>>(iter: T) -> Self {
235        let values: Vec<u64> = iter.into_iter().collect();
236        let stats = Self::compute_stats(values.iter().copied());
237        Self::from_stats_and_sequence(stats, values)
238    }
239}
240
241impl U64Segment {
242    pub fn iter(&self) -> Box<dyn DoubleEndedIterator<Item = u64> + '_> {
243        match self {
244            Self::Range(range) => Box::new(range.clone()),
245            Self::RangeWithHoles { range, holes } => {
246                Box::new((range.start..range.end).filter(move |&val| {
247                    // TODO: we could write a more optimal version of this
248                    // iterator, but would need special handling to make it
249                    // double ended.
250                    holes.binary_search(val).is_err()
251                }))
252            }
253            Self::RangeWithBitmap { range, bitmap } => {
254                Box::new((range.start..range.end).filter(|val| {
255                    let offset = (val - range.start) as usize;
256                    bitmap.get(offset)
257                }))
258            }
259            Self::SortedArray(array) => Box::new(array.iter()),
260            Self::Array(array) => Box::new(array.iter()),
261        }
262    }
263
264    pub fn len(&self) -> usize {
265        match self {
266            Self::Range(range) => (range.end - range.start) as usize,
267            Self::RangeWithHoles { range, holes } => {
268                let holes = holes.iter().count();
269                (range.end - range.start) as usize - holes
270            }
271            Self::RangeWithBitmap { range, bitmap } => {
272                let holes = bitmap.count_zeros();
273                (range.end - range.start) as usize - holes
274            }
275            Self::SortedArray(array) => array.len(),
276            Self::Array(array) => array.len(),
277        }
278    }
279
280    pub fn is_empty(&self) -> bool {
281        self.len() == 0
282    }
283
284    /// Get the min and max value of the segment, excluding tombstones.
285    pub fn range(&self) -> Option<RangeInclusive<u64>> {
286        match self {
287            Self::Range(range) if range.is_empty() => None,
288            Self::Range(range)
289            | Self::RangeWithBitmap { range, .. }
290            | Self::RangeWithHoles { range, .. } => Some(range.start..=(range.end - 1)),
291            Self::SortedArray(array) => {
292                // We can assume that the array is sorted.
293                let min_value = array.first().unwrap();
294                let max_value = array.last().unwrap();
295                Some(min_value..=max_value)
296            }
297            Self::Array(array) => {
298                let min_value = array.min().unwrap();
299                let max_value = array.max().unwrap();
300                Some(min_value..=max_value)
301            }
302        }
303    }
304
305    pub fn slice(&self, offset: usize, len: usize) -> Self {
306        if len == 0 {
307            return Self::Range(0..0);
308        }
309
310        let values: Vec<u64> = self.iter().skip(offset).take(len).collect();
311
312        // `from_slice` will compute stats and select the best representation.
313        Self::from_slice(&values)
314    }
315
316    pub fn position(&self, val: u64) -> Option<usize> {
317        match self {
318            Self::Range(range) => {
319                if range.contains(&val) {
320                    Some((val - range.start) as usize)
321                } else {
322                    None
323                }
324            }
325            Self::RangeWithHoles { range, holes } => {
326                if !range.contains(&val) {
327                    return None;
328                }
329                // binary_search returns Err(idx) where idx is the count of holes
330                // strictly less than val (holes are unique and sorted).
331                match holes.binary_search(val) {
332                    Ok(_) => None,
333                    Err(num_holes_before) => {
334                        let offset = (val - range.start) as usize;
335                        Some(offset - num_holes_before)
336                    }
337                }
338            }
339            Self::RangeWithBitmap { range, bitmap } => {
340                if range.contains(&val) && bitmap.get((val - range.start) as usize) {
341                    let offset = (val - range.start) as usize;
342                    let num_zeros = bitmap.slice(0, offset).count_zeros();
343                    Some(offset - num_zeros)
344                } else {
345                    None
346                }
347            }
348            Self::SortedArray(array) => array.binary_search(val).ok(),
349            Self::Array(array) => array.iter().position(|v| v == val),
350        }
351    }
352
353    pub fn get(&self, i: usize) -> Option<u64> {
354        match self {
355            Self::Range(range) => match range.start.checked_add(i as u64) {
356                Some(val) if val < range.end => Some(val),
357                _ => None,
358            },
359            Self::RangeWithHoles { range, holes } => {
360                let len = (range.end - range.start) as usize - holes.len();
361                if i >= len {
362                    return None;
363                }
364                // The i-th surviving value v satisfies v = range.start + i + k,
365                // where k = |{h ∈ holes : h < v}|. holes[k] - k is monotone
366                // non-decreasing in k (holes are sorted and unique), so binary
367                // search for the smallest k such that holes[k] - k > range.start + i.
368                let target = range.start + i as u64;
369                let mut lo = 0usize;
370                let mut hi = holes.len();
371                while lo < hi {
372                    let mid = (lo + hi) / 2;
373                    let h = holes.get(mid).unwrap();
374                    if h.saturating_sub(mid as u64) > target {
375                        hi = mid;
376                    } else {
377                        lo = mid + 1;
378                    }
379                }
380                Some(range.start + i as u64 + lo as u64)
381            }
382            Self::RangeWithBitmap { range, bitmap } => {
383                // Find the i-th set bit (a "select1") via byte-wise popcount.
384                // Bytes past `bitmap.len()` are zero-padded by construction
385                // (Bitmap::new_full), so popcount counts only valid positions.
386                let mut remaining = i;
387                for (byte_idx, &byte) in bitmap.data.iter().enumerate() {
388                    let ones = byte.count_ones() as usize;
389                    if remaining < ones {
390                        let mut b = byte;
391                        for _ in 0..remaining {
392                            b &= b - 1; // clear lowest set bit
393                        }
394                        let bit = b.trailing_zeros() as usize;
395                        return Some(range.start + (byte_idx * 8 + bit) as u64);
396                    }
397                    remaining -= ones;
398                }
399                None
400            }
401            Self::SortedArray(array) => array.get(i),
402            Self::Array(array) => array.get(i),
403        }
404    }
405
406    /// Check if a value is contained in the segment
407    pub fn contains(&self, val: u64) -> bool {
408        match self {
409            Self::Range(range) => range.contains(&val),
410            Self::RangeWithHoles { range, holes } => {
411                if !range.contains(&val) {
412                    return false;
413                }
414                // Check if the value is not in the holes
415                !holes.iter().any(|hole| hole == val)
416            }
417            Self::RangeWithBitmap { range, bitmap } => {
418                if !range.contains(&val) {
419                    return false;
420                }
421                // Check if the bitmap has the value set (not cleared)
422                let idx = (val - range.start) as usize;
423                bitmap.get(idx)
424            }
425            Self::SortedArray(array) => array.binary_search(val).is_ok(),
426            Self::Array(array) => array.iter().any(|v| v == val),
427        }
428    }
429
430    /// Produce a new segment that has `val` as the new highest value in the segment
431    pub fn with_new_high(self, val: u64) -> lance_core::Result<Self> {
432        // Check that the new value is higher than the current maximum
433        if let Some(range) = self.range()
434            && val <= *range.end()
435        {
436            return Err(lance_core::Error::invalid_input(format!(
437                "New value {} must be higher than current maximum {}",
438                val,
439                range.end()
440            )));
441        }
442
443        Ok(match self {
444            Self::Range(range) => {
445                // Special case for empty range: create a range containing only the new value
446                if range.start == range.end {
447                    Self::Range(Range {
448                        start: val,
449                        end: val + 1,
450                    })
451                } else if val == range.end {
452                    Self::Range(Range {
453                        start: range.start,
454                        end: val + 1,
455                    })
456                } else {
457                    Self::RangeWithHoles {
458                        range: Range {
459                            start: range.start,
460                            end: val + 1,
461                        },
462                        holes: EncodedU64Array::U64((range.end..val).collect()),
463                    }
464                }
465            }
466            Self::RangeWithHoles { range, holes } => {
467                if val == range.end {
468                    Self::RangeWithHoles {
469                        range: Range {
470                            start: range.start,
471                            end: val + 1,
472                        },
473                        holes,
474                    }
475                } else {
476                    let mut new_holes: Vec<u64> = holes.iter().collect();
477                    new_holes.extend(range.end..val);
478                    Self::RangeWithHoles {
479                        range: Range {
480                            start: range.start,
481                            end: val + 1,
482                        },
483                        holes: EncodedU64Array::U64(new_holes),
484                    }
485                }
486            }
487            Self::RangeWithBitmap { range, bitmap } => {
488                let new_range = Range {
489                    start: range.start,
490                    end: val + 1,
491                };
492                let gap_size = (val - range.end) as usize;
493                let new_bitmap = bitmap
494                    .iter()
495                    .chain(std::iter::repeat_n(false, gap_size))
496                    .chain(std::iter::once(true))
497                    .collect::<Vec<bool>>();
498
499                Self::RangeWithBitmap {
500                    range: new_range,
501                    bitmap: Bitmap::from(new_bitmap.as_slice()),
502                }
503            }
504            Self::SortedArray(array) => match array {
505                EncodedU64Array::U64(mut vec) => {
506                    vec.push(val);
507                    Self::SortedArray(EncodedU64Array::U64(vec))
508                }
509                EncodedU64Array::U16 { base, offsets } => {
510                    if let Some(offset) = val.checked_sub(base) {
511                        if offset <= u16::MAX as u64 {
512                            let mut offsets = offsets;
513                            offsets.push(offset as u16);
514                            return Ok(Self::SortedArray(EncodedU64Array::U16 { base, offsets }));
515                        } else if offset <= u32::MAX as u64 {
516                            let mut u32_offsets: Vec<u32> =
517                                offsets.into_iter().map(|o| o as u32).collect();
518                            u32_offsets.push(offset as u32);
519                            return Ok(Self::SortedArray(EncodedU64Array::U32 {
520                                base,
521                                offsets: u32_offsets,
522                            }));
523                        }
524                    }
525                    let mut new_array: Vec<u64> =
526                        offsets.into_iter().map(|o| base + o as u64).collect();
527                    new_array.push(val);
528                    Self::SortedArray(EncodedU64Array::from(new_array))
529                }
530                EncodedU64Array::U32 { base, mut offsets } => {
531                    if let Some(offset) = val.checked_sub(base)
532                        && offset <= u32::MAX as u64
533                    {
534                        offsets.push(offset as u32);
535                        return Ok(Self::SortedArray(EncodedU64Array::U32 { base, offsets }));
536                    }
537                    let mut new_array: Vec<u64> =
538                        offsets.into_iter().map(|o| base + o as u64).collect();
539                    new_array.push(val);
540                    Self::SortedArray(EncodedU64Array::from(new_array))
541                }
542            },
543            Self::Array(array) => match array {
544                EncodedU64Array::U64(mut vec) => {
545                    vec.push(val);
546                    Self::Array(EncodedU64Array::U64(vec))
547                }
548                EncodedU64Array::U16 { base, offsets } => {
549                    if let Some(offset) = val.checked_sub(base) {
550                        if offset <= u16::MAX as u64 {
551                            let mut offsets = offsets;
552                            offsets.push(offset as u16);
553                            return Ok(Self::Array(EncodedU64Array::U16 { base, offsets }));
554                        } else if offset <= u32::MAX as u64 {
555                            let mut u32_offsets: Vec<u32> =
556                                offsets.into_iter().map(|o| o as u32).collect();
557                            u32_offsets.push(offset as u32);
558                            return Ok(Self::Array(EncodedU64Array::U32 {
559                                base,
560                                offsets: u32_offsets,
561                            }));
562                        }
563                    }
564                    let mut new_array: Vec<u64> =
565                        offsets.into_iter().map(|o| base + o as u64).collect();
566                    new_array.push(val);
567                    Self::Array(EncodedU64Array::from(new_array))
568                }
569                EncodedU64Array::U32 { base, mut offsets } => {
570                    if let Some(offset) = val.checked_sub(base)
571                        && offset <= u32::MAX as u64
572                    {
573                        offsets.push(offset as u32);
574                        return Ok(Self::Array(EncodedU64Array::U32 { base, offsets }));
575                    }
576                    let mut new_array: Vec<u64> =
577                        offsets.into_iter().map(|o| base + o as u64).collect();
578                    new_array.push(val);
579                    Self::Array(EncodedU64Array::from(new_array))
580                }
581            },
582        })
583    }
584
585    /// Delete a set of row ids from the segment.
586    /// The row ids are assumed to be in the segment. (within the range, not
587    /// already deleted.)
588    /// They are also assumed to be ordered by appearance in the segment.
589    pub fn delete(&self, vals: &[u64]) -> Self {
590        // TODO: can we enforce these assumptions? or make them safer?
591        debug_assert!(vals.iter().all(|&val| self.range().unwrap().contains(&val)));
592
593        let make_new_iter = || {
594            let mut vals_iter = vals.iter().copied().peekable();
595            self.iter().filter(move |val| {
596                if let Some(&next_val) = vals_iter.peek()
597                    && next_val == *val
598                {
599                    vals_iter.next();
600                    return false;
601                }
602                true
603            })
604        };
605        let stats = Self::compute_stats(make_new_iter());
606        Self::from_stats_and_sequence(stats, make_new_iter())
607    }
608
609    pub fn mask(&mut self, positions: &[u32]) {
610        if positions.is_empty() {
611            return;
612        }
613        if positions.len() == self.len() {
614            *self = Self::Range(0..0);
615            return;
616        }
617        let count = (self.len() - positions.len()) as u64;
618        let sorted = match self {
619            Self::Range(_) => true,
620            Self::RangeWithHoles { .. } => true,
621            Self::RangeWithBitmap { .. } => true,
622            Self::SortedArray(_) => true,
623            Self::Array(_) => false,
624        };
625        // To get minimum, need to find the first value that is not masked.
626        let first_unmasked = (0..self.len())
627            .zip(positions.iter().cycle())
628            .find(|(sequential_i, i)| **i != *sequential_i as u32)
629            .map(|(sequential_i, _)| sequential_i)
630            .unwrap();
631        let min = self.get(first_unmasked).unwrap();
632
633        let last_unmasked = (0..self.len())
634            .rev()
635            .zip(positions.iter().rev().cycle())
636            .filter(|(sequential_i, i)| **i != *sequential_i as u32)
637            .map(|(sequential_i, _)| sequential_i)
638            .next()
639            .unwrap();
640        let max = self.get(last_unmasked).unwrap();
641
642        let stats = SegmentStats {
643            min,
644            max,
645            count,
646            sorted,
647        };
648
649        let mut positions = positions.iter().copied().peekable();
650        let sequence = self.iter().enumerate().filter_map(move |(i, val)| {
651            if let Some(next_pos) = positions.peek()
652                && *next_pos == i as u32
653            {
654                positions.next();
655                return None;
656            }
657            Some(val)
658        });
659        *self = Self::from_stats_and_sequence(stats, sequence)
660    }
661}
662
663#[cfg(test)]
664mod test {
665    use super::*;
666
    /// Builds one segment per representation and checks, for each, that the
    /// expected variant is chosen and that `len`, `iter`, `range`, `get` and
    /// `position` all agree with the input values.
    #[test]
    fn test_segments() {
        /// Asserts that `values` encode to `expected` and that every accessor
        /// reproduces `values`.
        fn check_segment(values: &[u64], expected: &U64Segment) {
            let segment = U64Segment::from_slice(values);
            assert_eq!(segment, *expected);
            assert_eq!(values.len(), segment.len());

            let roundtripped = segment.iter().collect::<Vec<_>>();
            assert_eq!(roundtripped, values);

            // `range()` must report the true extremes, or `None` when empty.
            let expected_min = values.iter().copied().min();
            let expected_max = values.iter().copied().max();
            match segment.range() {
                Some(range) => {
                    assert_eq!(range.start(), &expected_min.unwrap());
                    assert_eq!(range.end(), &expected_max.unwrap());
                }
                None => {
                    assert_eq!(expected_min, None);
                    assert_eq!(expected_max, None);
                }
            }

            // `get` and `position` must be inverses of each other.
            for (i, value) in values.iter().enumerate() {
                assert_eq!(segment.get(i), Some(*value), "i = {}", i);
                assert_eq!(segment.position(*value), Some(i), "i = {}", i);
            }

            check_segment_iter(&segment);
        }

        /// Asserts that the segment's iterator behaves consistently from
        /// both ends.
        fn check_segment_iter(segment: &U64Segment) {
            // Should be able to iterate forwards and backwards, and get the same thing.
            let forwards = segment.iter().collect::<Vec<_>>();
            let mut backwards = segment.iter().rev().collect::<Vec<_>>();
            backwards.reverse();
            assert_eq!(forwards, backwards);

            // Should be able to pull from both sides in lockstep.
            let mut expected = Vec::with_capacity(segment.len());
            let mut actual = Vec::with_capacity(segment.len());
            let mut iter = segment.iter();
            // Alternating forwards and backwards
            for i in 0..segment.len() {
                if i % 2 == 0 {
                    actual.push(iter.next().unwrap());
                    expected.push(segment.get(i / 2).unwrap());
                } else {
                    let i = segment.len() - 1 - i / 2;
                    actual.push(iter.next_back().unwrap());
                    expected.push(segment.get(i).unwrap());
                };
            }
            assert_eq!(expected, actual);
        }

        // Empty
        check_segment(&[], &U64Segment::Range(0..0));

        // Single value
        check_segment(&[42], &U64Segment::Range(42..43));

        // Contiguous range
        check_segment(
            &(100..200).collect::<Vec<_>>(),
            &U64Segment::Range(100..200),
        );

        // Range with a hole
        let values = (0..1000).filter(|&x| x != 100).collect::<Vec<_>>();
        check_segment(
            &values,
            &U64Segment::RangeWithHoles {
                range: 0..1000,
                holes: vec![100].into(),
            },
        );

        // Range with every other value missing
        // (the largest value is 998, hence the exclusive end of 999)
        let values = (0..1000).filter(|&x| x % 2 == 0).collect::<Vec<_>>();
        check_segment(
            &values,
            &U64Segment::RangeWithBitmap {
                range: 0..999,
                bitmap: Bitmap::from((0..999).map(|x| x % 2 == 0).collect::<Vec<_>>().as_slice()),
            },
        );

        // Sparse but sorted sequence
        check_segment(
            &[1, 7000, 24000],
            &U64Segment::SortedArray(vec![1, 7000, 24000].into()),
        );

        // Sparse unsorted sequence
        check_segment(
            &[7000, 1, 24000],
            &U64Segment::Array(vec![7000, 1, 24000].into()),
        );
    }
767
768    #[test]
769    fn test_segment_overflow_boundary() {
770        // Sparse range spanning i64::MAX — the original overflow reproducer.
771        // n_holes ≈ 2^63, which overflows `4 * n_holes as usize` without u128 arithmetic.
772        let values: Vec<u64> = vec![0, 1, 2, 100, i64::MAX as u64];
773        let segment = U64Segment::from_slice(&values);
774        assert!(
775            matches!(segment, U64Segment::SortedArray(_)),
776            "sparse range spanning i64::MAX should be SortedArray, got {:?}",
777            std::mem::discriminant(&segment)
778        );
779        assert_eq!(segment.len(), 5);
780        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
781
782        // Two values at u64 extremes — triggers n_holes() total_slots overflow
783        // (u64::MAX - 0 + 1 wraps to 0 without u128).
784        let values: Vec<u64> = vec![0, u64::MAX];
785        let segment = U64Segment::from_slice(&values);
786        assert!(
787            matches!(segment, U64Segment::SortedArray(_)),
788            "full u64 span should be SortedArray, got {:?}",
789            std::mem::discriminant(&segment)
790        );
791        assert_eq!(segment.len(), 2);
792        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
793
794        // Small dense set near u64::MAX — cost estimation correctly prefers a
795        // range-backed encoding, but Range<u64> cannot represent u64::MAX + 1
796        // as the exclusive end. Must fall back to SortedArray.
797        let values: Vec<u64> = vec![u64::MAX - 3, u64::MAX - 1, u64::MAX];
798        let segment = U64Segment::from_slice(&values);
799        assert!(
800            matches!(segment, U64Segment::SortedArray(_)),
801            "dense set near u64::MAX should be SortedArray (exclusive end unrepresentable), got {:?}",
802            std::mem::discriminant(&segment)
803        );
804        assert_eq!(segment.len(), 3);
805        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
806
807        // Single value at u64::MAX — contiguous range with n_holes == 0, but
808        // exclusive end u64::MAX + 1 overflows.
809        let values: Vec<u64> = vec![u64::MAX];
810        let segment = U64Segment::from_slice(&values);
811        assert!(
812            matches!(segment, U64Segment::SortedArray(_)),
813            "single u64::MAX should be SortedArray, got {:?}",
814            std::mem::discriminant(&segment)
815        );
816        assert_eq!(segment.len(), 1);
817        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
818
819        // Contiguous range ending just below u64::MAX — exclusive end is
820        // representable, so Range encoding should still be used.
821        let values: Vec<u64> = vec![u64::MAX - 3, u64::MAX - 2, u64::MAX - 1];
822        let segment = U64Segment::from_slice(&values);
823        assert_eq!(segment, U64Segment::Range((u64::MAX - 3)..u64::MAX));
824        assert_eq!(segment.len(), 3);
825        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
826
827        // Regression: normal dense range with few holes still picks RangeWithHoles.
828        // Needs total_slots > 32 * n_holes for RangeWithHoles to beat RangeWithBitmap.
829        let values: Vec<u64> = (100..1100).filter(|&x| x != 500).collect();
830        let segment = U64Segment::from_slice(&values);
831        assert_eq!(
832            segment,
833            U64Segment::RangeWithHoles {
834                range: 100..1100,
835                holes: vec![500].into(),
836            }
837        );
838        assert_eq!(segment.len(), 999);
839        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
840
841        // Regression: small dense range with hole picks RangeWithBitmap.
842        let values: Vec<u64> = vec![100, 101, 102, 103, 105];
843        let segment = U64Segment::from_slice(&values);
844        assert!(
845            matches!(segment, U64Segment::RangeWithBitmap { .. }),
846            "small dense range with hole should be RangeWithBitmap, got {:?}",
847            std::mem::discriminant(&segment)
848        );
849        assert_eq!(segment.len(), 5);
850        assert_eq!(segment.iter().collect::<Vec<_>>(), values);
851    }
852
853    #[test]
854    fn test_u128_byte_cost_to_usize() {
855        assert_eq!(super::u128_byte_cost_to_usize(0), 0);
856        assert_eq!(super::u128_byte_cost_to_usize(42), 42);
857        assert_eq!(
858            super::u128_byte_cost_to_usize(usize::MAX as u128),
859            usize::MAX
860        );
861        assert_eq!(super::u128_byte_cost_to_usize(u128::MAX), usize::MAX);
862    }
863
864    #[test]
865    fn test_sorted_sequence_sizes_sparse_span_saturates_range_with_holes_cost() {
866        let stats = super::SegmentStats {
867            min: 0,
868            max: i64::MAX as u64,
869            count: 5,
870            sorted: true,
871        };
872        let sizes = U64Segment::sorted_sequence_sizes(&stats);
873        assert_eq!(sizes[0], usize::MAX);
874        assert!(sizes[2] < sizes[0]);
875    }
876
877    #[test]
878    fn test_sorted_sequence_sizes_sorted_array_cost_saturates() {
879        // Nearly full [0, u64::MAX] with one hole: count = u64::MAX, n_holes = 1.
880        // SortedArray cost 24 + 2 * u64::MAX does not fit in usize on 64-bit.
881        let stats = super::SegmentStats {
882            min: 0,
883            max: u64::MAX,
884            count: u64::MAX,
885            sorted: true,
886        };
887        let sizes = U64Segment::sorted_sequence_sizes(&stats);
888        assert_eq!(sizes[2], usize::MAX);
889    }
890
891    #[test]
892    fn test_sorted_sequence_sizes_full_span_bitmap_cost() {
893        // Synthetic stats: full [0, u64::MAX] slot space; exercises `range_with_bitmap`
894        // cost path (always fits in `usize` on 64-bit targets).
895        let stats = super::SegmentStats {
896            min: 0,
897            max: u64::MAX,
898            count: 1,
899            sorted: true,
900        };
901        let sizes = U64Segment::sorted_sequence_sizes(&stats);
902        assert!(sizes[1] < sizes[0]);
903        assert!(sizes[1] < usize::MAX);
904    }
905
906    #[test]
907    fn test_with_new_high() {
908        // Test Range: contiguous sequence
909        let segment = U64Segment::Range(10..20);
910
911        // Test adding value that extends the range
912        let result = segment.clone().with_new_high(20).unwrap();
913        assert_eq!(result, U64Segment::Range(10..21));
914
915        // Test adding value that creates holes
916        let result = segment.with_new_high(25).unwrap();
917        assert_eq!(
918            result,
919            U64Segment::RangeWithHoles {
920                range: 10..26,
921                holes: EncodedU64Array::U64(vec![20, 21, 22, 23, 24]),
922            }
923        );
924
925        // Test RangeWithHoles: sequence with existing holes
926        let segment = U64Segment::RangeWithHoles {
927            range: 10..20,
928            holes: EncodedU64Array::U64(vec![15, 17]),
929        };
930
931        // Test adding value that extends the range without new holes
932        let result = segment.clone().with_new_high(20).unwrap();
933        assert_eq!(
934            result,
935            U64Segment::RangeWithHoles {
936                range: 10..21,
937                holes: EncodedU64Array::U64(vec![15, 17]),
938            }
939        );
940
941        // Test adding value that creates additional holes
942        let result = segment.with_new_high(25).unwrap();
943        assert_eq!(
944            result,
945            U64Segment::RangeWithHoles {
946                range: 10..26,
947                holes: EncodedU64Array::U64(vec![15, 17, 20, 21, 22, 23, 24]),
948            }
949        );
950
951        // Test RangeWithBitmap: sequence with bitmap representation
952        let mut bitmap = Bitmap::new_full(10);
953        bitmap.clear(3); // Clear position 3 (value 13)
954        bitmap.clear(7); // Clear position 7 (value 17)
955        let segment = U64Segment::RangeWithBitmap {
956            range: 10..20,
957            bitmap,
958        };
959
960        // Test adding value that extends the range without new holes
961        let result = segment.clone().with_new_high(20).unwrap();
962        let expected_bitmap = {
963            let mut b = Bitmap::new_full(11);
964            b.clear(3); // Clear position 3 (value 13)
965            b.clear(7); // Clear position 7 (value 17)
966            b
967        };
968        assert_eq!(
969            result,
970            U64Segment::RangeWithBitmap {
971                range: 10..21,
972                bitmap: expected_bitmap,
973            }
974        );
975
976        // Test adding value that creates additional holes
977        let result = segment.with_new_high(25).unwrap();
978        let expected_bitmap = {
979            let mut b = Bitmap::new_full(16);
980            b.clear(3); // Clear position 3 (value 13)
981            b.clear(7); // Clear position 7 (value 17)
982            // Clear positions 10-14 (values 20-24)
983            for i in 10..15 {
984                b.clear(i);
985            }
986            b
987        };
988        assert_eq!(
989            result,
990            U64Segment::RangeWithBitmap {
991                range: 10..26,
992                bitmap: expected_bitmap,
993            }
994        );
995
996        // Test SortedArray: sparse sorted sequence
997        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
998
999        let result = segment.with_new_high(15).unwrap();
1000        assert_eq!(
1001            result,
1002            U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10, 15]))
1003        );
1004
1005        // Test Array: unsorted sequence
1006        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
1007
1008        let result = segment.with_new_high(15).unwrap();
1009        assert_eq!(
1010            result,
1011            U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1, 15]))
1012        );
1013
1014        // Test edge cases
1015        // Empty segment
1016        let segment = U64Segment::Range(0..0);
1017        let result = segment.with_new_high(5).unwrap();
1018        assert_eq!(result, U64Segment::Range(5..6));
1019
1020        // Single value segment
1021        let segment = U64Segment::Range(42..43);
1022        let result = segment.with_new_high(50).unwrap();
1023        assert_eq!(
1024            result,
1025            U64Segment::RangeWithHoles {
1026                range: 42..51,
1027                holes: EncodedU64Array::U64(vec![43, 44, 45, 46, 47, 48, 49]),
1028            }
1029        );
1030    }
1031
1032    #[test]
1033    fn test_with_new_high_assertion() {
1034        let segment = U64Segment::Range(10..20);
1035        // This should return an error because 15 is not higher than the current maximum 19
1036        let result = segment.with_new_high(15);
1037        assert!(result.is_err());
1038        let error = result.unwrap_err();
1039        assert!(
1040            error
1041                .to_string()
1042                .contains("New value 15 must be higher than current maximum 19")
1043        );
1044    }
1045
1046    #[test]
1047    fn test_with_new_high_assertion_equal() {
1048        let segment = U64Segment::Range(1..6);
1049        // This should return an error because 5 is not higher than the current maximum 5
1050        let result = segment.with_new_high(5);
1051        assert!(result.is_err());
1052        let error = result.unwrap_err();
1053        assert!(
1054            error
1055                .to_string()
1056                .contains("New value 5 must be higher than current maximum 5")
1057        );
1058    }
1059
1060    #[test]
1061    fn test_contains() {
1062        // Test Range: contiguous sequence
1063        let segment = U64Segment::Range(10..20);
1064        assert!(segment.contains(10), "Should contain 10");
1065        assert!(segment.contains(15), "Should contain 15");
1066        assert!(segment.contains(19), "Should contain 19");
1067        assert!(!segment.contains(9), "Should not contain 9");
1068        assert!(!segment.contains(20), "Should not contain 20");
1069        assert!(!segment.contains(25), "Should not contain 25");
1070
1071        // Test RangeWithHoles: sequence with holes
1072        let segment = U64Segment::RangeWithHoles {
1073            range: 10..20,
1074            holes: EncodedU64Array::U64(vec![15, 17]),
1075        };
1076        assert!(segment.contains(10), "Should contain 10");
1077        assert!(segment.contains(14), "Should contain 14");
1078        assert!(!segment.contains(15), "Should not contain 15 (hole)");
1079        assert!(segment.contains(16), "Should contain 16");
1080        assert!(!segment.contains(17), "Should not contain 17 (hole)");
1081        assert!(segment.contains(18), "Should contain 18");
1082        assert!(
1083            !segment.contains(20),
1084            "Should not contain 20 (out of range)"
1085        );
1086
1087        // Test RangeWithBitmap: sequence with bitmap
1088        let mut bitmap = Bitmap::new_full(10);
1089        bitmap.clear(3); // Clear position 3 (value 13)
1090        bitmap.clear(7); // Clear position 7 (value 17)
1091        let segment = U64Segment::RangeWithBitmap {
1092            range: 10..20,
1093            bitmap,
1094        };
1095        assert!(segment.contains(10), "Should contain 10");
1096        assert!(segment.contains(12), "Should contain 12");
1097        assert!(
1098            !segment.contains(13),
1099            "Should not contain 13 (cleared in bitmap)"
1100        );
1101        assert!(segment.contains(16), "Should contain 16");
1102        assert!(
1103            !segment.contains(17),
1104            "Should not contain 17 (cleared in bitmap)"
1105        );
1106        assert!(segment.contains(19), "Should contain 19");
1107        assert!(
1108            !segment.contains(20),
1109            "Should not contain 20 (out of range)"
1110        );
1111
1112        // Test SortedArray: sparse sorted sequence
1113        let segment = U64Segment::SortedArray(EncodedU64Array::U64(vec![1, 5, 10]));
1114        assert!(segment.contains(1), "Should contain 1");
1115        assert!(segment.contains(5), "Should contain 5");
1116        assert!(segment.contains(10), "Should contain 10");
1117        assert!(!segment.contains(0), "Should not contain 0");
1118        assert!(!segment.contains(3), "Should not contain 3");
1119        assert!(!segment.contains(15), "Should not contain 15");
1120
1121        // Test Array: unsorted sequence
1122        let segment = U64Segment::Array(EncodedU64Array::U64(vec![10, 5, 1]));
1123        assert!(segment.contains(1), "Should contain 1");
1124        assert!(segment.contains(5), "Should contain 5");
1125        assert!(segment.contains(10), "Should contain 10");
1126        assert!(!segment.contains(0), "Should not contain 0");
1127        assert!(!segment.contains(3), "Should not contain 3");
1128        assert!(!segment.contains(15), "Should not contain 15");
1129
1130        // Test empty segment
1131        let segment = U64Segment::Range(0..0);
1132        assert!(
1133            !segment.contains(0),
1134            "Empty segment should not contain anything"
1135        );
1136        assert!(
1137            !segment.contains(5),
1138            "Empty segment should not contain anything"
1139        );
1140    }
1141}