lance_table/rowids/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::RangeInclusive;
5use std::sync::Arc;
6
7use deepsize::DeepSizeOf;
8use lance_core::utils::address::RowAddress;
9use lance_core::{Error, Result};
10use rangemap::RangeInclusiveMap;
11use snafu::location;
12
13use super::{RowIdSequence, U64Segment};
14
/// An index of row ids
///
/// This index is used to map row ids to their corresponding addresses. These
/// addresses correspond to physical positions in the dataset. See [RowAddress].
///
/// This structure only contains rows that physically exist. However, it may
/// map to addresses that have been tombstoned. A separate tombstone index is
/// used to track tombstoned rows.
// (Implementation)
// Disjoint ranges of row ids are stored as the keys of the map. Each key is the
// inclusive range reported by one row id segment. The values are a pair of
// segments. The first segment is the row ids, and the second segment is the
// addresses; both have the same length, and the row id at position `i` of the
// first segment maps to the address at position `i` of the second.
#[derive(Debug)]
pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
29
30impl RowIdIndex {
31    /// Create a new index from a list of fragment ids and their corresponding row id sequences.
32    pub fn new(
33        fragment_indices: &[(u32, Arc<RowIdSequence>)],
34        uses_stable_row_ids: bool,
35    ) -> Result<Self> {
36        let mut pieces = fragment_indices
37            .iter()
38            .flat_map(|(fragment_id, sequence)| decompose_sequence(*fragment_id, sequence))
39            .collect::<Vec<_>>();
40        pieces.sort_by_key(|(range, _)| *range.start());
41
42        // If users stable row ids, check for overlapping ranges and if found,
43        // return a NotImplementedError.
44        if !uses_stable_row_ids && pieces.windows(2).any(|w| w[0].0.end() >= w[1].0.start()) {
45            return Err(Error::NotSupported {
46                source: "Overlapping ranges are not yet supported".into(),
47                location: location!(),
48            });
49        }
50
51        Ok(Self(RangeInclusiveMap::from_iter(pieces)))
52    }
53
54    /// Get the address for a given row id.
55    ///
56    /// Will return None if the row id does not exist in the index.
57    pub fn get(&self, row_id: u64) -> Option<RowAddress> {
58        let (row_id_segment, address_segment) = self.0.get(&row_id)?;
59        let pos = row_id_segment.position(row_id)?;
60        let address = address_segment.get(pos)?;
61        Some(RowAddress::from(address))
62    }
63}
64
65impl DeepSizeOf for RowIdIndex {
66    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
67        self.0
68            .iter()
69            .map(|(_, (row_id_segment, address_segment))| {
70                (2 * std::mem::size_of::<u64>())
71                    + std::mem::size_of::<(U64Segment, U64Segment)>()
72                    + row_id_segment.deep_size_of_children(context)
73                    + address_segment.deep_size_of_children(context)
74            })
75            .sum()
76    }
77}
78
79fn decompose_sequence(
80    fragment_id: u32,
81    sequence: &RowIdSequence,
82) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
83    let mut start_address: u64 = RowAddress::first_row(fragment_id).into();
84    sequence
85        .0
86        .iter()
87        .filter_map(|segment| {
88            let segment_len = segment.len() as u64;
89            let address_segment = U64Segment::Range(start_address..(start_address + segment_len));
90            start_address += segment_len;
91
92            let coverage = segment.range()?;
93
94            Some((coverage, (segment.clone(), address_segment)))
95        })
96        .collect()
97}
98
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index over two fragments that mix every segment kind and
    /// checks lookups for present ids, holes, and ids past the end.
    #[test]
    fn test_new_index() {
        // Fragment 10: ids 0..=9, then 10..=16 minus {12, 15}, then {20, 25, 30}.
        let first_sequence = RowIdSequence(vec![
            U64Segment::Range(0..10),
            U64Segment::RangeWithHoles {
                range: 10..17,
                holes: vec![12, 15].into(),
            },
            U64Segment::SortedArray(vec![20, 25, 30].into()),
        ]);
        // Fragment 20: a bitmap-filtered range over 17..20, then {40, 50, 60}.
        let second_sequence = RowIdSequence(vec![
            U64Segment::RangeWithBitmap {
                range: 17..20,
                bitmap: [true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![40, 50, 60].into()),
        ]);
        let fragment_indices = vec![
            (10, Arc::new(first_sequence)),
            (20, Arc::new(second_sequence)),
        ];

        let index = RowIdIndex::new(&fragment_indices, false).unwrap();

        // (row id, expected address) pairs.
        let cases = [
            (0, Some(RowAddress::new_from_parts(10, 0))),
            (15, None),
            (16, Some(RowAddress::new_from_parts(10, 14))),
            (17, Some(RowAddress::new_from_parts(20, 0))),
            (25, Some(RowAddress::new_from_parts(10, 16))),
            (40, Some(RowAddress::new_from_parts(20, 2))),
            (60, Some(RowAddress::new_from_parts(20, 4))),
            (61, None),
        ];
        for (row_id, expected) in cases {
            assert_eq!(index.get(row_id), expected, "row id {}", row_id);
        }
    }
}