1use std::ops::Range;
15
16mod bitmap;
20mod encoded_array;
21mod index;
22mod segment;
23mod serde;
24
25use deepsize::DeepSizeOf;
26pub use index::RowIdIndex;
28use lance_core::{utils::mask::RowIdTreeMap, Error, Result};
29use lance_io::ReadBatchParams;
30pub use serde::{read_row_ids, write_row_ids};
31
32use snafu::location;
33
34use segment::U64Segment;
35
36use crate::utils::LanceIteratorExtension;
37
/// An ordered sequence of `u64` row ids.
///
/// The ids are stored as a list of [`U64Segment`]s. Iterating the sequence
/// yields the ids of each segment in turn, in the order the segments are
/// stored.
#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq)]
pub struct RowIdSequence(Vec<U64Segment>);
52
53impl std::fmt::Display for RowIdSequence {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 let mut iter = self.iter();
56 let mut first_10 = Vec::new();
57 let mut last_10 = Vec::new();
58 for row_id in iter.by_ref() {
59 first_10.push(row_id);
60 if first_10.len() > 10 {
61 break;
62 }
63 }
64
65 while let Some(row_id) = iter.next_back() {
66 last_10.push(row_id);
67 if last_10.len() > 10 {
68 break;
69 }
70 }
71 last_10.reverse();
72
73 let theres_more = iter.next().is_some();
74
75 write!(f, "[")?;
76 for row_id in first_10 {
77 write!(f, "{}", row_id)?;
78 }
79 if theres_more {
80 write!(f, ", ...")?;
81 }
82 for row_id in last_10 {
83 write!(f, ", {}", row_id)?;
84 }
85 write!(f, "]")
86 }
87}
88
89impl From<Range<u64>> for RowIdSequence {
90 fn from(range: Range<u64>) -> Self {
91 Self(vec![U64Segment::Range(range)])
92 }
93}
94
95impl RowIdSequence {
96 pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
97 self.0.iter().flat_map(|segment| segment.iter())
98 }
99
100 pub fn len(&self) -> u64 {
101 self.0.iter().map(|segment| segment.len() as u64).sum()
102 }
103
104 pub fn is_empty(&self) -> bool {
105 self.0.is_empty()
106 }
107
108 pub fn extend(&mut self, other: Self) {
110 if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
114 (self.0.last(), other.0.first())
115 {
116 if range1.end == range2.start {
117 let new_range = U64Segment::Range(range1.start..range2.end);
118 self.0.pop();
119 self.0.push(new_range);
120 self.0.extend(other.0.into_iter().skip(1));
121 return;
122 }
123 }
124 self.0.extend(other.0);
126 }
127
128 pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
130 let (row_ids, offsets) = self.find_ids(row_ids);
132
133 let capacity = self.0.capacity();
134 let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
135 let mut remaining_segments = old_segments.as_slice();
136
137 for (segment_idx, range) in offsets {
138 let segments_handled = old_segments.len() - remaining_segments.len();
139 let segments_to_add = segment_idx - segments_handled;
140 self.0
141 .extend_from_slice(&remaining_segments[..segments_to_add]);
142 remaining_segments = &remaining_segments[segments_to_add..];
143
144 let segment;
145 (segment, remaining_segments) = remaining_segments.split_first().unwrap();
146
147 let segment_ids = &row_ids[range];
148 self.0.push(segment.delete(segment_ids));
149 }
150
151 self.0.extend_from_slice(remaining_segments);
153 }
154
155 pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
157 let mut local_positions = Vec::new();
158 let mut positions_iter = positions.into_iter();
159 let mut curr_position = positions_iter.next();
160 let mut offset = 0;
161 let mut cutoff = 0;
162
163 for segment in &mut self.0 {
164 cutoff += segment.len() as u32;
166 while let Some(position) = curr_position {
167 if position >= cutoff {
168 break;
169 }
170 local_positions.push(position - offset);
171 curr_position = positions_iter.next();
172 }
173
174 if !local_positions.is_empty() {
175 segment.mask(&local_positions);
176 local_positions.clear();
177 }
178 offset = cutoff;
179 }
180
181 self.0.retain(|segment| segment.len() != 0);
182
183 Ok(())
184 }
185
186 fn find_ids(
192 &self,
193 row_ids: impl IntoIterator<Item = u64>,
194 ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
195 let mut segment_iter = self.0.iter().enumerate().cycle();
199
200 let mut segment_matches = vec![Vec::new(); self.0.len()];
201
202 row_ids.into_iter().for_each(|row_id| {
203 let mut i = 0;
204 while i < self.0.len() {
206 let (segment_idx, segment) = segment_iter.next().unwrap();
207 if segment.range().is_some_and(|range| range.contains(&row_id)) {
208 if let Some(offset) = segment.position(row_id) {
209 segment_matches.get_mut(segment_idx).unwrap().push(offset);
210 }
211 }
213 i += 1;
214 }
215 });
216 for matches in &mut segment_matches {
217 matches.sort_unstable();
218 }
219
220 let mut offset = 0;
221 let segment_ranges = segment_matches
222 .iter()
223 .enumerate()
224 .filter(|(_, matches)| !matches.is_empty())
225 .map(|(segment_idx, matches)| {
226 let range = offset..offset + matches.len();
227 offset += matches.len();
228 (segment_idx, range)
229 })
230 .collect();
231 let row_ids = segment_matches
232 .into_iter()
233 .enumerate()
234 .flat_map(|(segment_idx, offset)| {
235 offset
236 .into_iter()
237 .map(move |offset| self.0[segment_idx].get(offset).unwrap())
238 })
239 .collect();
240
241 (row_ids, segment_ranges)
242 }
243
244 pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
245 if len == 0 {
246 return RowIdSeqSlice {
247 segments: &[],
248 offset_start: 0,
249 offset_last: 0,
250 };
251 }
252
253 let mut offset_start = offset;
255 let mut segment_offset = 0;
256 for segment in &self.0 {
257 let segment_len = segment.len();
258 if offset_start < segment_len {
259 break;
260 }
261 offset_start -= segment_len;
262 segment_offset += 1;
263 }
264
265 let mut offset_last = offset_start + len;
267 let mut segment_offset_last = segment_offset;
268 for segment in &self.0[segment_offset..] {
269 let segment_len = segment.len();
270 if offset_last <= segment_len {
271 break;
272 }
273 offset_last -= segment_len;
274 segment_offset_last += 1;
275 }
276
277 RowIdSeqSlice {
278 segments: &self.0[segment_offset..=segment_offset_last],
279 offset_start,
280 offset_last,
281 }
282 }
283
284 pub fn get(&self, index: usize) -> Option<u64> {
288 let mut offset = 0;
289 for segment in &self.0 {
290 let segment_len = segment.len();
291 if index < offset + segment_len {
292 return segment.get(index - offset);
293 }
294 offset += segment_len;
295 }
296 None
297 }
298}
299
300impl From<&RowIdSequence> for RowIdTreeMap {
301 fn from(row_ids: &RowIdSequence) -> Self {
302 let mut tree_map = Self::new();
303 for segment in &row_ids.0 {
304 match segment {
305 U64Segment::Range(range) => {
306 tree_map.insert_range(range.clone());
307 }
308 U64Segment::RangeWithBitmap { range, bitmap } => {
309 tree_map.insert_range(range.clone());
310 for (i, val) in range.clone().enumerate() {
311 if !bitmap.get(i) {
312 tree_map.remove(val);
313 }
314 }
315 }
316 U64Segment::RangeWithHoles { range, holes } => {
317 tree_map.insert_range(range.clone());
318 for hole in holes.iter() {
319 tree_map.remove(hole);
320 }
321 }
322 U64Segment::SortedArray(array) | U64Segment::Array(array) => {
323 for val in array.iter() {
324 tree_map.insert(val);
325 }
326 }
327 }
328 }
329 tree_map
330 }
331}
332
/// A borrowed view over a contiguous run of row ids of a [`RowIdSequence`].
///
/// Produced by [`RowIdSequence::slice`].
#[derive(Debug)]
pub struct RowIdSeqSlice<'a> {
    /// The segments the slice touches, first to last.
    segments: &'a [U64Segment],
    /// Number of ids skipped at the front of the first segment.
    offset_start: usize,
    /// Position inside the last segment at which the slice stops (exclusive).
    offset_last: usize,
}
342
343impl RowIdSeqSlice<'_> {
344 pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
345 let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
346 known_size -= self.offset_start;
347 known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
348
349 let end = self.segments.len().saturating_sub(1);
350 self.segments
351 .iter()
352 .enumerate()
353 .flat_map(move |(i, segment)| {
354 match i {
355 0 if self.segments.len() == 1 => {
356 let len = self.offset_last - self.offset_start;
357 Box::new(segment.iter().skip(self.offset_start).take(len))
360 as Box<dyn Iterator<Item = u64>>
361 }
362 0 => Box::new(segment.iter().skip(self.offset_start)),
363 i if i == end => Box::new(segment.iter().take(self.offset_last)),
364 _ => Box::new(segment.iter()),
365 }
366 })
367 .exact_size(known_size)
368 }
369}
370
371pub fn rechunk_sequences(
378 sequences: impl IntoIterator<Item = RowIdSequence>,
379 chunk_sizes: impl IntoIterator<Item = u64>,
380) -> Result<Vec<RowIdSequence>> {
381 let chunk_size_iter = chunk_sizes.into_iter();
383 let mut chunked_sequences = Vec::with_capacity(chunk_size_iter.size_hint().0);
384 let mut segment_iter = sequences
385 .into_iter()
386 .flat_map(|sequence| sequence.0.into_iter())
387 .peekable();
388
389 let mut segment_offset = 0_u64;
390 for chunk_size in chunk_size_iter {
391 let mut sequence = RowIdSequence(Vec::new());
392 let mut remaining = chunk_size;
393
394 let too_many_segments_error = || {
395 Error::invalid_input(
396 "Got too many segments for the provided chunk lengths",
397 location!(),
398 )
399 };
400
401 while remaining > 0 {
402 let remaining_in_segment = segment_iter
403 .peek()
404 .map_or(0, |segment| segment.len() as u64 - segment_offset);
405 match (remaining_in_segment.cmp(&remaining), remaining_in_segment) {
406 (std::cmp::Ordering::Greater, _) => {
407 let segment = segment_iter
409 .peek()
410 .ok_or_else(too_many_segments_error)?
411 .slice(segment_offset as usize, remaining as usize);
412 sequence.extend(RowIdSequence(vec![segment]));
413 segment_offset += remaining;
414 remaining = 0;
415 }
416 (_, 0) => {
417 let segment = segment_iter.next().ok_or_else(too_many_segments_error)?;
419 sequence.extend(RowIdSequence(vec![segment]));
420 remaining = 0;
421 }
422 (_, _) => {
423 let segment = segment_iter
425 .next()
426 .ok_or_else(too_many_segments_error)?
427 .slice(segment_offset as usize, remaining_in_segment as usize);
428 sequence.extend(RowIdSequence(vec![segment]));
429 segment_offset = 0;
430 remaining -= remaining_in_segment;
431 }
432 }
433 }
434
435 chunked_sequences.push(sequence);
436 }
437
438 if segment_iter.peek().is_some() {
439 return Err(Error::invalid_input(
440 "Got too few segments for the provided chunk lengths",
441 location!(),
442 ));
443 }
444
445 Ok(chunked_sequences)
446}
447
448pub fn select_row_ids<'a>(
450 sequence: &'a RowIdSequence,
451 offsets: &'a ReadBatchParams,
452) -> Result<Vec<u64>> {
453 let out_of_bounds_err = |offset: u32| {
454 Error::invalid_input(
455 format!(
456 "Index out of bounds: {} for sequence of length {}",
457 offset,
458 sequence.len()
459 ),
460 location!(),
461 )
462 };
463
464 match offsets {
465 ReadBatchParams::Indices(indices) => indices
467 .values()
468 .iter()
469 .map(|index| {
470 sequence
471 .get(*index as usize)
472 .ok_or_else(|| out_of_bounds_err(*index))
473 })
474 .collect(),
475 ReadBatchParams::Range(range) => {
476 if range.end > sequence.len() as usize {
477 return Err(out_of_bounds_err(range.end as u32));
478 }
479 let sequence = sequence.slice(range.start, range.end - range.start);
480 Ok(sequence.iter().collect())
481 }
482 ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
483 ReadBatchParams::RangeTo(to) => {
484 if to.end > sequence.len() as usize {
485 return Err(out_of_bounds_err(to.end as u32));
486 }
487 let len = to.end;
488 let sequence = sequence.slice(0, len);
489 Ok(sequence.iter().collect())
490 }
491 ReadBatchParams::RangeFrom(from) => {
492 let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
493 Ok(sequence.iter().collect())
494 }
495 }
496}
497
#[cfg(test)]
mod test {
    use super::*;

