lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::Range;
15
16// TODO: replace this with Arrow BooleanBuffer.
17
18// These are all internal data structures, and are private.
19mod bitmap;
20mod encoded_array;
21mod index;
22pub mod segment;
23mod serde;
24
25use deepsize::DeepSizeOf;
26// These are the public API.
27pub use index::RowIdIndex;
28use lance_core::{
29    utils::mask::{RowIdMask, RowIdTreeMap},
30    Error, Result,
31};
32use lance_io::ReadBatchParams;
33pub use serde::{read_row_ids, write_row_ids};
34
35use snafu::location;
36
37use segment::U64Segment;
38use tracing::instrument;
39
40use crate::utils::LanceIteratorExtension;
41
42/// A sequence of row ids.
43///
44/// Row ids are u64s that:
45///
46/// 1. Are **unique** within a table (except for tombstones)
47/// 2. Are *often* but not always sorted and/or contiguous.
48///
49/// This sequence of row ids is optimized to be compact when the row ids are
50/// contiguous and sorted. However, it does not require that the row ids are
51/// contiguous or sorted.
52///
53/// We can make optimizations that assume uniqueness.
54#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq)]
55pub struct RowIdSequence(Vec<U64Segment>);
56
57impl std::fmt::Display for RowIdSequence {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        let mut iter = self.iter();
60        let mut first_10 = Vec::new();
61        let mut last_10 = Vec::new();
62        for row_id in iter.by_ref() {
63            first_10.push(row_id);
64            if first_10.len() > 10 {
65                break;
66            }
67        }
68
69        while let Some(row_id) = iter.next_back() {
70            last_10.push(row_id);
71            if last_10.len() > 10 {
72                break;
73            }
74        }
75        last_10.reverse();
76
77        let theres_more = iter.next().is_some();
78
79        write!(f, "[")?;
80        for row_id in first_10 {
81            write!(f, "{}", row_id)?;
82        }
83        if theres_more {
84            write!(f, ", ...")?;
85        }
86        for row_id in last_10 {
87            write!(f, ", {}", row_id)?;
88        }
89        write!(f, "]")
90    }
91}
92
93impl From<Range<u64>> for RowIdSequence {
94    fn from(range: Range<u64>) -> Self {
95        Self(vec![U64Segment::Range(range)])
96    }
97}
98
99impl RowIdSequence {
100    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
101        self.0.iter().flat_map(|segment| segment.iter())
102    }
103
104    pub fn len(&self) -> u64 {
105        self.0.iter().map(|segment| segment.len() as u64).sum()
106    }
107
108    pub fn is_empty(&self) -> bool {
109        self.0.is_empty()
110    }
111
112    /// Combines this row id sequence with another row id sequence.
113    pub fn extend(&mut self, other: Self) {
114        // If the last element of this sequence and the first element of next
115        // sequence are ranges, we might be able to combine them into a single
116        // range.
117        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
118            (self.0.last(), other.0.first())
119        {
120            if range1.end == range2.start {
121                let new_range = U64Segment::Range(range1.start..range2.end);
122                self.0.pop();
123                self.0.push(new_range);
124                self.0.extend(other.0.into_iter().skip(1));
125                return;
126            }
127        }
128        // TODO: add other optimizations, such as combining two RangeWithHoles.
129        self.0.extend(other.0);
130    }
131
132    /// Remove a set of row ids from the sequence.
133    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
134        // Order the row ids by position in which they appear in the sequence.
135        let (row_ids, offsets) = self.find_ids(row_ids);
136
137        let capacity = self.0.capacity();
138        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
139        let mut remaining_segments = old_segments.as_slice();
140
141        for (segment_idx, range) in offsets {
142            let segments_handled = old_segments.len() - remaining_segments.len();
143            let segments_to_add = segment_idx - segments_handled;
144            self.0
145                .extend_from_slice(&remaining_segments[..segments_to_add]);
146            remaining_segments = &remaining_segments[segments_to_add..];
147
148            let segment;
149            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
150
151            let segment_ids = &row_ids[range];
152            self.0.push(segment.delete(segment_ids));
153        }
154
155        // Add the remaining segments.
156        self.0.extend_from_slice(remaining_segments);
157    }
158
159    /// Delete row ids by position.
160    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
161        let mut local_positions = Vec::new();
162        let mut positions_iter = positions.into_iter();
163        let mut curr_position = positions_iter.next();
164        let mut offset = 0;
165        let mut cutoff = 0;
166
167        for segment in &mut self.0 {
168            // Make vector of local positions
169            cutoff += segment.len() as u32;
170            while let Some(position) = curr_position {
171                if position >= cutoff {
172                    break;
173                }
174                local_positions.push(position - offset);
175                curr_position = positions_iter.next();
176            }
177
178            if !local_positions.is_empty() {
179                segment.mask(&local_positions);
180                local_positions.clear();
181            }
182            offset = cutoff;
183        }
184
185        self.0.retain(|segment| !segment.is_empty());
186
187        Ok(())
188    }
189
190    /// Find the row ids in the sequence.
191    ///
192    /// Returns the row ids sorted by their appearance in the sequence.
193    /// Also returns the segment index and the range where that segment's
194    /// row id matches are found in the returned row id vector.
195    fn find_ids(
196        &self,
197        row_ids: impl IntoIterator<Item = u64>,
198    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
199        // Often, the row ids will already be provided in the order they appear.
200        // So the optimal way to search will be to cycle through rather than
201        // restarting the search from the beginning each time.
202        let mut segment_iter = self.0.iter().enumerate().cycle();
203
204        let mut segment_matches = vec![Vec::new(); self.0.len()];
205
206        row_ids.into_iter().for_each(|row_id| {
207            let mut i = 0;
208            // If we've cycled through all segments, we know the row id is not in the sequence.
209            while i < self.0.len() {
210                let (segment_idx, segment) = segment_iter.next().unwrap();
211                if segment.range().is_some_and(|range| range.contains(&row_id)) {
212                    if let Some(offset) = segment.position(row_id) {
213                        segment_matches.get_mut(segment_idx).unwrap().push(offset);
214                    }
215                    // The row id was not found it the segment. It might be in a later segment.
216                }
217                i += 1;
218            }
219        });
220        for matches in &mut segment_matches {
221            matches.sort_unstable();
222        }
223
224        let mut offset = 0;
225        let segment_ranges = segment_matches
226            .iter()
227            .enumerate()
228            .filter(|(_, matches)| !matches.is_empty())
229            .map(|(segment_idx, matches)| {
230                let range = offset..offset + matches.len();
231                offset += matches.len();
232                (segment_idx, range)
233            })
234            .collect();
235        let row_ids = segment_matches
236            .into_iter()
237            .enumerate()
238            .flat_map(|(segment_idx, offset)| {
239                offset
240                    .into_iter()
241                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
242            })
243            .collect();
244
245        (row_ids, segment_ranges)
246    }
247
248    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
249        if len == 0 {
250            return RowIdSeqSlice {
251                segments: &[],
252                offset_start: 0,
253                offset_last: 0,
254            };
255        }
256
257        // Find the starting position
258        let mut offset_start = offset;
259        let mut segment_offset = 0;
260        for segment in &self.0 {
261            let segment_len = segment.len();
262            if offset_start < segment_len {
263                break;
264            }
265            offset_start -= segment_len;
266            segment_offset += 1;
267        }
268
269        // Find the ending position
270        let mut offset_last = offset_start + len;
271        let mut segment_offset_last = segment_offset;
272        for segment in &self.0[segment_offset..] {
273            let segment_len = segment.len();
274            if offset_last <= segment_len {
275                break;
276            }
277            offset_last -= segment_len;
278            segment_offset_last += 1;
279        }
280
281        RowIdSeqSlice {
282            segments: &self.0[segment_offset..=segment_offset_last],
283            offset_start,
284            offset_last,
285        }
286    }
287
288    /// Get the row id at the given index.
289    ///
290    /// If the index is out of bounds, this will return None.
291    pub fn get(&self, index: usize) -> Option<u64> {
292        let mut offset = 0;
293        for segment in &self.0 {
294            let segment_len = segment.len();
295            if index < offset + segment_len {
296                return segment.get(index - offset);
297            }
298            offset += segment_len;
299        }
300        None
301    }
302
303    /// Get row ids from the sequence based on the provided _sorted_ offsets
304    ///
305    /// Any out of bounds offsets will be ignored
306    ///
307    /// # Panics
308    ///
309    /// If the input selection is not sorted, this function will panic
310    pub fn select<'a>(
311        &'a self,
312        selection: impl Iterator<Item = usize> + 'a,
313    ) -> impl Iterator<Item = u64> + 'a {
314        let mut seg_iter = self.0.iter();
315        let mut cur_seg = seg_iter.next();
316        let mut rows_passed = 0;
317        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
318        let mut last_index = 0;
319        selection.filter_map(move |index| {
320            if index < last_index {
321                panic!("Selection is not sorted");
322            }
323            last_index = index;
324
325            cur_seg?;
326
327            while (index - rows_passed) >= cur_seg_len {
328                rows_passed += cur_seg_len;
329                cur_seg = seg_iter.next();
330                if let Some(cur_seg) = cur_seg {
331                    cur_seg_len = cur_seg.len();
332                } else {
333                    return None;
334                }
335            }
336
337            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
338        })
339    }
340
341    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
342    /// in the sequence.
343    ///
344    /// For example, given a mask that selects all even ids and a sequence that is
345    /// [80..85, 86..90, 14]
346    ///
347    /// this will return [0, 2, 4, 5, 7, 9]
348    /// because the range expands to
349    ///
350    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
351    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
352    ///
353    /// This function is useful when determining which row offsets to read from a fragment given
354    /// a mask.
355    #[instrument(level = "debug", skip_all)]
356    pub fn mask_to_offset_ranges(&self, mask: &RowIdMask) -> Vec<Range<u64>> {
357        let mut offset = 0;
358        let mut ranges = Vec::new();
359        for segment in &self.0 {
360            match segment {
361                U64Segment::Range(range) => {
362                    let mut ids = RowIdTreeMap::from(range.clone());
363                    ids.mask(mask);
364                    ranges.extend(GroupingIterator::new(
365                        unsafe { ids.into_id_iter() }.map(|id| id - range.start + offset),
366                    ));
367                    offset += range.end - range.start;
368                }
369                U64Segment::RangeWithHoles { range, holes } => {
370                    let offset_start = offset;
371                    let mut ids = RowIdTreeMap::from(range.clone());
372                    offset += range.end - range.start;
373                    for hole in holes.iter() {
374                        if ids.remove(hole) {
375                            offset -= 1;
376                        }
377                    }
378                    ids.mask(mask);
379
380                    // Sadly we can't just subtract the offset because of the holes
381                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
382                    sorted_holes.sort_unstable();
383                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
384                    let mut holes_passed = 0;
385                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
386                        |id| {
387                            while let Some(next_hole) = next_holes_iter.peek() {
388                                if *next_hole < id {
389                                    next_holes_iter.next();
390                                    holes_passed += 1;
391                                } else {
392                                    break;
393                                }
394                            }
395                            id - range.start + offset_start - holes_passed
396                        },
397                    )));
398                }
399                U64Segment::RangeWithBitmap { range, bitmap } => {
400                    let mut ids = RowIdTreeMap::from(range.clone());
401                    let offset_start = offset;
402                    offset += range.end - range.start;
403                    for (i, val) in range.clone().enumerate() {
404                        if !bitmap.get(i) && ids.remove(val) {
405                            offset -= 1;
406                        }
407                    }
408                    ids.mask(mask);
409                    let mut bitmap_iter = bitmap.iter();
410                    let mut bitmap_iter_pos = 0;
411                    let mut holes_passed = 0;
412                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
413                        |id| {
414                            let offset_no_holes = id - range.start + offset_start;
415                            while bitmap_iter_pos < offset_no_holes {
416                                if !bitmap_iter.next().unwrap() {
417                                    holes_passed += 1;
418                                }
419                                bitmap_iter_pos += 1;
420                            }
421                            offset_no_holes - holes_passed
422                        },
423                    )));
424                }
425                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
426                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
427                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
428                        |(off, id)| {
429                            if mask.selected(id) {
430                                Some(off as u64 + offset)
431                            } else {
432                                None
433                            }
434                        },
435                    )));
436                    offset += array.len() as u64;
437                }
438            }
439        }
440        ranges
441    }
442}
443
/// Adapter that collapses runs of consecutive ids into half-open ranges.
///
/// For example, given an input iterator of [1, 2, 3, 5, 6, 7, 10, 11, 12]
/// this will return an iterator of [(1..4), (5..8), (10..13)]
///
/// An id only extends the pending range when it is exactly one past the
/// range's last id; any other id closes the pending range and opens a new one.
struct GroupingIterator<I: Iterator<Item = u64>> {
    /// Source of ids.
    iter: I,
    /// Range that is being built up and has not been emitted yet.
    cur_range: Option<Range<u64>>,
}

