Skip to main content

lance_table/rowids/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::RangeInclusive;
5use std::sync::Arc;
6
7use super::{RowIdSequence, U64Segment};
8use deepsize::DeepSizeOf;
9use lance_core::Result;
10use lance_core::utils::address::RowAddress;
11use lance_core::utils::deletion::DeletionVector;
12use rangemap::RangeInclusiveMap;
13
14/// An index of row ids
15///
16/// This index is used to map row ids to their corresponding addresses. These
17/// addresses correspond to physical positions in the dataset. See [RowAddress].
18///
19/// This structure only contains rows that physically exist. However, it may
20/// map to addresses that have been tombstoned. A separate tombstone index is
21/// used to track tombstoned rows.
22// (Implementation)
23// Disjoint ranges of row ids are stored as the keys of the map. The values are
24// a pair of segments. The first segment is the row ids, and the second segment
25// is the addresses.
26#[derive(Debug)]
27pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
28
29pub struct FragmentRowIdIndex {
30    pub fragment_id: u32,
31    pub row_id_sequence: Arc<RowIdSequence>,
32    pub deletion_vector: Arc<DeletionVector>,
33}
34
35impl RowIdIndex {
36    /// Create a new index from a list of fragment ids and their corresponding row id sequences.
37    pub fn new(fragment_indices: &[FragmentRowIdIndex]) -> Result<Self> {
38        let chunks = fragment_indices
39            .iter()
40            .flat_map(decompose_sequence)
41            .collect::<Vec<_>>();
42
43        let mut final_chunks = Vec::new();
44        for processed_chunk in prep_index_chunks(chunks) {
45            match processed_chunk {
46                RawIndexChunk::NonOverlapping(chunk) => {
47                    final_chunks.push(chunk);
48                }
49                RawIndexChunk::Overlapping(range, overlapping_chunks) => {
50                    debug_assert_eq!(
51                        range.end() - range.start() + 1,
52                        overlapping_chunks
53                            .iter()
54                            .map(|(_, (seq, _))| seq.len() as u64)
55                            .sum::<u64>(),
56                        "Wrong range for {:?}, chunks: {:?}",
57                        range,
58                        overlapping_chunks,
59                    );
60                    // Merge overlapping chunks.
61                    let merged_chunk = merge_overlapping_chunks(overlapping_chunks)?;
62                    final_chunks.push(merged_chunk);
63                }
64            }
65        }
66
67        Ok(Self(RangeInclusiveMap::from_iter(final_chunks)))
68    }
69
70    /// Get the address for a given row id.
71    ///
72    /// Will return None if the row id does not exist in the index.
73    pub fn get(&self, row_id: u64) -> Option<RowAddress> {
74        let (row_id_segment, address_segment) = self.0.get(&row_id)?;
75        let pos = row_id_segment.position(row_id)?;
76        let address = address_segment.get(pos)?;
77        Some(RowAddress::from(address))
78    }
79
80    /// Get addresses for many row ids in one pass over the index.
81    ///
82    /// Returns one entry per input id, in input order (`None` for missing).
83    /// Sorts a working copy of the input internally so the chunk iterator
84    /// is advanced at most once per chunk, amortizing the per-id tree walk
85    /// from O(N ยท log F) to O(F + N).
86    pub fn get_many(&self, row_ids: &[u64]) -> Vec<Option<RowAddress>> {
87        let n = row_ids.len();
88        let mut out = vec![None; n];
89        if n == 0 {
90            return out;
91        }
92
93        let mut sorted: Vec<(u64, usize)> = row_ids.iter().copied().zip(0..n).collect();
94        sorted.sort_unstable_by_key(|&(id, _)| id);
95
96        let mut chunks = self.0.iter().peekable();
97        for (id, orig_idx) in sorted {
98            // Advance past chunks that end before this id.
99            while let Some((range, _)) = chunks.peek() {
100                if *range.end() < id {
101                    chunks.next();
102                } else {
103                    break;
104                }
105            }
106            let Some((range, (row_id_seg, addr_seg))) = chunks.peek() else {
107                break;
108            };
109            if id < *range.start() {
110                continue; // falls in a gap between chunks
111            }
112            if let Some(pos) = row_id_seg.position(id)
113                && let Some(addr) = addr_seg.get(pos)
114            {
115                out[orig_idx] = Some(RowAddress::from(addr));
116            }
117        }
118        out
119    }
120}
121
122impl DeepSizeOf for RowIdIndex {
123    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
124        self.0
125            .iter()
126            .map(|(_, (row_id_segment, address_segment))| {
127                (2 * std::mem::size_of::<u64>())
128                    + std::mem::size_of::<(U64Segment, U64Segment)>()
129                    + row_id_segment.deep_size_of_children(context)
130                    + address_segment.deep_size_of_children(context)
131            })
132            .sum()
133    }
134}
135
136fn decompose_sequence(
137    frag_index: &FragmentRowIdIndex,
138) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
139    let mut start_address: u64 = RowAddress::first_row(frag_index.fragment_id).into();
140    let mut current_offset = 0u32;
141    let no_deletions = frag_index.deletion_vector.is_empty();
142
143    frag_index
144        .row_id_sequence
145        .0
146        .iter()
147        .filter_map(|segment| {
148            let segment_len = segment.len();
149
150            let result = if no_deletions {
151                decompose_segment_no_deletions(segment, start_address)
152            } else {
153                decompose_segment_with_deletions(
154                    segment,
155                    start_address,
156                    current_offset,
157                    &frag_index.deletion_vector,
158                )
159            };
160
161            current_offset += segment_len as u32;
162            start_address += segment_len as u64;
163
164            result
165        })
166        .collect()
167}
168
169/// Build an IndexChunk from a list of (row_id, address) pairs.
170fn build_chunk_from_pairs(pairs: Vec<(u64, u64)>) -> Option<IndexChunk> {
171    if pairs.is_empty() {
172        return None;
173    }
174    let (row_ids, addresses): (Vec<u64>, Vec<u64>) = pairs.into_iter().unzip();
175    let row_id_segment = U64Segment::from_iter(row_ids);
176    let address_segment = U64Segment::from_iter(addresses);
177    let coverage = row_id_segment.range()?;
178    Some((coverage, (row_id_segment, address_segment)))
179}
180
181/// Fast path: no deletions. O(1) for Range segments.
182fn decompose_segment_no_deletions(segment: &U64Segment, start_address: u64) -> Option<IndexChunk> {
183    match segment {
184        U64Segment::Range(range) if !range.is_empty() => {
185            let len = range.end - range.start;
186            let row_id_segment = U64Segment::Range(range.clone());
187            let address_segment = U64Segment::Range(start_address..start_address + len);
188            let coverage = range.start..=range.end - 1;
189            Some((coverage, (row_id_segment, address_segment)))
190        }
191        _ if segment.is_empty() => None,
192        _ => {
193            // Non-Range segments: must iterate to build address mapping.
194            let pairs: Vec<(u64, u64)> = segment
195                .iter()
196                .enumerate()
197                .map(|(i, row_id)| (row_id, start_address + i as u64))
198                .collect();
199            build_chunk_from_pairs(pairs)
200        }
201    }
202}
203
204/// Slow path: has deletions, must check each row.
205fn decompose_segment_with_deletions(
206    segment: &U64Segment,
207    start_address: u64,
208    current_offset: u32,
209    deletion_vector: &DeletionVector,
210) -> Option<IndexChunk> {
211    let pairs: Vec<(u64, u64)> = segment
212        .iter()
213        .enumerate()
214        .filter_map(|(i, row_id)| {
215            let row_offset = current_offset + i as u32;
216            if !deletion_vector.contains(row_offset) {
217                Some((row_id, start_address + i as u64))
218            } else {
219                None
220            }
221        })
222        .collect();
223    build_chunk_from_pairs(pairs)
224}
225
226type IndexChunk = (RangeInclusive<u64>, (U64Segment, U64Segment));
227
228#[derive(Debug)]
229enum RawIndexChunk {
230    NonOverlapping(IndexChunk),
231    Overlapping(RangeInclusive<u64>, Vec<IndexChunk>),
232}
233
234impl RawIndexChunk {
235    fn range_end(&self) -> u64 {
236        match self {
237            Self::NonOverlapping((range, _)) => *range.end(),
238            Self::Overlapping(range, _) => *range.end(),
239        }
240    }
241}
242
243/// Given a vector of index chunks, sort them and return an iterator of index chunks.
244///
245/// The iterator will yield chunks that are non-overlapping or a set of chunks
246/// that are overlapping.
247fn prep_index_chunks(mut chunks: Vec<IndexChunk>) -> impl Iterator<Item = RawIndexChunk> {
248    chunks.sort_by_key(|(range, _)| u64::MAX - *range.start());
249
250    let mut output = Vec::new();
251
252    // Start assuming non-overlapping in first chunk.
253    if let Some(first_chunk) = chunks.pop() {
254        output.push(RawIndexChunk::NonOverlapping(first_chunk));
255    } else {
256        // Early return for empty.
257        return output.into_iter();
258    }
259
260    let mut current_range = 0..=0;
261    let mut current_overlap = Vec::new();
262    while let Some(chunk) = chunks.pop() {
263        debug_assert_eq!(
264            current_overlap
265                .iter()
266                .map(|(range, _): &IndexChunk| *range.start())
267                .min()
268                .unwrap_or_default(),
269            *current_range.start(),
270        );
271        debug_assert_eq!(
272            current_overlap
273                .iter()
274                .map(|(range, _): &IndexChunk| *range.end())
275                .max()
276                .unwrap_or_default(),
277            *current_range.end(),
278        );
279
280        if current_overlap.is_empty() {
281            // We haven't found overlap yet.
282            let last_chunk_end = output.last().unwrap().range_end();
283            if *chunk.0.start() <= last_chunk_end {
284                // We have found overlap.
285                match output.pop().unwrap() {
286                    RawIndexChunk::NonOverlapping(chunk) => {
287                        current_overlap.push(chunk);
288                    }
289                    _ => unreachable!(),
290                }
291                current_overlap.push(chunk);
292
293                let range_start = *current_overlap.first().unwrap().0.start();
294                let range_end = *current_overlap
295                    .last()
296                    .unwrap()
297                    .0
298                    .end()
299                    .max(current_overlap.first().unwrap().0.end());
300                current_range = range_start..=range_end;
301            } else {
302                // We are still in non-overlapping space.
303                output.push(RawIndexChunk::NonOverlapping(chunk));
304            }
305        } else {
306            // We are making an overlap chunk
307            if chunk.0.start() <= current_range.end() {
308                // We are still in overlap.
309                let range_end = *chunk.0.end().max(current_range.end());
310                current_range = *current_range.start()..=range_end;
311
312                current_overlap.push(chunk);
313            } else {
314                // We have exited overlap.
315                output.push(RawIndexChunk::Overlapping(
316                    std::mem::replace(&mut current_range, 0..=0),
317                    std::mem::take(&mut current_overlap),
318                ));
319                output.push(RawIndexChunk::NonOverlapping(chunk));
320            }
321        }
322    }
323    debug_assert_eq!(
324        current_overlap
325            .iter()
326            .map(|(range, _): &IndexChunk| *range.start())
327            .min()
328            .unwrap_or_default(),
329        *current_range.start(),
330    );
331    debug_assert_eq!(
332        current_overlap
333            .iter()
334            .map(|(range, _): &IndexChunk| *range.end())
335            .max()
336            .unwrap_or_default(),
337        *current_range.end(),
338    );
339
340    if !current_overlap.is_empty() {
341        output.push(RawIndexChunk::Overlapping(
342            current_range.clone(),
343            current_overlap,
344        ));
345    }
346
347    output.into_iter()
348}
349
350fn merge_overlapping_chunks(overlapping_chunks: Vec<IndexChunk>) -> Result<IndexChunk> {
351    let total_capacity = overlapping_chunks
352        .iter()
353        .map(|(_, (row_ids, _))| row_ids.len())
354        .sum();
355    let mut values = Vec::with_capacity(total_capacity);
356    for (_, (row_ids, row_addrs)) in overlapping_chunks.iter() {
357        values.extend(row_ids.iter().zip(row_addrs.iter()));
358    }
359    values.sort_by_key(|(row_id, _)| *row_id);
360    let row_id_segment = U64Segment::from_iter(values.iter().map(|(row_id, _)| *row_id));
361    let address_segment = U64Segment::from_iter(values.iter().map(|(_, row_addr)| *row_addr));
362
363    let range = row_id_segment.range().unwrap();
364
365    Ok((range, (row_id_segment, address_segment)))
366}
367
368#[cfg(test)]
369mod tests {
370    use super::*;
371    use proptest::{prelude::Strategy, prop_assert_eq};
372
373    #[test]
374    fn test_new_index() {
375        let fragment_indices = vec![
376            FragmentRowIdIndex {
377                fragment_id: 10,
378                row_id_sequence: Arc::new(RowIdSequence(vec![
379                    U64Segment::Range(0..10),
380                    U64Segment::RangeWithHoles {
381                        range: 10..17,
382                        holes: vec![12, 15].into(),
383                    },
384                    U64Segment::SortedArray(vec![20, 25, 30].into()),
385                ])),
386                deletion_vector: Arc::new(DeletionVector::default()),
387            },
388            FragmentRowIdIndex {
389                fragment_id: 20,
390                row_id_sequence: Arc::new(RowIdSequence(vec![
391                    U64Segment::RangeWithBitmap {
392                        range: 17..20,
393                        bitmap: [true, false, true].as_slice().into(),
394                    },
395                    U64Segment::Array(vec![40, 50, 60].into()),
396                ])),
397                deletion_vector: Arc::new(DeletionVector::default()),
398            },
399        ];
400
401        let index = RowIdIndex::new(&fragment_indices).unwrap();
402
403        // Check various queries.
404        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
405        assert_eq!(index.get(15), None);
406        assert_eq!(index.get(16), Some(RowAddress::new_from_parts(10, 14)));
407        assert_eq!(index.get(17), Some(RowAddress::new_from_parts(20, 0)));
408        assert_eq!(index.get(25), Some(RowAddress::new_from_parts(10, 16)));
409        assert_eq!(index.get(40), Some(RowAddress::new_from_parts(20, 2)));
410        assert_eq!(index.get(60), Some(RowAddress::new_from_parts(20, 4)));
411        assert_eq!(index.get(61), None);
412    }
413
414    #[test]
415    fn test_new_index_overlap() {
416        let fragment_indices = vec![
417            FragmentRowIdIndex {
418                fragment_id: 23,
419                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
420                    vec![3, 6, 9].into(),
421                )])),
422                deletion_vector: Arc::new(DeletionVector::default()),
423            },
424            FragmentRowIdIndex {
425                fragment_id: 42,
426                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
427                    vec![2, 5, 8].into(),
428                )])),
429                deletion_vector: Arc::new(DeletionVector::default()),
430            },
431            FragmentRowIdIndex {
432                fragment_id: 10,
433                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
434                    vec![1, 4, 7].into(),
435                )])),
436                deletion_vector: Arc::new(DeletionVector::default()),
437            },
438        ];
439
440        let index = RowIdIndex::new(&fragment_indices).unwrap();
441
442        // Check various queries.
443        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(10, 0)));
444        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(42, 0)));
445        assert_eq!(index.get(3), Some(RowAddress::new_from_parts(23, 0)));
446        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 1)));
447        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(42, 1)));
448        assert_eq!(index.get(6), Some(RowAddress::new_from_parts(23, 1)));
449        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(10, 2)));
450        assert_eq!(index.get(8), Some(RowAddress::new_from_parts(42, 2)));
451        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(23, 2)));
452    }
453
454    #[test]
455    fn test_new_index_unsorted_row_ids() {
456        // Test case with unsorted row ids within fragments
457        let fragment_indices = vec![
458            FragmentRowIdIndex {
459                fragment_id: 10,
460                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
461                    vec![9, 3, 6].into(), // Unsorted array
462                )])),
463                deletion_vector: Arc::new(DeletionVector::default()),
464            },
465            FragmentRowIdIndex {
466                fragment_id: 20,
467                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
468                    vec![8, 2, 5].into(), // Unsorted array
469                )])),
470                deletion_vector: Arc::new(DeletionVector::default()),
471            },
472            FragmentRowIdIndex {
473                fragment_id: 30,
474                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
475                    vec![7, 1, 4].into(), // Unsorted array
476                )])),
477                deletion_vector: Arc::new(DeletionVector::default()),
478            },
479        ];
480
481        let index = RowIdIndex::new(&fragment_indices).unwrap();
482
483        // Check that all row ids can be found regardless of their order in the segments
484        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(30, 1)));
485        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(20, 1)));
486        assert_eq!(index.get(3), Some(RowAddress::new_from_parts(10, 1)));
487        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(30, 2)));
488        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 2)));
489        assert_eq!(index.get(6), Some(RowAddress::new_from_parts(10, 2)));
490        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(30, 0)));
491        assert_eq!(index.get(8), Some(RowAddress::new_from_parts(20, 0)));
492        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(10, 0)));
493
494        // Check that non-existent row ids return None
495        assert_eq!(index.get(0), None);
496        assert_eq!(index.get(10), None);
497    }
498
499    #[test]
500    fn test_new_index_partial_overlap() {
501        let fragment_indices = vec![
502            FragmentRowIdIndex {
503                fragment_id: 0,
504                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::RangeWithHoles {
505                    range: 0..100,
506                    holes: vec![50].into(),
507                }])),
508                deletion_vector: Arc::new(DeletionVector::default()),
509            },
510            FragmentRowIdIndex {
511                fragment_id: 1,
512                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(50..51)])),
513                deletion_vector: Arc::new(DeletionVector::default()),
514            },
515        ];
516
517        let index = RowIdIndex::new(&fragment_indices).unwrap();
518
519        // Check various queries.
520        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0)));
521        assert_eq!(index.get(49), Some(RowAddress::new_from_parts(0, 49)));
522        assert_eq!(index.get(50), Some(RowAddress::new_from_parts(1, 0)));
523        assert_eq!(index.get(51), Some(RowAddress::new_from_parts(0, 50)));
524        assert_eq!(index.get(99), Some(RowAddress::new_from_parts(0, 98)));
525    }
526
527    #[test]
528    fn test_index_with_deletion_vector() {
529        let deletion_vector = DeletionVector::from_iter(vec![2, 3]);
530
531        let fragment_indices = vec![FragmentRowIdIndex {
532            fragment_id: 10,
533            row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(0..6)])),
534            deletion_vector: Arc::new(deletion_vector),
535        }];
536
537        let index = RowIdIndex::new(&fragment_indices).unwrap();
538
539        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
540        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(10, 1)));
541        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 4)));
542        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(10, 5)));
543
544        assert_eq!(index.get(2), None);
545        assert_eq!(index.get(3), None);
546    }
547
548    #[test]
549    fn test_empty_fragment_sequences() {
550        let fragment_indices = vec![
551            FragmentRowIdIndex {
552                fragment_id: 10,
553                row_id_sequence: Arc::new(RowIdSequence(vec![])),
554                deletion_vector: Arc::new(DeletionVector::default()),
555            },
556            FragmentRowIdIndex {
557                fragment_id: 20,
558                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(5..8)])),
559                deletion_vector: Arc::new(DeletionVector::default()),
560            },
561        ];
562
563        let index = RowIdIndex::new(&fragment_indices).unwrap();
564
565        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 0)));
566        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(20, 2)));
567        assert_eq!(index.get(4), None);
568    }
569
570    #[test]
571    fn test_completely_empty_index() {
572        let fragment_indices = vec![];
573        let index = RowIdIndex::new(&fragment_indices).unwrap();
574
575        assert_eq!(index.get(0), None);
576        assert_eq!(index.get(100), None);
577    }
578
579    #[test]
580    fn test_non_overlapping_ranges() {
581        let fragment_indices = vec![
582            FragmentRowIdIndex {
583                fragment_id: 10,
584                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(0..5)])),
585                deletion_vector: Arc::new(DeletionVector::default()),
586            },
587            FragmentRowIdIndex {
588                fragment_id: 20,
589                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(5..10)])),
590                deletion_vector: Arc::new(DeletionVector::default()),
591            },
592            FragmentRowIdIndex {
593                fragment_id: 30,
594                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(10..15)])),
595                deletion_vector: Arc::new(DeletionVector::default()),
596            },
597        ];
598
599        let index = RowIdIndex::new(&fragment_indices).unwrap();
600
601        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
602        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 4)));
603        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 0)));
604        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(20, 4)));
605        assert_eq!(index.get(10), Some(RowAddress::new_from_parts(30, 0)));
606        assert_eq!(index.get(14), Some(RowAddress::new_from_parts(30, 4)));
607    }
608
609    fn arbitrary_row_ids(
610        num_fragments_range: std::ops::Range<usize>,
611        frag_size_range: std::ops::Range<usize>,
612    ) -> impl Strategy<Value = Vec<(u32, Arc<RowIdSequence>)>> {
613        let fragment_sizes = proptest::collection::vec(frag_size_range, num_fragments_range);
614        fragment_sizes.prop_flat_map(|fragment_sizes| {
615            let num_rows = fragment_sizes.iter().sum::<usize>() as u64;
616            let row_ids = 0..num_rows;
617            let row_ids = row_ids.collect::<Vec<_>>();
618            let row_ids_shuffled = proptest::strategy::Just(row_ids).prop_shuffle();
619            row_ids_shuffled.prop_map(move |row_ids| {
620                let mut sequences = Vec::with_capacity(fragment_sizes.len());
621                let mut i = 0;
622                for size in &fragment_sizes {
623                    let end = i + size;
624                    let sequence =
625                        RowIdSequence(vec![U64Segment::from_slice(row_ids[i..end].into())]);
626                    sequences.push((i as u32, Arc::new(sequence)));
627                    i = end;
628                }
629                sequences
630            })
631        })
632    }
633
634    #[test]
635    fn test_large_range_segments_no_deletions() {
636        // Simulates a real-world scenario: many fragments with large Range segments
637        // and no deletions. Before optimization, this would iterate over all rows
638        // (O(total_rows)). After optimization, it's O(num_fragments).
639        let rows_per_fragment = 250_000u64;
640        let num_fragments = 100u32;
641        let mut offset = 0u64;
642
643        let fragment_indices: Vec<FragmentRowIdIndex> = (0..num_fragments)
644            .map(|frag_id| {
645                let start = offset;
646                offset += rows_per_fragment;
647                FragmentRowIdIndex {
648                    fragment_id: frag_id,
649                    row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(
650                        start..start + rows_per_fragment,
651                    )])),
652                    deletion_vector: Arc::new(DeletionVector::default()),
653                }
654            })
655            .collect();
656
657        let start = std::time::Instant::now();
658        let index = RowIdIndex::new(&fragment_indices).unwrap();
659        let elapsed = start.elapsed();
660
661        // Verify correctness at boundaries
662        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0)));
663        assert_eq!(
664            index.get(rows_per_fragment - 1),
665            Some(RowAddress::new_from_parts(0, rows_per_fragment as u32 - 1))
666        );
667        assert_eq!(
668            index.get(rows_per_fragment),
669            Some(RowAddress::new_from_parts(1, 0))
670        );
671        let last_row = num_fragments as u64 * rows_per_fragment - 1;
672        assert_eq!(
673            index.get(last_row),
674            Some(RowAddress::new_from_parts(
675                num_fragments - 1,
676                rows_per_fragment as u32 - 1
677            ))
678        );
679        assert_eq!(index.get(last_row + 1), None);
680
681        // With the optimization, building an index for 25M rows across 100 fragments
682        // should complete in well under 1 second (typically < 1ms).
683        assert!(
684            elapsed.as_secs() < 1,
685            "Index build took {:?} for {} fragments x {} rows = {} total rows. \
686             This suggests the O(rows) -> O(fragments) optimization is not working.",
687            elapsed,
688            num_fragments,
689            rows_per_fragment,
690            num_fragments as u64 * rows_per_fragment,
691        );
692    }
693
694    #[test]
695    fn test_large_range_segments_with_deletions() {
696        let rows_per_fragment = 1_000u64;
697        let num_fragments = 10u32;
698        let mut offset = 0u64;
699
700        let fragment_indices: Vec<FragmentRowIdIndex> = (0..num_fragments)
701            .map(|frag_id| {
702                let start = offset;
703                offset += rows_per_fragment;
704
705                // Delete every 3rd row (offsets 0, 3, 6, ...) within each fragment.
706                let mut deleted = roaring::RoaringBitmap::new();
707                for i in (0..rows_per_fragment as u32).step_by(3) {
708                    deleted.insert(i);
709                }
710
711                FragmentRowIdIndex {
712                    fragment_id: frag_id,
713                    row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(
714                        start..start + rows_per_fragment,
715                    )])),
716                    deletion_vector: Arc::new(DeletionVector::Bitmap(deleted)),
717                }
718            })
719            .collect();
720
721        let index = RowIdIndex::new(&fragment_indices).unwrap();
722
723        // Deleted rows (offset 0, 3, 6, ...) should not be found.
724        // Row ID 0 has offset 0 in fragment 0 -> deleted.
725        assert_eq!(index.get(0), None);
726        // Row ID 3 has offset 3 in fragment 0 -> deleted.
727        assert_eq!(index.get(3), None);
728
729        // Non-deleted rows should resolve correctly.
730        // Row ID 1 has offset 1 in fragment 0 -> address (frag=0, row=1).
731        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(0, 1)));
732        // Row ID 2 has offset 2 in fragment 0 -> address (frag=0, row=2).
733        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(0, 2)));
734        // Row ID 4 has offset 4 in fragment 0 -> address (frag=0, row=4).
735        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(0, 4)));
736
737        // Check second fragment: row IDs start at 1000.
738        // Row ID 1000 has offset 0 in fragment 1 -> deleted.
739        assert_eq!(index.get(rows_per_fragment), None);
740        // Row ID 1001 has offset 1 in fragment 1 -> address (frag=1, row=1).
741        assert_eq!(
742            index.get(rows_per_fragment + 1),
743            Some(RowAddress::new_from_parts(1, 1))
744        );
745
746        // Last fragment, last non-deleted row.
747        // Row ID 9999 has offset 999 in fragment 9 -> 999 % 3 == 0 -> deleted.
748        let last_row = num_fragments as u64 * rows_per_fragment - 1;
749        assert_eq!(index.get(last_row), None);
750        // Row ID 9998 has offset 998 -> 998 % 3 == 2 -> not deleted.
751        assert_eq!(
752            index.get(last_row - 1),
753            Some(RowAddress::new_from_parts(num_fragments - 1, 998))
754        );
755
756        // Out of range.
757        assert_eq!(index.get(last_row + 1), None);
758    }
759
760    proptest::proptest! {
761        #[test]
762        fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) {
763            let fragment_indices: Vec<FragmentRowIdIndex> = row_ids
764                .iter()
765                .map(|(frag_id, sequence)| FragmentRowIdIndex {
766                    fragment_id: *frag_id,
767                    row_id_sequence: sequence.clone(),
768                    deletion_vector: Arc::new(DeletionVector::default()),
769                })
770                .collect();
771
772            let index = RowIdIndex::new(&fragment_indices).unwrap();
773            for (frag_id, sequence) in row_ids.iter() {
774                for (local_offset, row_id) in sequence.iter().enumerate() {
775                    prop_assert_eq!(
776                        index.get(row_id),
777                        Some(RowAddress::new_from_parts(*frag_id, local_offset as u32)),
778                        "Row id {} in sequence {:?} not found in index {:?}",
779                        row_id,
780                        sequence,
781                        index
782                    );
783                }
784            }
785        }
786    }
787}