// lance_table/rowids/index.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::ops::RangeInclusive;
use std::sync::Arc;

use deepsize::DeepSizeOf;
use lance_core::utils::address::RowAddress;
use lance_core::utils::deletion::DeletionVector;
use lance_core::Result;
use rangemap::RangeInclusiveMap;

use super::{RowIdSequence, U64Segment};

14/// An index of row ids
15///
16/// This index is used to map row ids to their corresponding addresses. These
17/// addresses correspond to physical positions in the dataset. See [RowAddress].
18///
19/// This structure only contains rows that physically exist. However, it may
20/// map to addresses that have been tombstoned. A separate tombstone index is
21/// used to track tombstoned rows.
22// (Implementation)
23// Disjoint ranges of row ids are stored as the keys of the map. The values are
24// a pair of segments. The first segment is the row ids, and the second segment
25// is the addresses.
26#[derive(Debug)]
27pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
28
29pub struct FragmentRowIdIndex {
30    pub fragment_id: u32,
31    pub row_id_sequence: Arc<RowIdSequence>,
32    pub deletion_vector: Arc<DeletionVector>,
33}
34
35impl RowIdIndex {
36    /// Create a new index from a list of fragment ids and their corresponding row id sequences.
37    pub fn new(fragment_indices: &[FragmentRowIdIndex]) -> Result<Self> {
38        let chunks = fragment_indices
39            .iter()
40            .flat_map(decompose_sequence)
41            .collect::<Vec<_>>();
42
43        let mut final_chunks = Vec::new();
44        for processed_chunk in prep_index_chunks(chunks) {
45            match processed_chunk {
46                RawIndexChunk::NonOverlapping(chunk) => {
47                    final_chunks.push(chunk);
48                }
49                RawIndexChunk::Overlapping(range, overlapping_chunks) => {
50                    debug_assert_eq!(
51                        range.end() - range.start() + 1,
52                        overlapping_chunks
53                            .iter()
54                            .map(|(_, (seq, _))| seq.len() as u64)
55                            .sum::<u64>(),
56                        "Wrong range for {:?}, chunks: {:?}",
57                        range,
58                        overlapping_chunks,
59                    );
60                    // Merge overlapping chunks.
61                    let merged_chunk = merge_overlapping_chunks(overlapping_chunks)?;
62                    final_chunks.push(merged_chunk);
63                }
64            }
65        }
66
67        Ok(Self(RangeInclusiveMap::from_iter(final_chunks)))
68    }
69
70    /// Get the address for a given row id.
71    ///
72    /// Will return None if the row id does not exist in the index.
73    pub fn get(&self, row_id: u64) -> Option<RowAddress> {
74        let (row_id_segment, address_segment) = self.0.get(&row_id)?;
75        let pos = row_id_segment.position(row_id)?;
76        let address = address_segment.get(pos)?;
77        Some(RowAddress::from(address))
78    }
79}
80
81impl DeepSizeOf for RowIdIndex {
82    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
83        self.0
84            .iter()
85            .map(|(_, (row_id_segment, address_segment))| {
86                (2 * std::mem::size_of::<u64>())
87                    + std::mem::size_of::<(U64Segment, U64Segment)>()
88                    + row_id_segment.deep_size_of_children(context)
89                    + address_segment.deep_size_of_children(context)
90            })
91            .sum()
92    }
93}
94
95fn decompose_sequence(
96    frag_index: &FragmentRowIdIndex,
97) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
98    let mut start_address: u64 = RowAddress::first_row(frag_index.fragment_id).into();
99    let mut current_offset = 0u32;
100    let no_deletions = frag_index.deletion_vector.is_empty();
101
102    frag_index
103        .row_id_sequence
104        .0
105        .iter()
106        .filter_map(|segment| {
107            let segment_len = segment.len();
108
109            let result = if no_deletions {
110                decompose_segment_no_deletions(segment, start_address)
111            } else {
112                decompose_segment_with_deletions(
113                    segment,
114                    start_address,
115                    current_offset,
116                    &frag_index.deletion_vector,
117                )
118            };
119
120            current_offset += segment_len as u32;
121            start_address += segment_len as u64;
122
123            result
124        })
125        .collect()
126}
127
128/// Build an IndexChunk from a list of (row_id, address) pairs.
129fn build_chunk_from_pairs(pairs: Vec<(u64, u64)>) -> Option<IndexChunk> {
130    if pairs.is_empty() {
131        return None;
132    }
133    let (row_ids, addresses): (Vec<u64>, Vec<u64>) = pairs.into_iter().unzip();
134    let row_id_segment = U64Segment::from_iter(row_ids);
135    let address_segment = U64Segment::from_iter(addresses);
136    let coverage = row_id_segment.range()?;
137    Some((coverage, (row_id_segment, address_segment)))
138}
139
140/// Fast path: no deletions. O(1) for Range segments.
141fn decompose_segment_no_deletions(segment: &U64Segment, start_address: u64) -> Option<IndexChunk> {
142    match segment {
143        U64Segment::Range(range) if !range.is_empty() => {
144            let len = range.end - range.start;
145            let row_id_segment = U64Segment::Range(range.clone());
146            let address_segment = U64Segment::Range(start_address..start_address + len);
147            let coverage = range.start..=range.end - 1;
148            Some((coverage, (row_id_segment, address_segment)))
149        }
150        _ if segment.is_empty() => None,
151        _ => {
152            // Non-Range segments: must iterate to build address mapping.
153            let pairs: Vec<(u64, u64)> = segment
154                .iter()
155                .enumerate()
156                .map(|(i, row_id)| (row_id, start_address + i as u64))
157                .collect();
158            build_chunk_from_pairs(pairs)
159        }
160    }
161}
162
163/// Slow path: has deletions, must check each row.
164fn decompose_segment_with_deletions(
165    segment: &U64Segment,
166    start_address: u64,
167    current_offset: u32,
168    deletion_vector: &DeletionVector,
169) -> Option<IndexChunk> {
170    let pairs: Vec<(u64, u64)> = segment
171        .iter()
172        .enumerate()
173        .filter_map(|(i, row_id)| {
174            let row_offset = current_offset + i as u32;
175            if !deletion_vector.contains(row_offset) {
176                Some((row_id, start_address + i as u64))
177            } else {
178                None
179            }
180        })
181        .collect();
182    build_chunk_from_pairs(pairs)
183}
184
/// A contiguous row-id coverage range paired with its (row id, address) segments.
type IndexChunk = (RangeInclusive<u64>, (U64Segment, U64Segment));

