lance_table/rowids/
index.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::ops::RangeInclusive;
5use std::sync::Arc;
6
7use deepsize::DeepSizeOf;
8use lance_core::utils::address::RowAddress;
9use lance_core::{Error, Result};
10use rangemap::RangeInclusiveMap;
11use snafu::location;
12
13use super::{RowIdSequence, U64Segment};
14
15/// An index of row ids
16///
17/// This index is used to map row ids to their corresponding addresses. These
18/// addresses correspond to physical positions in the dataset. See [RowAddress].
19///
20/// This structure only contains rows that physically exist. However, it may
21/// map to addresses that have been tombstoned. A separate tombstone index is
22/// used to track tombstoned rows.
23// (Implementation)
24// Disjoint ranges of row ids are stored as the keys of the map. The values are
25// a pair of segments. The first segment is the row ids, and the second segment
26// is the addresses.
27#[derive(Debug)]
28pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
29
30impl RowIdIndex {
31    /// Create a new index from a list of fragment ids and their corresponding row id sequences.
32    pub fn new(fragment_indices: &[(u32, Arc<RowIdSequence>)]) -> Result<Self> {
33        let mut pieces = fragment_indices
34            .iter()
35            .flat_map(|(fragment_id, sequence)| decompose_sequence(*fragment_id, sequence))
36            .collect::<Vec<_>>();
37        pieces.sort_by_key(|(range, _)| *range.start());
38
39        // Check for overlapping ranges and if found, return a NotImplementedError.
40        // We don't expect this pattern yet until we make row ids stable after
41        // updates.
42        if pieces.windows(2).any(|w| w[0].0.end() >= w[1].0.start()) {
43            return Err(Error::NotSupported {
44                source: "Overlapping ranges are not yet supported".into(),
45                location: location!(),
46            });
47        }
48
49        Ok(Self(RangeInclusiveMap::from_iter(pieces)))
50    }
51
52    /// Get the address for a given row id.
53    ///
54    /// Will return None if the row id does not exist in the index.
55    pub fn get(&self, row_id: u64) -> Option<RowAddress> {
56        let (row_id_segment, address_segment) = self.0.get(&row_id)?;
57        let pos = row_id_segment.position(row_id)?;
58        let address = address_segment.get(pos)?;
59        Some(RowAddress::from(address))
60    }
61}
62
63impl DeepSizeOf for RowIdIndex {
64    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
65        self.0
66            .iter()
67            .map(|(_, (row_id_segment, address_segment))| {
68                (2 * std::mem::size_of::<u64>())
69                    + std::mem::size_of::<(U64Segment, U64Segment)>()
70                    + row_id_segment.deep_size_of_children(context)
71                    + address_segment.deep_size_of_children(context)
72            })
73            .sum()
74    }
75}
76
77fn decompose_sequence(
78    fragment_id: u32,
79    sequence: &RowIdSequence,
80) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
81    let mut start_address: u64 = RowAddress::first_row(fragment_id).into();
82    sequence
83        .0
84        .iter()
85        .filter_map(|segment| {
86            let segment_len = segment.len() as u64;
87            let address_segment = U64Segment::Range(start_address..(start_address + segment_len));
88            start_address += segment_len;
89
90            let coverage = segment.range()?;
91
92            Some((coverage, (segment.clone(), address_segment)))
93        })
94        .collect()
95}
96
97#[cfg(test)]
98mod tests {
99    use super::*;
100
101    #[test]
102    fn test_new_index() {
103        let fragment_indices = vec![
104            (
105                10,
106                Arc::new(RowIdSequence(vec![
107                    U64Segment::Range(0..10),
108                    U64Segment::RangeWithHoles {
109                        range: 10..17,
110                        holes: vec![12, 15].into(),
111                    },
112                    U64Segment::SortedArray(vec![20, 25, 30].into()),
113                ])),
114            ),
115            (
116                20,
117                Arc::new(RowIdSequence(vec![
118                    U64Segment::RangeWithBitmap {
119                        range: 17..20,
120                        bitmap: [true, false, true].as_slice().into(),
121                    },
122                    U64Segment::Array(vec![40, 50, 60].into()),
123                ])),
124            ),
125        ];
126
127        let index = RowIdIndex::new(&fragment_indices).unwrap();
128
129        // Check various queries.
130        assert_eq!(index.get(0), Some(RowAddress::new_from_parts(10, 0)));
131        assert_eq!(index.get(15), None);
132        assert_eq!(index.get(16), Some(RowAddress::new_from_parts(10, 14)));
133        assert_eq!(index.get(17), Some(RowAddress::new_from_parts(20, 0)));
134        assert_eq!(index.get(25), Some(RowAddress::new_from_parts(10, 16)));
135        assert_eq!(index.get(40), Some(RowAddress::new_from_parts(20, 2)));
136        assert_eq!(index.get(60), Some(RowAddress::new_from_parts(20, 4)));
137        assert_eq!(index.get(61), None);
138    }
139}