lance_table/rowids/
index.rs1use std::ops::RangeInclusive;
5use std::sync::Arc;
6
7use deepsize::DeepSizeOf;
8use lance_core::utils::address::RowAddress;
9use lance_core::{Error, Result};
10use rangemap::RangeInclusiveMap;
11use snafu::location;
12
13use super::{RowIdSequence, U64Segment};
14
15#[derive(Debug)]
28pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
29
30impl RowIdIndex {
31 pub fn new(
33 fragment_indices: &[(u32, Arc<RowIdSequence>)],
34 uses_stable_row_ids: bool,
35 ) -> Result<Self> {
36 let mut pieces = fragment_indices
37 .iter()
38 .flat_map(|(fragment_id, sequence)| decompose_sequence(*fragment_id, sequence))
39 .collect::<Vec<_>>();
40 pieces.sort_by_key(|(range, _)| *range.start());
41
42 if !uses_stable_row_ids && pieces.windows(2).any(|w| w[0].0.end() >= w[1].0.start()) {
45 return Err(Error::NotSupported {
46 source: "Overlapping ranges are not yet supported".into(),
47 location: location!(),
48 });
49 }
50
51 Ok(Self(RangeInclusiveMap::from_iter(pieces)))
52 }
53
54 pub fn get(&self, row_id: u64) -> Option<RowAddress> {
58 let (row_id_segment, address_segment) = self.0.get(&row_id)?;
59 let pos = row_id_segment.position(row_id)?;
60 let address = address_segment.get(pos)?;
61 Some(RowAddress::from(address))
62 }
63}
64
65impl DeepSizeOf for RowIdIndex {
66 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
67 self.0
68 .iter()
69 .map(|(_, (row_id_segment, address_segment))| {
70 (2 * std::mem::size_of::<u64>())
71 + std::mem::size_of::<(U64Segment, U64Segment)>()
72 + row_id_segment.deep_size_of_children(context)
73 + address_segment.deep_size_of_children(context)
74 })
75 .sum()
76 }
77}
78
79fn decompose_sequence(
80 fragment_id: u32,
81 sequence: &RowIdSequence,
82) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
83 let mut start_address: u64 = RowAddress::first_row(fragment_id).into();
84 sequence
85 .0
86 .iter()
87 .filter_map(|segment| {
88 let segment_len = segment.len() as u64;
89 let address_segment = U64Segment::Range(start_address..(start_address + segment_len));
90 start_address += segment_len;
91
92 let coverage = segment.range()?;
93
94 Some((coverage, (segment.clone(), address_segment)))
95 })
96 .collect()
97}
98
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index over two fragments that mix every segment variant and
    /// checks a handful of lookups, including row ids that are absent.
    #[test]
    fn test_new_index() {
        let fragment_10 = RowIdSequence(vec![
            U64Segment::Range(0..10),
            U64Segment::RangeWithHoles {
                range: 10..17,
                holes: vec![12, 15].into(),
            },
            U64Segment::SortedArray(vec![20, 25, 30].into()),
        ]);
        let fragment_20 = RowIdSequence(vec![
            U64Segment::RangeWithBitmap {
                range: 17..20,
                bitmap: [true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![40, 50, 60].into()),
        ]);
        let fragment_indices = vec![(10, Arc::new(fragment_10)), (20, Arc::new(fragment_20))];

        let index = RowIdIndex::new(&fragment_indices, false).unwrap();

        // (row id, expected (fragment id, row offset)); `None` means the row
        // id must not resolve. Offsets keep counting across the segments of a
        // fragment.
        let expectations = [
            (0, Some((10, 0))),
            (15, None),
            (16, Some((10, 14))),
            (17, Some((20, 0))),
            (25, Some((10, 16))),
            (40, Some((20, 2))),
            (60, Some((20, 4))),
            (61, None),
        ];
        for (row_id, expected) in expectations {
            let expected =
                expected.map(|(fragment, offset)| RowAddress::new_from_parts(fragment, offset));
            assert_eq!(index.get(row_id), expected, "row id {row_id}");
        }
    }
}