1use std::ops::RangeInclusive;
5use std::sync::Arc;
6
7use super::{RowIdSequence, U64Segment};
8use deepsize::DeepSizeOf;
9use lance_core::utils::address::RowAddress;
10use lance_core::utils::deletion::DeletionVector;
11use lance_core::Result;
12use rangemap::RangeInclusiveMap;
13
14#[derive(Debug)]
27pub struct RowIdIndex(RangeInclusiveMap<u64, (U64Segment, U64Segment)>);
28
29pub struct FragmentRowIdIndex {
30 pub fragment_id: u32,
31 pub row_id_sequence: Arc<RowIdSequence>,
32 pub deletion_vector: Arc<DeletionVector>,
33}
34
35impl RowIdIndex {
36 pub fn new(fragment_indices: &[FragmentRowIdIndex]) -> Result<Self> {
38 let chunks = fragment_indices
39 .iter()
40 .flat_map(decompose_sequence)
41 .collect::<Vec<_>>();
42
43 let mut final_chunks = Vec::new();
44 for processed_chunk in prep_index_chunks(chunks) {
45 match processed_chunk {
46 RawIndexChunk::NonOverlapping(chunk) => {
47 final_chunks.push(chunk);
48 }
49 RawIndexChunk::Overlapping(range, overlapping_chunks) => {
50 debug_assert_eq!(
51 range.end() - range.start() + 1,
52 overlapping_chunks
53 .iter()
54 .map(|(_, (seq, _))| seq.len() as u64)
55 .sum::<u64>(),
56 "Wrong range for {:?}, chunks: {:?}",
57 range,
58 overlapping_chunks,
59 );
60 let merged_chunk = merge_overlapping_chunks(overlapping_chunks)?;
62 final_chunks.push(merged_chunk);
63 }
64 }
65 }
66
67 Ok(Self(RangeInclusiveMap::from_iter(final_chunks)))
68 }
69
70 pub fn get(&self, row_id: u64) -> Option<RowAddress> {
74 let (row_id_segment, address_segment) = self.0.get(&row_id)?;
75 let pos = row_id_segment.position(row_id)?;
76 let address = address_segment.get(pos)?;
77 Some(RowAddress::from(address))
78 }
79}
80
81impl DeepSizeOf for RowIdIndex {
82 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
83 self.0
84 .iter()
85 .map(|(_, (row_id_segment, address_segment))| {
86 (2 * std::mem::size_of::<u64>())
87 + std::mem::size_of::<(U64Segment, U64Segment)>()
88 + row_id_segment.deep_size_of_children(context)
89 + address_segment.deep_size_of_children(context)
90 })
91 .sum()
92 }
93}
94
95fn decompose_sequence(
96 frag_index: &FragmentRowIdIndex,
97) -> Vec<(RangeInclusive<u64>, (U64Segment, U64Segment))> {
98 let mut start_address: u64 = RowAddress::first_row(frag_index.fragment_id).into();
99 let mut current_offset = 0u32;
100
101 frag_index
102 .row_id_sequence
103 .0
104 .iter()
105 .filter_map(|segment| {
106 let segment_len = segment.len();
107
108 let active_pairs: Vec<(u64, u64)> = segment
109 .iter()
110 .enumerate()
111 .filter_map(|(i, row_id)| {
112 let row_offset = current_offset + i as u32;
113 if !frag_index.deletion_vector.contains(row_offset) {
114 let address = start_address + i as u64;
115 Some((row_id, address))
116 } else {
117 None
118 }
119 })
120 .collect();
121
122 current_offset += segment_len as u32;
123 start_address += segment_len as u64;
124
125 if active_pairs.is_empty() {
126 return None;
127 }
128
129 let row_ids: Vec<u64> = active_pairs.iter().map(|(rid, _)| *rid).collect();
130 let addresses: Vec<u64> = active_pairs.iter().map(|(_, addr)| *addr).collect();
131
132 let row_id_segment = U64Segment::from_iter(row_ids.iter().copied());
133 let address_segment = U64Segment::from_iter(addresses.iter().copied());
134
135 let coverage = row_id_segment.range()?;
136
137 Some((coverage, (row_id_segment, address_segment)))
138 })
139 .collect()
140}
141
142type IndexChunk = (RangeInclusive<u64>, (U64Segment, U64Segment));
143
144#[derive(Debug)]
145enum RawIndexChunk {
146 NonOverlapping(IndexChunk),
147 Overlapping(RangeInclusive<u64>, Vec<IndexChunk>),
148}
149
150impl RawIndexChunk {
151 fn range_end(&self) -> u64 {
152 match self {
153 Self::NonOverlapping((range, _)) => *range.end(),
154 Self::Overlapping(range, _) => *range.end(),
155 }
156 }
157}
158
159fn prep_index_chunks(mut chunks: Vec<IndexChunk>) -> impl Iterator<Item = RawIndexChunk> {
164 chunks.sort_by_key(|(range, _)| u64::MAX - *range.start());
165
166 let mut output = Vec::new();
167
168 if let Some(first_chunk) = chunks.pop() {
170 output.push(RawIndexChunk::NonOverlapping(first_chunk));
171 } else {
172 return output.into_iter();
174 }
175
176 let mut current_range = 0..=0;
177 let mut current_overlap = Vec::new();
178 while let Some(chunk) = chunks.pop() {
179 debug_assert_eq!(
180 current_overlap
181 .iter()
182 .map(|(range, _): &IndexChunk| *range.start())
183 .min()
184 .unwrap_or_default(),
185 *current_range.start(),
186 );
187 debug_assert_eq!(
188 current_overlap
189 .iter()
190 .map(|(range, _): &IndexChunk| *range.end())
191 .max()
192 .unwrap_or_default(),
193 *current_range.end(),
194 );
195
196 if current_overlap.is_empty() {
197 let last_chunk_end = output.last().unwrap().range_end();
199 if *chunk.0.start() <= last_chunk_end {
200 match output.pop().unwrap() {
202 RawIndexChunk::NonOverlapping(chunk) => {
203 current_overlap.push(chunk);
204 }
205 _ => unreachable!(),
206 }
207 current_overlap.push(chunk);
208
209 let range_start = *current_overlap.first().unwrap().0.start();
210 let range_end = *current_overlap
211 .last()
212 .unwrap()
213 .0
214 .end()
215 .max(current_overlap.first().unwrap().0.end());
216 current_range = range_start..=range_end;
217 } else {
218 output.push(RawIndexChunk::NonOverlapping(chunk));
220 }
221 } else {
222 if chunk.0.start() <= current_range.end() {
224 let range_end = *chunk.0.end().max(current_range.end());
226 current_range = *current_range.start()..=range_end;
227
228 current_overlap.push(chunk);
229 } else {
230 output.push(RawIndexChunk::Overlapping(
232 std::mem::replace(&mut current_range, 0..=0),
233 std::mem::take(&mut current_overlap),
234 ));
235 output.push(RawIndexChunk::NonOverlapping(chunk));
236 }
237 }
238 }
239 debug_assert_eq!(
240 current_overlap
241 .iter()
242 .map(|(range, _): &IndexChunk| *range.start())
243 .min()
244 .unwrap_or_default(),
245 *current_range.start(),
246 );
247 debug_assert_eq!(
248 current_overlap
249 .iter()
250 .map(|(range, _): &IndexChunk| *range.end())
251 .max()
252 .unwrap_or_default(),
253 *current_range.end(),
254 );
255
256 if !current_overlap.is_empty() {
257 output.push(RawIndexChunk::Overlapping(
258 current_range.clone(),
259 current_overlap,
260 ));
261 }
262
263 output.into_iter()
264}
265
266fn merge_overlapping_chunks(overlapping_chunks: Vec<IndexChunk>) -> Result<IndexChunk> {
267 let total_capacity = overlapping_chunks
268 .iter()
269 .map(|(_, (row_ids, _))| row_ids.len())
270 .sum();
271 let mut values = Vec::with_capacity(total_capacity);
272 for (_, (row_ids, row_addrs)) in overlapping_chunks.iter() {
273 values.extend(row_ids.iter().zip(row_addrs.iter()));
274 }
275 values.sort_by_key(|(row_id, _)| *row_id);
276 let row_id_segment = U64Segment::from_iter(values.iter().map(|(row_id, _)| *row_id));
277 let address_segment = U64Segment::from_iter(values.iter().map(|(_, row_addr)| *row_addr));
278
279 let range = row_id_segment.range().unwrap();
280
281 Ok((range, (row_id_segment, address_segment)))
282}
283
#[cfg(test)]
mod tests {
    use super::*;
    use proptest::{prelude::Strategy, prop_assert_eq};

