lance_table/rowids/
index.rs1use std::ops::RangeInclusive;
5use std::sync::Arc;
6
7use deepsize::DeepSizeOf;
8use lance_core::utils::address::RowAddress;
9use lance_core::{Error, Result};
10use rangemap::RangeInclusiveMap;
11use snafu::location;
12
13use super::{RowIdSequence, U64Segment};
14
15#[derive(Debug)]
28pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
29
30impl RowIdIndex {
31 pub fn new(fragment_indices: &[(u32, Arc<RowIdSequence>)]) -> Result<Self> {
33 let mut pieces = fragment_indices
34 .iter()
35 .flat_map(|(fragment_id, sequence)| decompose_sequence(*fragment_id, sequence))
36 .collect::<Vec<_>>();
37 pieces.sort_by_key(|(range, _)| *range.start());
38
39 if pieces.windows(2).any(|w| w[0].0.end() >= w[1].0.start()) {
43 return Err(Error::NotSupported {
44 source: "Overlapping ranges are not yet supported".into(),
45 location: location!(),
46 });
47 }
48
49 Ok(Self(RangeInclusiveMap::from_iter(pieces)))
50 }
51
52 pub fn get(&self, row_id: u64) -> Option<RowAddress> {
56 let (row_id_segment, address_segment) = self.0.get(&row_id)?;
57 let pos = row_id_segment.position(row_id)?;
58 let address = address_segment.get(pos)?;
59 Some(RowAddress::from(address))
60 }
61}
62
63impl DeepSizeOf for RowIdIndex {
64 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
65 self.0
66 .iter()
67 .map(|(_, (row_id_segment, address_segment))| {
68 (2 * std::mem::size_of::<u64>())
69 + std::mem::size_of::<(U64Segment, U64Segment)>()
70 + row_id_segment.deep_size_of_children(context)
71 + address_segment.deep_size_of_children(context)
72 })
73 .sum()
74 }
75}
76
77fn decompose_sequence(
78 fragment_id: u32,
79 sequence: &RowIdSequence,
80) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
81 let mut start_address: u64 = RowAddress::first_row(fragment_id).into();
82 sequence
83 .0
84 .iter()
85 .filter_map(|segment| {
86 let segment_len = segment.len() as u64;
87 let address_segment = U64Segment::Range(start_address..(start_address + segment_len));
88 start_address += segment_len;
89
90 let coverage = segment.range()?;
91
92 Some((coverage, (segment.clone(), address_segment)))
93 })
94 .collect()
95}
96
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an index over two fragments whose segments interleave in row id
    /// space and checks a handful of lookups, including misses.
    #[test]
    fn test_new_index() {
        let fragment_ten = Arc::new(RowIdSequence(vec![
            U64Segment::Range(0..10),
            U64Segment::RangeWithHoles {
                range: 10..17,
                holes: vec![12, 15].into(),
            },
            U64Segment::SortedArray(vec![20, 25, 30].into()),
        ]));
        let fragment_twenty = Arc::new(RowIdSequence(vec![
            U64Segment::RangeWithBitmap {
                range: 17..20,
                bitmap: [true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![40, 50, 60].into()),
        ]));
        let fragment_indices = vec![(10, fragment_ten), (20, fragment_twenty)];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        // (row id, expected (fragment id, row offset) or None for a miss)
        let expectations = [
            (0, Some((10, 0))),
            (15, None),
            (16, Some((10, 14))),
            (17, Some((20, 0))),
            (25, Some((10, 16))),
            (40, Some((20, 2))),
            (60, Some((20, 4))),
            (61, None),
        ];
        for (row_id, expected) in expectations {
            let want = expected.map(|(fragment, offset)| RowAddress::new_from_parts(fragment, offset));
            assert_eq!(index.get(row_id), want, "row id {}", row_id);
        }
    }
}