lance_table/rowids/index.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use std::ops::RangeInclusive;
use std::sync::Arc;

use super::{RowIdSequence, U64Segment};
use deepsize::DeepSizeOf;
use lance_core::utils::address::RowAddress;
use lance_core::utils::deletion::DeletionVector;
use lance_core::Result;
use rangemap::RangeInclusiveMap;

/// An index of row ids
///
/// This index is used to map row ids to their corresponding addresses. These
/// addresses correspond to physical positions in the dataset. See [RowAddress].
///
/// This structure only contains rows that physically exist. However, it may
/// map to addresses that have been tombstoned. A separate tombstone index is
/// used to track tombstoned rows.
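///
/// # Example
///
/// A minimal, illustrative sketch of building an index and resolving a row id.
/// It is marked `ignore` because it assumes these types are all in scope for
/// the caller; the segment variant used is arbitrary:
///
/// ```ignore
/// use std::sync::Arc;
///
/// let fragments = vec![FragmentRowIdIndex {
///     fragment_id: 1,
///     row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(0..4)])),
///     deletion_vector: Arc::new(DeletionVector::default()),
/// }];
/// let index = RowIdIndex::new(&fragments).unwrap();
/// assert_eq!(index.get(2), Some(RowAddress::new_from_parts(1, 2)));
/// assert_eq!(index.get(100), None);
/// ```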
// (Implementation)
// Disjoint ranges of row ids are stored as the keys of the map. The values are
// a pair of segments. The first segment is the row ids, and the second segment
// is the addresses.
#[derive(Debug)]
pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);

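/// Per-fragment input to [RowIdIndex::new]: the fragment's id, its row id
/// sequence, and the deletion vector marking row offsets that no longer exist.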
pub struct FragmentRowIdIndex {
    pub fragment_id: u32,
    pub row_id_sequence: Arc<RowIdSequence>,
    pub deletion_vector: Arc<DeletionVector>,
}

impl RowIdIndex {
    /// Create a new index from a list of per-fragment row id indices (each
    /// pairing a fragment id with its row id sequence and deletion vector).
    pub fn new(fragment_indices: &[FragmentRowIdIndex]) -> Result<Self> {
        let chunks = fragment_indices
            .iter()
            .flat_map(decompose_sequence)
            .collect::<Vec<_>>();

        let mut final_chunks = Vec::new();
        for processed_chunk in prep_index_chunks(chunks) {
            match processed_chunk {
                RawIndexChunk::NonOverlapping(chunk) => {
                    final_chunks.push(chunk);
                }
                RawIndexChunk::Overlapping(range, overlapping_chunks) => {
                    debug_assert_eq!(
                        range.end() - range.start() + 1,
                        overlapping_chunks
                            .iter()
                            .map(|(_, (seq, _))| seq.len() as u64)
                            .sum::<u64>(),
                        "Wrong range for {:?}, chunks: {:?}",
                        range,
                        overlapping_chunks,
                    );
                    // Merge overlapping chunks.
                    let merged_chunk = merge_overlapping_chunks(overlapping_chunks)?;
                    final_chunks.push(merged_chunk);
                }
            }
        }

        Ok(Self(RangeInclusiveMap::from_iter(final_chunks)))
    }

    /// Get the address for a given row id.
    ///
    /// Will return None if the row id does not exist in the index.
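    ///
    /// ```ignore
    /// // Illustrative sketch; assumes `index` was built as in the example on [RowIdIndex].
    /// assert_eq!(index.get(2), Some(RowAddress::new_from_parts(1, 2)));
    /// assert!(index.get(100).is_none());
    /// ```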
    pub fn get(&self, row_id: u64) -> Option<RowAddress> {
        let (row_id_segment, address_segment) = self.0.get(&row_id)?;
        let pos = row_id_segment.position(row_id)?;
        let address = address_segment.get(pos)?;
        Some(RowAddress::from(address))
    }
}

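// Approximate heap usage: for each map entry, count the two u64 range bounds
// used as the key, the inline size of the segment pair, and the heap data
// owned by each segment.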
impl DeepSizeOf for RowIdIndex {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        self.0
            .iter()
            .map(|(_, (row_id_segment, address_segment))| {
                (2 * std::mem::size_of::<u64>())
                    + std::mem::size_of::<(U64Segment, U64Segment)>()
                    + row_id_segment.deep_size_of_children(context)
                    + address_segment.deep_size_of_children(context)
            })
            .sum()
    }
}

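/// Decompose a fragment's row id sequence into index chunks.
///
/// For each segment of the sequence, pairs every surviving row id (one not
/// marked in the fragment's deletion vector) with its row address, and yields
/// the covered row id range together with the row id and address segments.
/// Segments whose rows have all been deleted are skipped.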
fn decompose_sequence(
    frag_index: &FragmentRowIdIndex,
) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
    let mut start_address: u64 = RowAddress::first_row(frag_index.fragment_id).into();
    let mut current_offset = 0u32;

    frag_index
        .row_id_sequence
        .0
        .iter()
        .filter_map(|segment| {
            let segment_len = segment.len();

            let active_pairs: Vec<(u64, u64)> = segment
                .iter()
                .enumerate()
                .filter_map(|(i, row_id)| {
                    let row_offset = current_offset + i as u32;
                    if !frag_index.deletion_vector.contains(row_offset) {
                        let address = start_address + i as u64;
                        Some((row_id, address))
                    } else {
                        None
                    }
                })
                .collect();

            current_offset += segment_len as u32;
            start_address += segment_len as u64;

            if active_pairs.is_empty() {
                return None;
            }

            let row_ids: Vec<u64> = active_pairs.iter().map(|(rid, _)| *rid).collect();
            let addresses: Vec<u64> = active_pairs.iter().map(|(_, addr)| *addr).collect();

            let row_id_segment = U64Segment::from_iter(row_ids.iter().copied());
            let address_segment = U64Segment::from_iter(addresses.iter().copied());

            let coverage = row_id_segment.range()?;

            Some((coverage, (row_id_segment, address_segment)))
        })
        .collect()
}

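/// A row id range paired with its (row id, address) segments.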
type IndexChunk = (RangeInclusive<u64>, (U64Segment, U64Segment));

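/// An intermediate grouping of index chunks: either a single chunk whose row id
/// range does not overlap any other chunk, or a set of chunks whose ranges
/// overlap and must be merged before insertion into the map.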
#[derive(Debug)]
enum RawIndexChunk {
    NonOverlapping(IndexChunk),
    Overlapping(RangeInclusive<u64>, Vec<IndexChunk>),
}

impl RawIndexChunk {
    fn range_end(&self) -> u64 {
        match self {
            Self::NonOverlapping((range, _)) => *range.end(),
            Self::Overlapping(range, _) => *range.end(),
        }
    }
}

/// Sort the given index chunks by the start of their row id ranges and group them.
///
/// The returned iterator yields each non-overlapping chunk on its own, and
/// collects runs of chunks with overlapping ranges into a single item along
/// with their combined range.
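///
/// For example (ranges only, segments elided), the inputs `0..=5`, `3..=8`,
/// and `10..=12` would yield `Overlapping(0..=8, [..])` followed by
/// `NonOverlapping(10..=12)`.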
fn prep_index_chunks(mut chunks: Vec<IndexChunk>) -> impl Iterator<Item = RawIndexChunk> {
    chunks.sort_by_key(|(range, _)| u64::MAX - *range.start());

    let mut output = Vec::new();

    // Start by assuming the first chunk is non-overlapping.
    if let Some(first_chunk) = chunks.pop() {
        output.push(RawIndexChunk::NonOverlapping(first_chunk));
    } else {
        // Early return for empty.
        return output.into_iter();
    }

    let mut current_range = 0..=0;
    let mut current_overlap = Vec::new();
    while let Some(chunk) = chunks.pop() {
        debug_assert_eq!(
            current_overlap
                .iter()
                .map(|(range, _): &IndexChunk| *range.start())
                .min()
                .unwrap_or_default(),
            *current_range.start(),
        );
        debug_assert_eq!(
            current_overlap
                .iter()
                .map(|(range, _): &IndexChunk| *range.end())
                .max()
                .unwrap_or_default(),
            *current_range.end(),
        );

        if current_overlap.is_empty() {
            // We haven't found overlap yet.
            let last_chunk_end = output.last().unwrap().range_end();
            if *chunk.0.start() <= last_chunk_end {
                // We have found overlap.
                match output.pop().unwrap() {
                    RawIndexChunk::NonOverlapping(chunk) => {
                        current_overlap.push(chunk);
                    }
                    _ => unreachable!(),
                }
                current_overlap.push(chunk);

                let range_start = *current_overlap.first().unwrap().0.start();
                let range_end = *current_overlap
                    .last()
                    .unwrap()
                    .0
                    .end()
                    .max(current_overlap.first().unwrap().0.end());
                current_range = range_start..=range_end;
            } else {
                // We are still in non-overlapping space.
                output.push(RawIndexChunk::NonOverlapping(chunk));
            }
        } else {
            // We are making an overlap chunk
            if chunk.0.start() <= current_range.end() {
                // We are still in overlap.
                let range_end = *chunk.0.end().max(current_range.end());
                current_range = *current_range.start()..=range_end;

                current_overlap.push(chunk);
            } else {
                // We have exited overlap.
                output.push(RawIndexChunk::Overlapping(
                    std::mem::replace(&mut current_range, 0..=0),
                    std::mem::take(&mut current_overlap),
                ));
                output.push(RawIndexChunk::NonOverlapping(chunk));
            }
        }
    }
    debug_assert_eq!(
        current_overlap
            .iter()
            .map(|(range, _): &IndexChunk| *range.start())
            .min()
            .unwrap_or_default(),
        *current_range.start(),
    );
    debug_assert_eq!(
        current_overlap
            .iter()
            .map(|(range, _): &IndexChunk| *range.end())
            .max()
            .unwrap_or_default(),
        *current_range.end(),
    );

    if !current_overlap.is_empty() {
        output.push(RawIndexChunk::Overlapping(
            current_range.clone(),
            current_overlap,
        ));
    }

    output.into_iter()
}

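/// Merge a group of overlapping index chunks into a single chunk by collecting
/// all (row id, address) pairs, sorting them by row id, and rebuilding the row
/// id and address segments over the combined range.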
fn merge_overlapping_chunks(overlapping_chunks: Vec<IndexChunk>) -> Result<IndexChunk> {
    let total_capacity = overlapping_chunks
        .iter()
        .map(|(_, (row_ids, _))| row_ids.len())
        .sum();
    let mut values = Vec::with_capacity(total_capacity);
    for (_, (row_ids, row_addrs)) in overlapping_chunks.iter() {
        values.extend(row_ids.iter().zip(row_addrs.iter()));
    }
    values.sort_by_key(|(row_id, _)| *row_id);
    let row_id_segment = U64Segment::from_iter(values.iter().map(|(row_id, _)| *row_id));
    let address_segment = U64Segment::from_iter(values.iter().map(|(_, row_addr)| *row_addr));

    let range = row_id_segment.range().unwrap();

    Ok((range, (row_id_segment, address_segment)))
}

#[cfg(test)]
mod tests {
    use super::*;
    use proptest::{prelude::Strategy, prop_assert_eq};

    #[test]
    fn test_new_index() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![
                    U64Segment::Range(0..10),
                    U64Segment::RangeWithHoles {
                        range: 10..17,
                        holes: vec![12, 15].into(),
                    },
                    U64Segment::SortedArray(vec![20, 25, 30].into()),
                ])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![
                    U64Segment::RangeWithBitmap {
                        range: 17..20,
                        bitmap: [true, false, true].as_slice().into(),
                    },
                    U64Segment::Array(vec![40, 50, 60].into()),
                ])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check various queries.
        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(15), None);
        assert_eq!(index.get(16), Some(RowAddress::new_from_parts(10, 14)));
        assert_eq!(index.get(17), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(25), Some(RowAddress::new_from_parts(10, 16)));
        assert_eq!(index.get(40), Some(RowAddress::new_from_parts(20, 2)));
        assert_eq!(index.get(60), Some(RowAddress::new_from_parts(20, 4)));
        assert_eq!(index.get(61), None);
    }

    #[test]
    fn test_new_index_overlap() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 23,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
                    vec![3, 6, 9].into(),
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 42,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
                    vec![2, 5, 8].into(),
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::SortedArray(
                    vec![1, 4, 7].into(),
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check various queries.
        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(42, 0)));
        assert_eq!(index.get(3), Some(RowAddress::new_from_parts(23, 0)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 1)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(42, 1)));
        assert_eq!(index.get(6), Some(RowAddress::new_from_parts(23, 1)));
        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(10, 2)));
        assert_eq!(index.get(8), Some(RowAddress::new_from_parts(42, 2)));
        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(23, 2)));
    }

    #[test]
    fn test_new_index_unsorted_row_ids() {
        // Test case with unsorted row ids within fragments
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
                    vec![9, 3, 6].into(), // Unsorted array
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
                    vec![8, 2, 5].into(), // Unsorted array
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 30,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Array(
                    vec![7, 1, 4].into(), // Unsorted array
                )])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check that all row ids can be found regardless of their order in the segments
        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(30, 1)));
        assert_eq!(index.get(2), Some(RowAddress::new_from_parts(20, 1)));
        assert_eq!(index.get(3), Some(RowAddress::new_from_parts(10, 1)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(30, 2)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 2)));
        assert_eq!(index.get(6), Some(RowAddress::new_from_parts(10, 2)));
        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(30, 0)));
        assert_eq!(index.get(8), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(10, 0)));

        // Check that non-existent row ids return None
        assert_eq!(index.get(0), None);
        assert_eq!(index.get(10), None);
    }

    #[test]
    fn test_new_index_partial_overlap() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 0,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::RangeWithHoles {
                    range: 0..100,
                    holes: vec![50].into(),
                }])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 1,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(50..51)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // Check various queries.
        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(0, 0)));
        assert_eq!(index.get(49), Some(RowAddress::new_from_parts(0, 49)));
        assert_eq!(index.get(50), Some(RowAddress::new_from_parts(1, 0)));
        assert_eq!(index.get(51), Some(RowAddress::new_from_parts(0, 50)));
        assert_eq!(index.get(99), Some(RowAddress::new_from_parts(0, 98)));
    }

    #[test]
    fn test_index_with_deletion_vector() {
        let deletion_vector = DeletionVector::from_iter(vec![2, 3]);

        let fragment_indices = vec![FragmentRowIdIndex {
            fragment_id: 10,
            row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(0..6)])),
            deletion_vector: Arc::new(deletion_vector),
        }];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(1), Some(RowAddress::new_from_parts(10, 1)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 4)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(10, 5)));

        assert_eq!(index.get(2), None);
        assert_eq!(index.get(3), None);
    }

    #[test]
    fn test_empty_fragment_sequences() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(5..8)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(7), Some(RowAddress::new_from_parts(20, 2)));
        assert_eq!(index.get(4), None);
    }

    #[test]
    fn test_completely_empty_index() {
        let fragment_indices = vec![];
        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(0), None);
        assert_eq!(index.get(100), None);
    }

    #[test]
    fn test_non_overlapping_ranges() {
        let fragment_indices = vec![
            FragmentRowIdIndex {
                fragment_id: 10,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(0..5)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 20,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(5..10)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
            FragmentRowIdIndex {
                fragment_id: 30,
                row_id_sequence: Arc::new(RowIdSequence(vec![U64Segment::Range(10..15)])),
                deletion_vector: Arc::new(DeletionVector::default()),
            },
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
        assert_eq!(index.get(4), Some(RowAddress::new_from_parts(10, 4)));
        assert_eq!(index.get(5), Some(RowAddress::new_from_parts(20, 0)));
        assert_eq!(index.get(9), Some(RowAddress::new_from_parts(20, 4)));
        assert_eq!(index.get(10), Some(RowAddress::new_from_parts(30, 0)));
        assert_eq!(index.get(14), Some(RowAddress::new_from_parts(30, 4)));
    }

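    /// Proptest strategy: generate a handful of fragments with random sizes and
    /// distribute a shuffled, contiguous block of row ids across them, so every
    /// row id is unique and each fragment gets an arbitrary (possibly unsorted)
    /// slice. Fragment ids are the starting offsets of each slice.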
    fn arbitrary_row_ids(
        num_fragments_range: std::ops::Range<usize>,
        frag_size_range: std::ops::Range<usize>,
    ) -> impl Strategy<Value = Vec<(u32, Arc<RowIdSequence>)>> {
        let fragment_sizes = proptest::collection::vec(frag_size_range, num_fragments_range);
        fragment_sizes.prop_flat_map(|fragment_sizes| {
            let num_rows = fragment_sizes.iter().sum::<usize>() as u64;
            let row_ids = 0..num_rows;
            let row_ids = row_ids.collect::<Vec<_>>();
            let row_ids_shuffled = proptest::strategy::Just(row_ids).prop_shuffle();
            row_ids_shuffled.prop_map(move |row_ids| {
                let mut sequences = Vec::with_capacity(fragment_sizes.len());
                let mut i = 0;
                for size in &fragment_sizes {
                    let end = i + size;
                    let sequence =
                        RowIdSequence(vec![U64Segment::from_slice(row_ids[i..end].into())]);
                    sequences.push((i as u32, Arc::new(sequence)));
                    i = end;
                }
                sequences
            })
        })
    }

    proptest::proptest! {
        #[test]
        fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) {
            let fragment_indices: Vec<FragmentRowIdIndex> = row_ids
                .iter()
                .map(|(frag_id, sequence)| FragmentRowIdIndex {
                    fragment_id: *frag_id,
                    row_id_sequence: sequence.clone(),
                    deletion_vector: Arc::new(DeletionVector::default()),
                })
                .collect();

            let index = RowIdIndex::new(&fragment_indices).unwrap();
            for (frag_id, sequence) in row_ids.iter() {
                for (local_offset, row_id) in sequence.iter().enumerate() {
                    prop_assert_eq!(
                        index.get(row_id),
                        Some(RowAddress::new_from_parts(*frag_id, local_offset as u32)),
                        "Row id {} in sequence {:?} not found in index {:?}",
                        row_id,
                        sequence,
                        index
                    );
                }
            }
        }
    }
}