impl<I: Iterator<Item = u64>> GroupingIterator<I> {
    /// Wraps `iter`; no range is pending until the first id is pulled.
    fn new(iter: I) -> Self {
        let cur_range = None;
        Self { iter, cur_range }
    }
}

impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
    type Item = Range<u64>;

    fn next(&mut self) -> Option<Self::Item> {
        for id in self.iter.by_ref() {
            match self.cur_range.as_mut() {
                // The id continues the pending run.
                Some(pending) if pending.end == id => pending.end = id + 1,
                // Gap: hand out the finished run and start over at this id.
                Some(pending) => return Some(std::mem::replace(pending, id..id + 1)),
                // Very first id, or the first one after the pending range was taken.
                None => self.cur_range = Some(id..id + 1),
            }
        }
        // Input exhausted: flush whatever is still pending.
        self.cur_range.take()
    }
}
482
483impl From<&RowIdSequence> for RowIdTreeMap {
484    fn from(row_ids: &RowIdSequence) -> Self {
485        let mut tree_map = Self::new();
486        for segment in &row_ids.0 {
487            match segment {
488                U64Segment::Range(range) => {
489                    tree_map.insert_range(range.clone());
490                }
491                U64Segment::RangeWithBitmap { range, bitmap } => {
492                    tree_map.insert_range(range.clone());
493                    for (i, val) in range.clone().enumerate() {
494                        if !bitmap.get(i) {
495                            tree_map.remove(val);
496                        }
497                    }
498                }
499                U64Segment::RangeWithHoles { range, holes } => {
500                    tree_map.insert_range(range.clone());
501                    for hole in holes.iter() {
502                        tree_map.remove(hole);
503                    }
504                }
505                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
506                    for val in array.iter() {
507                        tree_map.insert(val);
508                    }
509                }
510            }
511        }
512        tree_map
513    }
514}
515
516#[derive(Debug)]
517pub struct RowIdSeqSlice<'a> {
518    /// Current slice of the segments we cover
519    segments: &'a [U64Segment],
520    /// Offset into the first segment to start iterating from
521    offset_start: usize,
522    /// Offset into the last segment to stop iterating at
523    offset_last: usize,
524}
525
526impl RowIdSeqSlice<'_> {
527    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
528        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
529        known_size -= self.offset_start;
530        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
531
532        let end = self.segments.len().saturating_sub(1);
533        self.segments
534            .iter()
535            .enumerate()
536            .flat_map(move |(i, segment)| {
537                match i {
538                    0 if self.segments.len() == 1 => {
539                        let len = self.offset_last - self.offset_start;
540                        // TODO: Optimize this so we don't have to use skip
541                        // (take is probably fine though.)
542                        Box::new(segment.iter().skip(self.offset_start).take(len))
543                            as Box<dyn Iterator<Item = u64>>
544                    }
545                    0 => Box::new(segment.iter().skip(self.offset_start)),
546                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
547                    _ => Box::new(segment.iter()),
548                }
549            })
550            .exact_size(known_size)
551    }
552}
553
554/// Re-chunk a sequences of row ids into chunks of a given size.
555///
556/// # Errors
557///
558/// Will return an error if the sum of the chunk sizes is not equal to the total
559/// number of row ids in the sequences.
560pub fn rechunk_sequences(
561    sequences: impl IntoIterator<Item = RowIdSequence>,
562    chunk_sizes: impl IntoIterator<Item = u64>,
563) -> Result<Vec<RowIdSequence>> {
564    // TODO: return an iterator. (with a good size hint?)
565    let chunk_size_iter = chunk_sizes.into_iter();
566    let mut chunked_sequences = Vec::with_capacity(chunk_size_iter.size_hint().0);
567    let mut segment_iter = sequences
568        .into_iter()
569        .flat_map(|sequence| sequence.0.into_iter())
570        .peekable();
571
572    let mut segment_offset = 0_u64;
573    for chunk_size in chunk_size_iter {
574        let mut sequence = RowIdSequence(Vec::new());
575        let mut remaining = chunk_size;
576
577        let too_many_segments_error = || {
578            Error::invalid_input(
579                "Got too many segments for the provided chunk lengths",
580                location!(),
581            )
582        };
583
584        while remaining > 0 {
585            let remaining_in_segment = segment_iter
586                .peek()
587                .map_or(0, |segment| segment.len() as u64 - segment_offset);
588            match (remaining_in_segment.cmp(&remaining), remaining_in_segment) {
589                (std::cmp::Ordering::Greater, _) => {
590                    // Can only push part of the segment, we are done with this chunk.
591                    let segment = segment_iter
592                        .peek()
593                        .ok_or_else(too_many_segments_error)?
594                        .slice(segment_offset as usize, remaining as usize);
595                    sequence.extend(RowIdSequence(vec![segment]));
596                    segment_offset += remaining;
597                    remaining = 0;
598                }
599                (_, 0) => {
600                    // Can push the entire segment.
601                    let segment = segment_iter.next().ok_or_else(too_many_segments_error)?;
602                    sequence.extend(RowIdSequence(vec![segment]));
603                    remaining = 0;
604                }
605                (_, _) => {
606                    // Push remaining segment
607                    let segment = segment_iter
608                        .next()
609                        .ok_or_else(too_many_segments_error)?
610                        .slice(segment_offset as usize, remaining_in_segment as usize);
611                    sequence.extend(RowIdSequence(vec![segment]));
612                    segment_offset = 0;
613                    remaining -= remaining_in_segment;
614                }
615            }
616        }
617
618        chunked_sequences.push(sequence);
619    }
620
621    if segment_iter.peek().is_some() {
622        return Err(Error::invalid_input(
623            "Got too few segments for the provided chunk lengths",
624            location!(),
625        ));
626    }
627
628    Ok(chunked_sequences)
629}
630
631/// Selects the row ids from a sequence based on the provided offsets.
632pub fn select_row_ids<'a>(
633    sequence: &'a RowIdSequence,
634    offsets: &'a ReadBatchParams,
635) -> Result<Vec<u64>> {
636    let out_of_bounds_err = |offset: u32| {
637        Error::invalid_input(
638            format!(
639                "Index out of bounds: {} for sequence of length {}",
640                offset,
641                sequence.len()
642            ),
643            location!(),
644        )
645    };
646
647    match offsets {
648        // TODO: Optimize this if indices are sorted, which is a common case.
649        ReadBatchParams::Indices(indices) => indices
650            .values()
651            .iter()
652            .map(|index| {
653                sequence
654                    .get(*index as usize)
655                    .ok_or_else(|| out_of_bounds_err(*index))
656            })
657            .collect(),
658        ReadBatchParams::Range(range) => {
659            if range.end > sequence.len() as usize {
660                return Err(out_of_bounds_err(range.end as u32));
661            }
662            let sequence = sequence.slice(range.start, range.end - range.start);
663            Ok(sequence.iter().collect())
664        }
665        ReadBatchParams::Ranges(ranges) => {
666            let num_rows = ranges
667                .iter()
668                .map(|r| (r.end - r.start) as usize)
669                .sum::<usize>();
670            let mut result = Vec::with_capacity(num_rows);
671            for range in ranges.as_ref() {
672                if range.end > sequence.len() {
673                    return Err(out_of_bounds_err(range.end as u32));
674                }
675                let sequence =
676                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
677                result.extend(sequence.iter());
678            }
679            Ok(result)
680        }
681
682        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
683        ReadBatchParams::RangeTo(to) => {
684            if to.end > sequence.len() as usize {
685                return Err(out_of_bounds_err(to.end as u32));
686            }
687            let len = to.end;
688            let sequence = sequence.slice(0, len);
689            Ok(sequence.iter().collect())
690        }
691        ReadBatchParams::RangeFrom(from) => {
692            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
693            Ok(sequence.iter().collect())
694        }
695    }
696}
697
698#[cfg(test)]
699mod test {
700    use super::*;
701
702    use pretty_assertions::assert_eq;
703    use test::bitmap::Bitmap;
704
705    #[test]
706    fn test_row_id_sequence_from_range() {
707        let sequence = RowIdSequence::from(0..10);
708        assert_eq!(sequence.len(), 10);
709        assert_eq!(sequence.is_empty(), false);
710
711        let iter = sequence.iter();
712        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
713    }
714
715    #[test]
716    fn test_row_id_sequence_extend() {
717        let mut sequence = RowIdSequence::from(0..10);
718        sequence.extend(RowIdSequence::from(10..20));
719        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
720
721        let mut sequence = RowIdSequence::from(0..10);
722        sequence.extend(RowIdSequence::from(20..30));
723        assert_eq!(
724            sequence.0,
725            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
726        );
727    }
728
729    #[test]
730    fn test_row_id_sequence_delete() {
731        let mut sequence = RowIdSequence::from(0..10);
732        sequence.delete(vec![1, 3, 5, 7, 9]);
733        let mut expected_bitmap = Bitmap::new_empty(9);
734        for i in [0, 2, 4, 6, 8] {
735            expected_bitmap.set(i as usize);
736        }
737        assert_eq!(
738            sequence.0,
739            vec![U64Segment::RangeWithBitmap {
740                range: 0..9,
741                bitmap: expected_bitmap
742            },]
743        );
744
745        let mut sequence = RowIdSequence::from(0..10);
746        sequence.extend(RowIdSequence::from(12..20));
747        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
748        assert_eq!(
749            sequence.0,
750            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
751        );
752
753        let mut sequence = RowIdSequence::from(0..10);
754        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
755        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
756    }
757
758    #[test]
759    fn test_row_id_slice() {
760        // The type of sequence isn't that relevant to the implementation, so
761        // we can just have a single one with all the segment types.
762        let sequence = RowIdSequence(vec![
763            U64Segment::Range(30..35), // 5
764            U64Segment::RangeWithHoles {
765                // 8
766                range: 50..60,
767                holes: vec![53, 54].into(),
768            },
769            U64Segment::SortedArray(vec![7, 9].into()), // 2
770            U64Segment::RangeWithBitmap {
771                range: 0..5,
772                bitmap: [true, false, true, false, true].as_slice().into(),
773            },
774            U64Segment::Array(vec![35, 39].into()),
775            U64Segment::Range(40..50),
776        ]);
777
778        // All possible offsets and lengths
779        for offset in 0..sequence.len() as usize {
780            for len in 0..sequence.len() as usize {
781                if offset + len > sequence.len() as usize {
782                    continue;
783                }
784                let slice = sequence.slice(offset, len);
785
786                let actual = slice.iter().collect::<Vec<_>>();
787                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
788                assert_eq!(
789                    actual, expected,
790                    "Failed for offset {} and len {}",
791                    offset, len
792                );
793
794                let (claimed_size, claimed_max) = slice.iter().size_hint();
795                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
796                assert_eq!(claimed_size, actual.len()); // Correct size hint
797            }
798        }
799    }
800
801    #[test]
802    fn test_row_id_slice_empty() {
803        let sequence = RowIdSequence::from(0..10);
804        let slice = sequence.slice(10, 0);
805        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
806    }
807
808    #[test]
809    fn test_row_id_sequence_rechunk() {
810        fn assert_rechunked(
811            input: Vec<RowIdSequence>,
812            chunk_sizes: Vec<u64>,
813            expected: Vec<RowIdSequence>,
814        ) {
815            let chunked = rechunk_sequences(input, chunk_sizes).unwrap();
816            assert_eq!(chunked, expected);
817        }
818
819        // Small pieces to larger ones
820        let many_segments = vec![
821            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
822            RowIdSequence::from(10..18),
823            RowIdSequence::from(18..28),
824            RowIdSequence::from(28..30),
825        ];
826        let fewer_segments = vec![
827            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
828            RowIdSequence::from(10..30),
829        ];
830        assert_rechunked(
831            many_segments.clone(),
832            fewer_segments.iter().map(|seq| seq.len()).collect(),
833            fewer_segments.clone(),
834        );
835
836        // Large pieces to smaller ones
837        assert_rechunked(
838            fewer_segments,
839            many_segments.iter().map(|seq| seq.len()).collect(),
840            many_segments.clone(),
841        );
842
843        // Equal pieces
844        assert_rechunked(
845            many_segments.clone(),
846            many_segments.iter().map(|seq| seq.len()).collect(),
847            many_segments.clone(),
848        );
849
850        // Too few segments -> error
851        let result = rechunk_sequences(many_segments.clone(), vec![100]);
852        assert!(result.is_err());
853
854        // Too many segments -> error
855        let result = rechunk_sequences(many_segments, vec![5]);
856        assert!(result.is_err());
857    }
858
859    #[test]
860    fn test_select_row_ids() {
861        // All forms of offsets
862        let offsets = [
863            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
864            ReadBatchParams::Range(2..8),
865            ReadBatchParams::RangeFull,
866            ReadBatchParams::RangeTo(..5),
867            ReadBatchParams::RangeFrom(5..),
868            ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
869        ];
870
871        // Sequences with all segment types. These have at least 10 elements,
872        // so they are valid for all the above offsets.
873        let sequences = [
874            RowIdSequence(vec![
875                U64Segment::Range(0..5),
876                U64Segment::RangeWithHoles {
877                    range: 50..60,
878                    holes: vec![53, 54].into(),
879                },
880                U64Segment::SortedArray(vec![7, 9].into()),
881            ]),
882            RowIdSequence(vec![
883                U64Segment::RangeWithBitmap {
884                    range: 0..5,
885                    bitmap: [true, false, true, false, true].as_slice().into(),
886                },
887                U64Segment::Array(vec![30, 20, 10].into()),
888                U64Segment::Range(40..50),
889            ]),
890        ];
891
892        for params in offsets {
893            for sequence in &sequences {
894                let row_ids = select_row_ids(sequence, &params).unwrap();
895                let flat_sequence = sequence.iter().collect::<Vec<_>>();
896
897                // Transform params into bounded ones
898                let selection: Vec<usize> = match &params {
899                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
900                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
901                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
902                    ReadBatchParams::Range(range) => range.clone().collect(),
903                    ReadBatchParams::Ranges(ranges) => ranges
904                        .iter()
905                        .flat_map(|r| r.start as usize..r.end as usize)
906                        .collect(),
907                    ReadBatchParams::Indices(indices) => {
908                        indices.values().iter().map(|i| *i as usize).collect()
909                    }
910                };
911
912                let expected = selection
913                    .into_iter()
914                    .map(|i| flat_sequence[i])
915                    .collect::<Vec<_>>();
916                assert_eq!(
917                    row_ids, expected,
918                    "Failed for params {:?} on the sequence {:?}",
919                    &params, sequence
920                );
921            }
922        }
923    }
924
925    #[test]
926    fn test_select_row_ids_out_of_bounds() {
927        let offsets = [
928            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
929            ReadBatchParams::Range(2..1000),
930            ReadBatchParams::RangeTo(..1000),
931        ];
932
933        let sequence = RowIdSequence::from(0..10);
934
935        for params in offsets {
936            let result = select_row_ids(&sequence, &params);
937            assert!(result.is_err());
938            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
939        }
940    }
941
942    #[test]
943    fn test_row_id_sequence_to_treemap() {
944        let sequence = RowIdSequence(vec![
945            U64Segment::Range(0..5),
946            U64Segment::RangeWithHoles {
947                range: 50..60,
948                holes: vec![53, 54].into(),
949            },
950            U64Segment::SortedArray(vec![7, 9].into()),
951            U64Segment::RangeWithBitmap {
952                range: 10..15,
953                bitmap: [true, false, true, false, true].as_slice().into(),
954            },
955            U64Segment::Array(vec![35, 39].into()),
956            U64Segment::Range(40..50),
957        ]);
958
959        let tree_map = RowIdTreeMap::from(&sequence);
960        let expected = vec![
961            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
962            51, 52, 55, 56, 57, 58, 59,
963        ]
964        .into_iter()
965        .collect::<RowIdTreeMap>();
966        assert_eq!(tree_map, expected);
967    }
968
969    #[test]
970    fn test_row_id_mask() {
971        // 0, 1, 2, 3, 4
972        // 50, 51, 52, 55, 56, 57, 58, 59
973        // 7, 9
974        // 10, 12, 14
975        // 35, 39
976        let sequence = RowIdSequence(vec![
977            U64Segment::Range(0..5),
978            U64Segment::RangeWithHoles {
979                range: 50..60,
980                holes: vec![53, 54].into(),
981            },
982            U64Segment::SortedArray(vec![7, 9].into()),
983            U64Segment::RangeWithBitmap {
984                range: 10..15,
985                bitmap: [true, false, true, false, true].as_slice().into(),
986            },
987            U64Segment::Array(vec![35, 39].into()),
988        ]);
989
990        // Masking one in each segment
991        let values_to_remove = [4, 55, 7, 12, 39];
992        let positions_to_remove = sequence
993            .iter()
994            .enumerate()
995            .filter_map(|(i, val)| {
996                if values_to_remove.contains(&val) {
997                    Some(i as u32)
998                } else {
999                    None
1000                }
1001            })
1002            .collect::<Vec<_>>();
1003        let mut sequence = sequence;
1004        sequence.mask(positions_to_remove).unwrap();
1005        let expected = RowIdSequence(vec![
1006            U64Segment::Range(0..4),
1007            U64Segment::RangeWithBitmap {
1008                range: 50..60,
1009                bitmap: [
1010                    true, true, true, false, false, false, true, true, true, true,
1011                ]
1012                .as_slice()
1013                .into(),
1014            },
1015            U64Segment::Range(9..10),
1016            U64Segment::RangeWithBitmap {
1017                range: 10..15,
1018                bitmap: [true, false, false, false, true].as_slice().into(),
1019            },
1020            U64Segment::Array(vec![35].into()),
1021        ]);
1022        assert_eq!(sequence, expected);
1023    }
1024
1025    #[test]
1026    fn test_row_id_mask_everything() {
1027        let mut sequence = RowIdSequence(vec![
1028            U64Segment::Range(0..5),
1029            U64Segment::SortedArray(vec![7, 9].into()),
1030        ]);
1031        sequence.mask(0..sequence.len() as u32).unwrap();
1032        let expected = RowIdSequence(vec![]);
1033        assert_eq!(sequence, expected);
1034    }
1035
1036    #[test]
1037    fn test_selection() {
1038        let sequence = RowIdSequence(vec![
1039            U64Segment::Range(0..5),
1040            U64Segment::Range(10..15),
1041            U64Segment::Range(20..25),
1042        ]);
1043        let selection = sequence.select(vec![2, 4, 13, 14, 57].into_iter());
1044        assert_eq!(selection.collect::<Vec<_>>(), vec![2, 4, 23, 24]);
1045    }
1046
1047    #[test]
1048    #[should_panic]
1049    fn test_selection_unsorted() {
1050        let sequence = RowIdSequence(vec![
1051            U64Segment::Range(0..5),
1052            U64Segment::Range(10..15),
1053            U64Segment::Range(20..25),
1054        ]);
1055        let _ = sequence
1056            .select(vec![2, 4, 3].into_iter())
1057            .collect::<Vec<_>>();
1058    }
1059
1060    #[test]
1061    fn test_mask_to_offset_ranges() {
1062        // Tests with a simple range segment
1063        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1064        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1065        let ranges = sequence.mask_to_offset_ranges(&mask);
1066        assert_eq!(ranges, vec![0..1, 2..3, 4..5, 6..7, 8..9]);
1067
1068        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1069        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[54]));
1070        let ranges = sequence.mask_to_offset_ranges(&mask);
1071        assert_eq!(ranges, vec![14..15]);
1072
1073        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1074        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[54]));
1075        let ranges = sequence.mask_to_offset_ranges(&mask);
1076        assert_eq!(ranges, vec![0..14, 15..20]);
1077
1078        // Test with a range segment with holes
1079        // 0, 1, 3, 4, 5, 7, 8, 9
1080        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1081            range: 0..10,
1082            holes: vec![2, 6].into(),
1083        }]);
1084        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1085        let ranges = sequence.mask_to_offset_ranges(&mask);
1086        assert_eq!(ranges, vec![0..1, 3..4, 6..7]);
1087
1088        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1089            range: 40..60,
1090            holes: vec![47, 43].into(),
1091        }]);
1092        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1093        let ranges = sequence.mask_to_offset_ranges(&mask);
1094        assert_eq!(ranges, vec![3..4]);
1095
1096        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1097            range: 40..60,
1098            holes: vec![47, 43].into(),
1099        }]);
1100        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1101        let ranges = sequence.mask_to_offset_ranges(&mask);
1102        assert_eq!(ranges, vec![0..3, 4..18]);
1103
1104        // Test with a range segment with bitmap
1105        // 0, 1, 4, 5, 6, 7
1106        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1107            range: 0..10,
1108            bitmap: [
1109                true, true, false, false, true, true, true, true, false, false,
1110            ]
1111            .as_slice()
1112            .into(),
1113        }]);
1114        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1115        let ranges = sequence.mask_to_offset_ranges(&mask);
1116        assert_eq!(ranges, vec![0..1, 2..3, 4..5]);
1117
1118        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1119            range: 40..45,
1120            bitmap: [true, true, false, false, true].as_slice().into(),
1121        }]);
1122        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1123        let ranges = sequence.mask_to_offset_ranges(&mask);
1124        assert_eq!(ranges, vec![2..3]);
1125
1126        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1127            range: 40..45,
1128            bitmap: [true, true, false, false, true].as_slice().into(),
1129        }]);
1130        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1131        let ranges = sequence.mask_to_offset_ranges(&mask);
1132        assert_eq!(ranges, vec![0..2]);
1133
1134        // Test with a sorted array segment
1135        let sequence = RowIdSequence(vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())]);
1136        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1137        let ranges = sequence.mask_to_offset_ranges(&mask);
1138        assert_eq!(ranges, vec![0..1, 3..5]);
1139
1140        let sequence = RowIdSequence(vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())]);
1141        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1142        let ranges = sequence.mask_to_offset_ranges(&mask);
1143        assert_eq!(ranges, vec![0..1, 2..4]);
1144
1145        // Test with multiple segments
1146        // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
1147        // *, -, *, -, -, ***, ---, ---, ***, --, **, --
1148        // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
1149        let sequence = RowIdSequence(vec![
1150            U64Segment::Range(0..5),
1151            U64Segment::RangeWithHoles {
1152                range: 100..105,
1153                holes: vec![103].into(),
1154            },
1155            U64Segment::SortedArray(vec![44, 46, 78].into()),
1156        ]);
1157        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 46, 100, 104]));
1158        let ranges = sequence.mask_to_offset_ranges(&mask);
1159        assert_eq!(ranges, vec![0..1, 2..3, 5..6, 8..9, 10..11]);
1160
1161        // Test with empty mask (should select everything)
1162        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1163        let mask = RowIdMask::default();
1164        let ranges = sequence.mask_to_offset_ranges(&mask);
1165        assert_eq!(ranges, vec![0..10]);
1166
1167        // Test with allow nothing mask
1168        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1169        let mask = RowIdMask::allow_nothing();
1170        let ranges = sequence.mask_to_offset_ranges(&mask);
1171        assert_eq!(ranges, vec![]);
1172    }
1173}