    /// Builds a fragment entry with no deleted rows.
    fn fragment(fragment_id: u32, segments: Vec<U64Segment>) -> FragmentRowIdIndex {
        fragment_with_deletions(fragment_id, segments, DeletionVector::default())
    }

    /// Builds a fragment entry whose rows at the offsets in `deletion_vector` are deleted.
    fn fragment_with_deletions(
        fragment_id: u32,
        segments: Vec<U64Segment>,
        deletion_vector: DeletionVector,
    ) -> FragmentRowIdIndex {
        FragmentRowIdIndex {
            fragment_id,
            row_id_sequence: Arc::new(RowIdSequence(segments)),
            deletion_vector: Arc::new(deletion_vector),
        }
    }

    /// Checks `index.get(row_id)` for every `(row_id, expected)` case. `expected` is
    /// `Some((fragment_id, row_offset))`, or `None` for ids that must be absent.
    fn assert_lookups(index: &RowIdIndex, cases: &[(u64, Option<(u32, u32)>)]) {
        for &(row_id, expected) in cases {
            assert_eq!(
                index.get(row_id),
                expected.map(|(frag_id, offset)| RowAddress::new_from_parts(frag_id, offset)),
                "unexpected address for row id {}",
                row_id
            );
        }
    }

    #[test]
    fn test_new_index() {
        let fragment_indices = vec![
            fragment(
                10,
                vec![
                    U64Segment::Range(0..10),
                    U64Segment::RangeWithHoles {
                        range: 10..17,
                        holes: vec![12, 15].into(),
                    },
                    U64Segment::SortedArray(vec![20, 25, 30].into()),
                ],
            ),
            fragment(
                20,
                vec![
                    U64Segment::RangeWithBitmap {
                        range: 17..20,
                        bitmap: [true, false, true].as_slice().into(),
                    },
                    U64Segment::Array(vec![40, 50, 60].into()),
                ],
            ),
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(
            &index,
            &[
                (0, Some((10, 0))),
                (15, None),
                (16, Some((10, 14))),
                (17, Some((20, 0))),
                (25, Some((10, 16))),
                (40, Some((20, 2))),
                (60, Some((20, 4))),
                (61, None),
            ],
        );
    }

    #[test]
    fn test_new_index_overlap() {
        let fragment_indices = vec![
            fragment(23, vec![U64Segment::SortedArray(vec![3, 6, 9].into())]),
            fragment(42, vec![U64Segment::SortedArray(vec![2, 5, 8].into())]),
            fragment(10, vec![U64Segment::SortedArray(vec![1, 4, 7].into())]),
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(
            &index,
            &[
                (1, Some((10, 0))),
                (2, Some((42, 0))),
                (3, Some((23, 0))),
                (4, Some((10, 1))),
                (5, Some((42, 1))),
                (6, Some((23, 1))),
                (7, Some((10, 2))),
                (8, Some((42, 2))),
                (9, Some((23, 2))),
            ],
        );
    }

    #[test]
    fn test_new_index_unsorted_row_ids() {
        let fragment_indices = vec![
            fragment(10, vec![U64Segment::Array(vec![9, 3, 6].into())]),
            fragment(20, vec![U64Segment::Array(vec![8, 2, 5].into())]),
            fragment(30, vec![U64Segment::Array(vec![7, 1, 4].into())]),
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(
            &index,
            &[
                (1, Some((30, 1))),
                (2, Some((20, 1))),
                (3, Some((10, 1))),
                (4, Some((30, 2))),
                (5, Some((20, 2))),
                (6, Some((10, 2))),
                (7, Some((30, 0))),
                (8, Some((20, 0))),
                (9, Some((10, 0))),
                (0, None),
                (10, None),
            ],
        );
    }

    #[test]
    fn test_new_index_partial_overlap() {
        let fragment_indices = vec![
            fragment(
                0,
                vec![U64Segment::RangeWithHoles {
                    range: 0..100,
                    holes: vec![50].into(),
                }],
            ),
            fragment(1, vec![U64Segment::Range(50..51)]),
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(
            &index,
            &[
                (0, Some((0, 0))),
                (49, Some((0, 49))),
                (50, Some((1, 0))),
                (51, Some((0, 50))),
                (99, Some((0, 98))),
            ],
        );
    }

    #[test]
    fn test_index_with_deletion_vector() {
        let fragment_indices = vec![fragment_with_deletions(
            10,
            vec![U64Segment::Range(0..6)],
            DeletionVector::from_iter(vec![2, 3]),
        )];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(
            &index,
            &[
                (0, Some((10, 0))),
                (1, Some((10, 1))),
                (4, Some((10, 4))),
                (5, Some((10, 5))),
                (2, None),
                (3, None),
            ],
        );
    }

    #[test]
    fn test_empty_fragment_sequences() {
        let fragment_indices = vec![
            fragment(10, vec![]),
            fragment(20, vec![U64Segment::Range(5..8)]),
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(
            &index,
            &[(5, Some((20, 0))), (7, Some((20, 2))), (4, None)],
        );
    }

    #[test]
    fn test_completely_empty_index() {
        let fragment_indices: Vec<FragmentRowIdIndex> = Vec::new();
        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(&index, &[(0, None), (100, None)]);
    }

    #[test]
    fn test_non_overlapping_ranges() {
        let fragment_indices = vec![
            fragment(10, vec![U64Segment::Range(0..5)]),
            fragment(20, vec![U64Segment::Range(5..10)]),
            fragment(30, vec![U64Segment::Range(10..15)]),
        ];

        let index = RowIdIndex::new(&fragment_indices).unwrap();

        assert_lookups(
            &index,
            &[
                (0, Some((10, 0))),
                (4, Some((10, 4))),
                (5, Some((20, 0))),
                (9, Some((20, 4))),
                (10, Some((30, 0))),
                (14, Some((30, 4))),
            ],
        );
    }

    /// Generates fragments that partition a shuffled `0..total_rows` between them.
    ///
    /// Each fragment's id is the offset of its first row in the overall shuffled list.
    fn arbitrary_row_ids(
        num_fragments_range: std::ops::Range<usize>,
        frag_size_range: std::ops::Range<usize>,
    ) -> impl Strategy<Value = Vec<(u32, Arc<RowIdSequence>)>> {
        proptest::collection::vec(frag_size_range, num_fragments_range).prop_flat_map(|sizes| {
            let total_rows = sizes.iter().sum::<usize>() as u64;
            let ordered: Vec<u64> = (0..total_rows).collect();
            proptest::strategy::Just(ordered)
                .prop_shuffle()
                .prop_map(move |shuffled| {
                    let mut start = 0usize;
                    sizes
                        .iter()
                        .map(|&len| {
                            let end = start + len;
                            let segment = U64Segment::from_slice(shuffled[start..end].into());
                            let entry = (start as u32, Arc::new(RowIdSequence(vec![segment])));
                            start = end;
                            entry
                        })
                        .collect::<Vec<_>>()
                })
        })
    }

    proptest::proptest! {
        #[test]
        fn test_new_index_robustness(row_ids in arbitrary_row_ids(0..5, 0..32)) {
            let fragment_indices: Vec<FragmentRowIdIndex> = row_ids
                .iter()
                .map(|(frag_id, sequence)| FragmentRowIdIndex {
                    fragment_id: *frag_id,
                    row_id_sequence: Arc::clone(sequence),
                    deletion_vector: Arc::new(DeletionVector::default()),
                })
                .collect();

            let index = RowIdIndex::new(&fragment_indices).unwrap();
            for (frag_id, sequence) in &row_ids {
                for (local_offset, row_id) in sequence.iter().enumerate() {
                    prop_assert_eq!(
                        index.get(row_id),
                        Some(RowAddress::new_from_parts(*frag_id, local_offset as u32)),
                        "Row id {} in sequence {:?} not found in index {:?}",
                        row_id,
                        sequence,
                        index
                    );
                }
            }
        }
    }
}