/// Intermediate classification of chunks produced while grouping by overlap.
#[derive(Debug)]
enum RawIndexChunk {
    /// A chunk whose coverage range intersects no other chunk.
    NonOverlapping(IndexChunk),
    /// The union coverage range plus the chunks whose ranges overlap within it.
    Overlapping(RangeInclusive<u64>, Vec<IndexChunk>),
}

193impl RawIndexChunk {
194    fn range_end(&self) -> u64 {
195        match self {
196            Self::NonOverlapping((range, _)) => *range.end(),
197            Self::Overlapping(range, _) => *range.end(),
198        }
199    }
200}
201
202/// Given a vector of index chunks, sort them and return an iterator of index chunks.
203///
204/// The iterator will yield chunks that are non-overlapping or a set of chunks
205/// that are overlapping.
206fn prep_index_chunks(mut chunks: Vec<IndexChunk>) -> impl Iterator<Item = RawIndexChunk> {
207    chunks.sort_by_key(|(range, _)| u64::MAX - *range.start());
208
209    let mut output = Vec::new();
210
211    // Start assuming non-overlapping in first chunk.
212    if let Some(first_chunk) = chunks.pop() {
213        output.push(RawIndexChunk::NonOverlapping(first_chunk));
214    } else {
215        // Early return for empty.
216        return output.into_iter();
217    }
218
219    let mut current_range = 0..=0;
220    let mut current_overlap = Vec::new();
221    while let Some(chunk) = chunks.pop() {
222        debug_assert_eq!(
223            current_overlap
224                .iter()
225                .map(|(range, _): &IndexChunk| *range.start())
226                .min()
227                .unwrap_or_default(),
228            *current_range.start(),
229        );
230        debug_assert_eq!(
231            current_overlap
232                .iter()
233                .map(|(range, _): &IndexChunk| *range.end())
234                .max()
235                .unwrap_or_default(),
236            *current_range.end(),
237        );
238
239        if current_overlap.is_empty() {
240            // We haven't found overlap yet.
241            let last_chunk_end = output.last().unwrap().range_end();
242            if *chunk.0.start() <= last_chunk_end {
243                // We have found overlap.
244                match output.pop().unwrap() {
245                    RawIndexChunk::NonOverlapping(chunk) => {
246                        current_overlap.push(chunk);
247                    }
248                    _ => unreachable!(),
249                }
250                current_overlap.push(chunk);
251
252                let range_start = *current_overlap.first().unwrap().0.start();
253                let range_end = *current_overlap
254                    .last()
255                    .unwrap()
256                    .0
257                    .end()
258                    .max(current_overlap.first().unwrap().0.end());
259                current_range = range_start..=range_end;
260            } else {
261                // We are still in non-overlapping space.
262                output.push(RawIndexChunk::NonOverlapping(chunk));
263            }
264        } else {
265            // We are making an overlap chunk
266            if chunk.0.start() <= current_range.end() {
267                // We are still in overlap.
268                let range_end = *chunk.0.end().max(current_range.end());
269                current_range = *current_range.start()..=range_end;
270
271                current_overlap.push(chunk);
272            } else {
273                // We have exited overlap.
274                output.push(RawIndexChunk::Overlapping(
275                    std::mem::replace(&mut current_range, 0..=0),
276                    std::mem::take(&mut current_overlap),
277                ));
278                output.push(RawIndexChunk::NonOverlapping(chunk));
279            }
280        }
281    }
282    debug_assert_eq!(
283        current_overlap
284            .iter()
285            .map(|(range, _): &IndexChunk| *range.start())
286            .min()
287            .unwrap_or_default(),
288        *current_range.start(),
289    );
290    debug_assert_eq!(
291        current_overlap
292            .iter()
293            .map(|(range, _): &IndexChunk| *range.end())
294            .max()
295            .unwrap_or_default(),
296        *current_range.end(),
297    );
298
299    if !current_overlap.is_empty() {
300        output.push(RawIndexChunk::Overlapping(
301            current_range.clone(),
302            current_overlap,
303        ));
304    }
305
306    output.into_iter()
307}
308
309fn merge_overlapping_chunks(overlapping_chunks: Vec<IndexChunk>) -> Result<IndexChunk> {
310    let total_capacity = overlapping_chunks
311        .iter()
312        .map(|(_, (row_ids, _))| row_ids.len())
313        .sum();
314    let mut values = Vec::with_capacity(total_capacity);
315    for (_, (row_ids, row_addrs)) in overlapping_chunks.iter() {
316        values.extend(row_ids.iter().zip(row_addrs.iter()));
317    }
318    values.sort_by_key(|(row_id, _)| *row_id);
319    let row_id_segment = U64Segment::from_iter(values.iter().map(|(row_id, _)| *row_id));
320    let address_segment = U64Segment::from_iter(values.iter().map(|(_, row_addr)| *row_addr));
321
322    let range = row_id_segment.range().unwrap();
323
324    Ok((range, (row_id_segment, address_segment)))
325}
326
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::{prelude::Strategy, prop_assert_eq};

    // Two fragments covering every U64Segment variant (Range, RangeWithHoles,
    // SortedArray, RangeWithBitmap, Array); lookups must resolve to the right
    // fragment/offset, and holes/absent ids must return None.
    #[test]
    fn test_new_index() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![
                    U64Segment::Range(0..10),
                    U64Segment::RangeWithHoles {
                        range: 10..17,
                        holes: vec![12, 15].into(),
                    },
                    U64Segment::SortedArray(vec![20, 25, 30].into()),
                ])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![
                    U64Segment::RangeWithBitmap {
                        range: 17..20,
                        bitmap: [true, false, true].as_slice().into(),
                    },
                    U64Segment::Array(vec![40, 50, 60].into()),
                ])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check various queries.
        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(15), None);
        assert_eq!(index.get(16), Some(RowAddress::new_from_parts(10, 14)));
        assert_eq!(index.get(17), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(25), Some(RowAddress::new_from_parts(10, 16)));
        assert_eq!(index.get(40), Some(RowAddress::new_from_parts(20, 2)));
        assert_eq!(index.get(60), Some(RowAddress::new_from_parts(20, 4)));
        assert_eq!(index.get(61), None);
    }

    // Three fragments whose row ids fully interleave (1..=9), exercising the
    // Overlapping/merge code path for every lookup.
    #[test]
    fn test_new_index_overlap() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 23,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
                    vec![3, 6, 9].into(),
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 42,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
                    vec![2, 5, 8].into(),
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
                    vec![1, 4, 7].into(),
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check various queries.
        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(42, 0)));
        assert_eq!(index.get(3), Some(RowAddress::new_from_parts(23, 0)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 1)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(42, 1)));
        assert_eq!(index.get(6), Some(RowAddress::new_from_parts(23, 1)));
        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(10, 2)));
        assert_eq!(index.get(8), Some(RowAddress::new_from_parts(42, 2)));
        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(23, 2)));
    }

    // Unsorted (plain Array) segments: lookups must still resolve to the
    // correct local offset regardless of ordering within the segment.
    #[test]
    fn test_new_index_unsorted_row_ids() {
        // Test case with unsorted row ids within fragments
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
                    vec![9, 3, 6].into(), // Unsorted array
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
                    vec![8, 2, 5].into(), // Unsorted array
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 30,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
                    vec![7, 1, 4].into(), // Unsorted array
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check that all row ids can be found regardless of their order in the segments
        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(30, 1)));
        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(20, 1)));
        assert_eq!(index.get(3), Some(RowAddress::new_from_parts(10, 1)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(30, 2)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 2)));
        assert_eq!(index.get(6), Some(RowAddress::new_from_parts(10, 2)));
        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(30, 0)));
        assert_eq!(index.get(8), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(10, 0)));

        // Check that non-existent row ids return None
        assert_eq!(index.get(0), None);
        assert_eq!(index.get(10), None);
    }

    // One small range (50..51) nested inside a large range-with-hole: the
    // hole belongs to fragment 1 while its neighbors stay in fragment 0.
    #[test]
    fn test_new_index_partial_overlap() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 0,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::RangeWithHoles {
                    range: 0..100,
                    holes: vec![50].into(),
                }])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 1,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(50..51)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check various queries.
        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0)));
        assert_eq!(index.get(49), Some(RowAddress::new_from_parts(0, 49)));
        assert_eq!(index.get(50), Some(RowAddress::new_from_parts(1, 0)));
        assert_eq!(index.get(51), Some(RowAddress::new_from_parts(0, 50)));
        assert_eq!(index.get(99), Some(RowAddress::new_from_parts(0, 98)));
    }

    // Deleted offsets (2, 3) must be absent from the index; the surrounding
    // rows keep their original physical addresses.
    #[test]
    fn test_index_with_deletion_vector() {
        let deletion_vector = DeletionVector::from_iter(vec![2, 3]);

        let fragment_indices = vec![FragmentRowIdIndex {
            fragment_id: 10,
            row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(0..6)])),
            deletion_vector: Arc::new(deletion_vector),
        }];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(10, 1)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 4)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(10, 5)));

        assert_eq!(index.get(2), None);
        assert_eq!(index.get(3), None);
    }

    // A fragment with an empty sequence contributes nothing but must not
    // break index construction.
    #[test]
    fn test_empty_fragment_sequences() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(5..8)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(20, 2)));
        assert_eq!(index.get(4), None);
    }

    // Zero fragments: building succeeds and every lookup returns None.
    #[test]
    fn test_completely_empty_index() {
        let fragment_indices = vec![];
        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(0), None);
        assert_eq!(index.get(100), None);
    }

    // Adjacent but non-overlapping ranges: each stays a distinct map entry
    // and lookups at both boundaries resolve correctly.
    #[test]
    fn test_non_overlapping_ranges() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(0..5)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(5..10)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 30,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(10..15)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 4)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(20, 4)));
        assert_eq!(index.get(10), Some(RowAddress::new_from_parts(30, 0)));
        assert_eq!(index.get(14), Some(RowAddress::new_from_parts(30, 4)));
    }

    // Proptest strategy: shuffle row ids 0..total across a random number of
    // fragments of random sizes, yielding (fragment_id, sequence) pairs.
    // Note: the fragment id is the starting row offset `i`, not an index.
    fn arbitrary_row_ids(
        num_fragments_range: std::ops::Range<usize>,
        frag_size_range: std::ops::Range<usize>,
    ) -> impl Strategy<Value = Vec<(u32, Arc<RowIdSequence>)>> {
        let fragment_sizes = proptest::collection::vec(frag_size_range, num_fragments_range);
        fragment_sizes.prop_flat_map(|fragment_sizes| {
            let num_rows = fragment_sizes.iter().sum::<usize>() as u64;
            let row_ids = 0..num_rows;
            let row_ids = row_ids.collect::<Vec<_>>();
            let row_ids_shuffled = proptest::strategy::Just(row_ids).prop_shuffle();
            row_ids_shuffled.prop_map(move |row_ids| {
                let mut sequences = Vec::with_capacity(fragment_sizes.len());
                let mut i = 0;
                for size in &fragment_sizes {
                    let end = i + size;
                    let sequence =
                        RowIdSequence(vec![U64Segment::from_slice(row_ids[i..end].into())]);
                    sequences.push((i as u32, Arc::new(sequence)));
                    i = end;
                }
                sequences
            })
        })
    }

    #[test]
    fn test_large_range_segments_no_deletions() {
        // Simulates a real-world scenario: many fragments with large Range segments
        // and no deletions. Before optimization, this would iterate over all rows
        // (O(total_rows)). After optimization, it's O(num_fragments).
        let rows_per_fragment = 250_000u64;
        let num_fragments = 100u32;
        let mut offset = 0u64;

        let fragment_indices: Vec<FragmentRowIdIndex> = (0..num_fragments)
            .map(|frag_id| {
                let start = offset;
                offset += rows_per_fragment;
                FragmentRowIdIndex {
                    fragment_id: frag_id,
                    row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(
                        start..start + rows_per_fragment,
                    )])),
                    deletion_vector: Arc::new(DeletionVector::default()),
                }
            })
            .collect();

        let start = std::time::Instant::now();
        let index = RowIdIndex::new(&fragment_indices).unwrap();
        let elapsed = start.elapsed();

        // Verify correctness at boundaries
        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0)));
        assert_eq!(
            index.get(rows_per_fragment - 1),
            Some(RowAddress::new_from_parts(0, rows_per_fragment as u32 - 1))
        );
        assert_eq!(
            index.get(rows_per_fragment),
            Some(RowAddress::new_from_parts(1, 0))
        );
        let last_row = num_fragments as u64 * rows_per_fragment - 1;
        assert_eq!(
            index.get(last_row),
            Some(RowAddress::new_from_parts(
                num_fragments - 1,
                rows_per_fragment as u32 - 1
            ))
        );
        assert_eq!(index.get(last_row + 1), None);

        // With the optimization, building an index for 25M rows across 100 fragments
        // should complete in well under 1 second (typically < 1ms).
        // NOTE(review): wall-clock asserts can flake on heavily loaded CI — the
        // 1 s budget is deliberately generous.
        assert!(
            elapsed.as_secs() < 1,
            "Index build took {:?} for {} fragments x {} rows = {} total rows. \
             This suggests the O(rows) -> O(fragments) optimization is not working.",
            elapsed,
            num_fragments,
            rows_per_fragment,
            num_fragments as u64 * rows_per_fragment,
        );
    }

    // Range segments combined with a deletion bitmap (every 3rd offset
    // deleted): deleted offsets must be absent, others must keep their
    // physical addresses, across all fragments.
    #[test]
    fn test_large_range_segments_with_deletions() {
        let rows_per_fragment = 1_000u64;
        let num_fragments = 10u32;
        let mut offset = 0u64;

        let fragment_indices: Vec<FragmentRowIdIndex> = (0..num_fragments)
            .map(|frag_id| {
                let start = offset;
                offset += rows_per_fragment;

                // Delete every 3rd row (offsets 0, 3, 6, ...) within each fragment.
                let mut deleted = roaring::RoaringBitmap::new();
                for i in (0..rows_per_fragment as u32).step_by(3) {
                    deleted.insert(i);
                }

                FragmentRowIdIndex {
                    fragment_id: frag_id,
                    row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(
                        start..start + rows_per_fragment,
                    )])),
                    deletion_vector: Arc::new(DeletionVector::Bitmap(deleted)),
                }
            })
            .collect();

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Deleted rows (offset 0, 3, 6, ...) should not be found.
        // Row ID 0 has offset 0 in fragment 0 -> deleted.
        assert_eq!(index.get(0), None);
        // Row ID 3 has offset 3 in fragment 0 -> deleted.
        assert_eq!(index.get(3), None);

        // Non-deleted rows should resolve correctly.
        // Row ID 1 has offset 1 in fragment 0 -> address (frag=0, row=1).
        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(0, 1)));
        // Row ID 2 has offset 2 in fragment 0 -> address (frag=0, row=2).
        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(0, 2)));
        // Row ID 4 has offset 4 in fragment 0 -> address (frag=0, row=4).
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(0, 4)));

        // Check second fragment: row IDs start at 1000.
        // Row ID 1000 has offset 0 in fragment 1 -> deleted.
        assert_eq!(index.get(rows_per_fragment), None);
        // Row ID 1001 has offset 1 in fragment 1 -> address (frag=1, row=1).
        assert_eq!(
            index.get(rows_per_fragment + 1),
            Some(RowAddress::new_from_parts(1, 1))
        );

        // Last fragment, last non-deleted row.
        // Row ID 9999 has offset 999 in fragment 9 -> 999 % 3 == 0 -> deleted.
        let last_row = num_fragments as u64 * rows_per_fragment - 1;
        assert_eq!(index.get(last_row), None);
        // Row ID 9998 has offset 998 -> 998 % 3 == 2 -> not deleted.
        assert_eq!(
            index.get(last_row - 1),
            Some(RowAddress::new_from_parts(num_fragments - 1, 998))
        );

        // Out of range.
        assert_eq!(index.get(last_row + 1), None);
    }

    // Property test: for arbitrary shuffled row ids split across arbitrary
    // fragments, every (row id -> fragment, offset) mapping must round-trip.
    proptest::proptest! {
        #[test]
        fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) {
            let fragment_indices: Vec<FragmentRowIdIndex> = row_ids
                .iter()
                .map(|(frag_id, sequence)| FragmentRowIdIndex {
                    fragment_id: *frag_id,
                    row_id_sequence: sequence.clone(),
                    deletion_vector: Arc::new(DeletionVector::default()),
                })
                .collect();

            let index = RowIdIndex::new(&fragment_indices).unwrap();
            for (frag_id, sequence) in row_ids.iter() {
                for (local_offset, row_id) in sequence.iter().enumerate() {
                    prop_assert_eq!(
                        index.get(row_id),
                        Some(RowAddress::new_from_parts(*frag_id, local_offset as u32)),
                        "Row id {} in sequence {:?} not found in index {:?}",
                        row_id,
                        sequence,
                        index
                    );
                }
            }
        }
    }
}