    use pretty_assertions::assert_eq;
    use test::bitmap::Bitmap;

    /// A sequence built from a range reports its length and yields the ids.
    #[test]
    fn test_row_id_sequence_from_range() {
        let sequence = RowIdSequence::from(0..10);
        assert_eq!(sequence.len(), 10);
        assert!(!sequence.is_empty());

        let iter = sequence.iter();
        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
    }

    /// Adjacent ranges are fused on extend, disjoint ones are kept apart.
    #[test]
    fn test_row_id_sequence_extend() {
        let mut sequence = RowIdSequence::from(0..10);
        sequence.extend(RowIdSequence::from(10..20));
        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);

        let mut sequence = RowIdSequence::from(0..10);
        sequence.extend(RowIdSequence::from(20..30));
        assert_eq!(
            sequence.0,
            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
        );
    }

    #[test]
    fn test_row_id_sequence_delete() {
        // Deleting scattered ids from a single range.
        let mut sequence = RowIdSequence::from(0..10);
        sequence.delete(vec![1, 3, 5, 7, 9]);
        let mut expected_bitmap = Bitmap::new_empty(9);
        for i in [0, 2, 4, 6, 8] {
            expected_bitmap.set(i as usize);
        }
        assert_eq!(
            sequence.0,
            vec![U64Segment::RangeWithBitmap {
                range: 0..9,
                bitmap: expected_bitmap
            },]
        );

        // Deleting from the edges of two segments, including ids (10, 11)
        // that are in neither of them.
        let mut sequence = RowIdSequence::from(0..10);
        sequence.extend(RowIdSequence::from(12..20));
        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
        assert_eq!(
            sequence.0,
            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
        );

        // Deleting everything leaves an empty range segment behind.
        let mut sequence = RowIdSequence::from(0..10);
        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
    }

    #[test]
    fn test_row_id_slice() {
        // One segment of every kind.
        let sequence = RowIdSequence(vec![
            // 5 ids
            U64Segment::Range(30..35),
            // 8 ids
            U64Segment::RangeWithHoles {
                range: 50..60,
                holes: vec![53, 54].into(),
            },
            // 2 ids
            U64Segment::SortedArray(vec![7, 9].into()),
            // 3 ids
            U64Segment::RangeWithBitmap {
                range: 0..5,
                bitmap: [true, false, true, false, true].as_slice().into(),
            },
            // 2 ids
            U64Segment::Array(vec![35, 39].into()),
            // 10 ids
            U64Segment::Range(40..50),
        ]);

        // Compare every in-bounds (offset, len) pair against skip/take on
        // the flat iterator.
        for offset in 0..sequence.len() as usize {
            for len in 0..sequence.len() as usize {
                if offset + len > sequence.len() as usize {
                    continue;
                }
                let slice = sequence.slice(offset, len);

                let actual = slice.iter().collect::<Vec<_>>();
                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
                assert_eq!(
                    actual, expected,
                    "Failed for offset {} and len {}",
                    offset, len
                );

                // The size hint must be exact and match the produced count.
                let (claimed_size, claimed_max) = slice.iter().size_hint();
                assert_eq!(claimed_max, Some(claimed_size));
                assert_eq!(claimed_size, actual.len());
            }
        }
    }

    /// A zero-length slice at the very end of the sequence is empty.
    #[test]
    fn test_row_id_slice_empty() {
        let sequence = RowIdSequence::from(0..10);
        let slice = sequence.slice(10, 0);
        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
    }

    #[test]
    fn test_row_id_sequence_rechunk() {
        fn assert_rechunked(
            input: Vec<RowIdSequence>,
            chunk_sizes: Vec<u64>,
            expected: Vec<RowIdSequence>,
        ) {
            let chunked = rechunk_sequences(input, chunk_sizes).unwrap();
            assert_eq!(chunked, expected);
        }

        // The same 30 ids, chunked two different ways.
        let many_segments = vec![
            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
            RowIdSequence::from(10..18),
            RowIdSequence::from(18..28),
            RowIdSequence::from(28..30),
        ];
        let fewer_segments = vec![
            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
            RowIdSequence::from(10..30),
        ];

        // Many chunks -> fewer chunks.
        assert_rechunked(
            many_segments.clone(),
            fewer_segments.iter().map(|seq| seq.len()).collect(),
            fewer_segments.clone(),
        );

        // Fewer chunks -> many chunks.
        assert_rechunked(
            fewer_segments,
            many_segments.iter().map(|seq| seq.len()).collect(),
            many_segments.clone(),
        );

        // Same chunking in and out.
        assert_rechunked(
            many_segments.clone(),
            many_segments.iter().map(|seq| seq.len()).collect(),
            many_segments.clone(),
        );

        // The chunk asks for more ids than the input provides.
        let result = rechunk_sequences(many_segments.clone(), vec![100]);
        assert!(result.is_err());

        // The chunk asks for fewer ids than the input provides.
        let result = rechunk_sequences(many_segments, vec![5]);
        assert!(result.is_err());
    }

    #[test]
    fn test_select_row_ids() {
        // Every kind of selection.
        let offsets = [
            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
            ReadBatchParams::Range(2..8),
            ReadBatchParams::RangeFull,
            ReadBatchParams::RangeTo(..5),
            ReadBatchParams::RangeFrom(5..),
        ];

        // Between them the two sequences contain every kind of segment.
        let sequences = [
            RowIdSequence(vec![
                U64Segment::Range(0..5),
                U64Segment::RangeWithHoles {
                    range: 50..60,
                    holes: vec![53, 54].into(),
                },
                U64Segment::SortedArray(vec![7, 9].into()),
            ]),
            RowIdSequence(vec![
                U64Segment::RangeWithBitmap {
                    range: 0..5,
                    bitmap: [true, false, true, false, true].as_slice().into(),
                },
                U64Segment::Array(vec![30, 20, 10].into()),
                U64Segment::Range(40..50),
            ]),
        ];

        for params in offsets {
            for sequence in &sequences {
                let row_ids = select_row_ids(sequence, &params).unwrap();
                let flat_sequence = sequence.iter().collect::<Vec<_>>();

                // Work out the expected positions independently of
                // `select_row_ids`.
                let selection: Vec<usize> = match &params {
                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
                    ReadBatchParams::Range(range) => range.clone().collect(),
                    ReadBatchParams::Indices(indices) => {
                        indices.values().iter().map(|i| *i as usize).collect()
                    }
                };

                let expected = selection
                    .into_iter()
                    .map(|i| flat_sequence[i])
                    .collect::<Vec<_>>();
                assert_eq!(
                    row_ids, expected,
                    "Failed for params {:?} on the sequence {:?}",
                    &params, sequence
                );
            }
        }
    }

    /// Selections reaching past the end of the sequence are rejected.
    #[test]
    fn test_select_row_ids_out_of_bounds() {
        let offsets = [
            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
            ReadBatchParams::Range(2..1000),
            ReadBatchParams::RangeTo(..1000),
        ];

        let sequence = RowIdSequence::from(0..10);

        for params in offsets {
            let result = select_row_ids(&sequence, &params);
            assert!(result.is_err());
            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
        }
    }

    #[test]
    fn test_row_id_sequence_to_treemap() {
        let sequence = RowIdSequence(vec![
            U64Segment::Range(0..5),
            U64Segment::RangeWithHoles {
                range: 50..60,
                holes: vec![53, 54].into(),
            },
            U64Segment::SortedArray(vec![7, 9].into()),
            U64Segment::RangeWithBitmap {
                range: 10..15,
                bitmap: [true, false, true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![35, 39].into()),
            U64Segment::Range(40..50),
        ]);

        let tree_map = RowIdTreeMap::from(&sequence);
        let expected = vec![
            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
            51, 52, 55, 56, 57, 58, 59,
        ]
        .into_iter()
        .collect::<RowIdTreeMap>();
        assert_eq!(tree_map, expected);
    }

    #[test]
    fn test_row_id_mask() {
        // One segment of every kind.
        let sequence = RowIdSequence(vec![
            U64Segment::Range(0..5),
            U64Segment::RangeWithHoles {
                range: 50..60,
                holes: vec![53, 54].into(),
            },
            U64Segment::SortedArray(vec![7, 9].into()),
            U64Segment::RangeWithBitmap {
                range: 10..15,
                bitmap: [true, false, true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![35, 39].into()),
        ]);

        // Remove one value from each segment, addressed by position.
        let values_to_remove = [4, 55, 7, 12, 39];
        let positions_to_remove = sequence
            .iter()
            .enumerate()
            .filter_map(|(i, val)| {
                if values_to_remove.contains(&val) {
                    Some(i as u32)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();
        let mut sequence = sequence;
        sequence.mask(positions_to_remove).unwrap();
        let expected = RowIdSequence(vec![
            U64Segment::Range(0..4),
            U64Segment::RangeWithBitmap {
                range: 50..60,
                bitmap: [
                    true, true, true, false, false, false, true, true, true, true,
                ]
                .as_slice()
                .into(),
            },
            U64Segment::Range(9..10),
            U64Segment::RangeWithBitmap {
                range: 10..15,
                bitmap: [true, false, false, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![35].into()),
        ]);
        assert_eq!(sequence, expected);
    }

    /// Masking every position leaves no segments at all.
    #[test]
    fn test_row_id_mask_everything() {
        let mut sequence = RowIdSequence(vec![
            U64Segment::Range(0..5),
            U64Segment::SortedArray(vec![7, 9].into()),
        ]);
        sequence.mask(0..sequence.len() as u32).unwrap();
        let expected = RowIdSequence(vec![]);
        assert_eq!(sequence, expected);
    }
}