lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::Range;
15
16// TODO: replace this with Arrow BooleanBuffer.
17
18// These are all internal data structures, and are private.
19mod bitmap;
20mod encoded_array;
21mod index;
22pub mod segment;
23mod serde;
24
25use deepsize::DeepSizeOf;
26// These are the public API.
27pub use index::RowIdIndex;
28use lance_core::{
29    utils::mask::{RowIdMask, RowIdTreeMap},
30    Error, Result,
31};
32use lance_io::ReadBatchParams;
33pub use serde::{read_row_ids, write_row_ids};
34
35use snafu::location;
36
37use segment::U64Segment;
38use tracing::instrument;
39
40use crate::utils::LanceIteratorExtension;
41
/// A sequence of row ids.
///
/// Row ids are u64s that:
///
/// 1. Are **unique** within a table (except for tombstones)
/// 2. Are *often* but not always sorted and/or contiguous.
///
/// This sequence of row ids is optimized to be compact when the row ids are
/// contiguous and sorted. However, it does not require that the row ids are
/// contiguous or sorted.
///
/// We can make optimizations that assume uniqueness.
///
/// Internally the sequence is an ordered list of [`U64Segment`]s; the row
/// ids of the sequence are the row ids of each segment, concatenated in
/// order.
#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq, Default)]
pub struct RowIdSequence(Vec<U64Segment>);
56
57impl std::fmt::Display for RowIdSequence {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        let mut iter = self.iter();
60        let mut first_10 = Vec::new();
61        let mut last_10 = Vec::new();
62        for row_id in iter.by_ref() {
63            first_10.push(row_id);
64            if first_10.len() > 10 {
65                break;
66            }
67        }
68
69        while let Some(row_id) = iter.next_back() {
70            last_10.push(row_id);
71            if last_10.len() > 10 {
72                break;
73            }
74        }
75        last_10.reverse();
76
77        let theres_more = iter.next().is_some();
78
79        write!(f, "[")?;
80        for row_id in first_10 {
81            write!(f, "{}", row_id)?;
82        }
83        if theres_more {
84            write!(f, ", ...")?;
85        }
86        for row_id in last_10 {
87            write!(f, ", {}", row_id)?;
88        }
89        write!(f, "]")
90    }
91}
92
93impl From<Range<u64>> for RowIdSequence {
94    fn from(range: Range<u64>) -> Self {
95        Self(vec![U64Segment::Range(range)])
96    }
97}
98
99impl From<&[u64]> for RowIdSequence {
100    fn from(row_ids: &[u64]) -> Self {
101        Self(vec![U64Segment::from_slice(row_ids)])
102    }
103}
104
105impl RowIdSequence {
106    pub fn new() -> Self {
107        Self::default()
108    }
109
110    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
111        self.0.iter().flat_map(|segment| segment.iter())
112    }
113
114    pub fn len(&self) -> u64 {
115        self.0.iter().map(|segment| segment.len() as u64).sum()
116    }
117
118    pub fn is_empty(&self) -> bool {
119        self.0.is_empty()
120    }
121
122    /// Combines this row id sequence with another row id sequence.
123    pub fn extend(&mut self, other: Self) {
124        // If the last element of this sequence and the first element of next
125        // sequence are ranges, we might be able to combine them into a single
126        // range.
127        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
128            (self.0.last(), other.0.first())
129        {
130            if range1.end == range2.start {
131                let new_range = U64Segment::Range(range1.start..range2.end);
132                self.0.pop();
133                self.0.push(new_range);
134                self.0.extend(other.0.into_iter().skip(1));
135                return;
136            }
137        }
138        // TODO: add other optimizations, such as combining two RangeWithHoles.
139        self.0.extend(other.0);
140    }
141
142    /// Remove a set of row ids from the sequence.
143    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
144        // Order the row ids by position in which they appear in the sequence.
145        let (row_ids, offsets) = self.find_ids(row_ids);
146
147        let capacity = self.0.capacity();
148        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
149        let mut remaining_segments = old_segments.as_slice();
150
151        for (segment_idx, range) in offsets {
152            let segments_handled = old_segments.len() - remaining_segments.len();
153            let segments_to_add = segment_idx - segments_handled;
154            self.0
155                .extend_from_slice(&remaining_segments[..segments_to_add]);
156            remaining_segments = &remaining_segments[segments_to_add..];
157
158            let segment;
159            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
160
161            let segment_ids = &row_ids[range];
162            self.0.push(segment.delete(segment_ids));
163        }
164
165        // Add the remaining segments.
166        self.0.extend_from_slice(remaining_segments);
167    }
168
169    /// Delete row ids by position.
170    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
171        let mut local_positions = Vec::new();
172        let mut positions_iter = positions.into_iter();
173        let mut curr_position = positions_iter.next();
174        let mut offset = 0;
175        let mut cutoff = 0;
176
177        for segment in &mut self.0 {
178            // Make vector of local positions
179            cutoff += segment.len() as u32;
180            while let Some(position) = curr_position {
181                if position >= cutoff {
182                    break;
183                }
184                local_positions.push(position - offset);
185                curr_position = positions_iter.next();
186            }
187
188            if !local_positions.is_empty() {
189                segment.mask(&local_positions);
190                local_positions.clear();
191            }
192            offset = cutoff;
193        }
194
195        self.0.retain(|segment| !segment.is_empty());
196
197        Ok(())
198    }
199
200    /// Find the row ids in the sequence.
201    ///
202    /// Returns the row ids sorted by their appearance in the sequence.
203    /// Also returns the segment index and the range where that segment's
204    /// row id matches are found in the returned row id vector.
205    fn find_ids(
206        &self,
207        row_ids: impl IntoIterator<Item = u64>,
208    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
209        // Often, the row ids will already be provided in the order they appear.
210        // So the optimal way to search will be to cycle through rather than
211        // restarting the search from the beginning each time.
212        let mut segment_iter = self.0.iter().enumerate().cycle();
213
214        let mut segment_matches = vec![Vec::new(); self.0.len()];
215
216        row_ids.into_iter().for_each(|row_id| {
217            let mut i = 0;
218            // If we've cycled through all segments, we know the row id is not in the sequence.
219            while i < self.0.len() {
220                let (segment_idx, segment) = segment_iter.next().unwrap();
221                if segment.range().is_some_and(|range| range.contains(&row_id)) {
222                    if let Some(offset) = segment.position(row_id) {
223                        segment_matches.get_mut(segment_idx).unwrap().push(offset);
224                    }
225                    // The row id was not found it the segment. It might be in a later segment.
226                }
227                i += 1;
228            }
229        });
230        for matches in &mut segment_matches {
231            matches.sort_unstable();
232        }
233
234        let mut offset = 0;
235        let segment_ranges = segment_matches
236            .iter()
237            .enumerate()
238            .filter(|(_, matches)| !matches.is_empty())
239            .map(|(segment_idx, matches)| {
240                let range = offset..offset + matches.len();
241                offset += matches.len();
242                (segment_idx, range)
243            })
244            .collect();
245        let row_ids = segment_matches
246            .into_iter()
247            .enumerate()
248            .flat_map(|(segment_idx, offset)| {
249                offset
250                    .into_iter()
251                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
252            })
253            .collect();
254
255        (row_ids, segment_ranges)
256    }
257
258    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
259        if len == 0 {
260            return RowIdSeqSlice {
261                segments: &[],
262                offset_start: 0,
263                offset_last: 0,
264            };
265        }
266
267        // Find the starting position
268        let mut offset_start = offset;
269        let mut segment_offset = 0;
270        for segment in &self.0 {
271            let segment_len = segment.len();
272            if offset_start < segment_len {
273                break;
274            }
275            offset_start -= segment_len;
276            segment_offset += 1;
277        }
278
279        // Find the ending position
280        let mut offset_last = offset_start + len;
281        let mut segment_offset_last = segment_offset;
282        for segment in &self.0[segment_offset..] {
283            let segment_len = segment.len();
284            if offset_last <= segment_len {
285                break;
286            }
287            offset_last -= segment_len;
288            segment_offset_last += 1;
289        }
290
291        RowIdSeqSlice {
292            segments: &self.0[segment_offset..=segment_offset_last],
293            offset_start,
294            offset_last,
295        }
296    }
297
298    /// Get the row id at the given index.
299    ///
300    /// If the index is out of bounds, this will return None.
301    pub fn get(&self, index: usize) -> Option<u64> {
302        let mut offset = 0;
303        for segment in &self.0 {
304            let segment_len = segment.len();
305            if index < offset + segment_len {
306                return segment.get(index - offset);
307            }
308            offset += segment_len;
309        }
310        None
311    }
312
313    /// Get row ids from the sequence based on the provided _sorted_ offsets
314    ///
315    /// Any out of bounds offsets will be ignored
316    ///
317    /// # Panics
318    ///
319    /// If the input selection is not sorted, this function will panic
320    pub fn select<'a>(
321        &'a self,
322        selection: impl Iterator<Item = usize> + 'a,
323    ) -> impl Iterator<Item = u64> + 'a {
324        let mut seg_iter = self.0.iter();
325        let mut cur_seg = seg_iter.next();
326        let mut rows_passed = 0;
327        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
328        let mut last_index = 0;
329        selection.filter_map(move |index| {
330            if index < last_index {
331                panic!("Selection is not sorted");
332            }
333            last_index = index;
334
335            cur_seg?;
336
337            while (index - rows_passed) >= cur_seg_len {
338                rows_passed += cur_seg_len;
339                cur_seg = seg_iter.next();
340                if let Some(cur_seg) = cur_seg {
341                    cur_seg_len = cur_seg.len();
342                } else {
343                    return None;
344                }
345            }
346
347            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
348        })
349    }
350
351    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
352    /// in the sequence.
353    ///
354    /// For example, given a mask that selects all even ids and a sequence that is
355    /// [80..85, 86..90, 14]
356    ///
357    /// this will return [0, 2, 4, 5, 7, 9]
358    /// because the range expands to
359    ///
360    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
361    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
362    ///
363    /// This function is useful when determining which row offsets to read from a fragment given
364    /// a mask.
365    #[instrument(level = "debug", skip_all)]
366    pub fn mask_to_offset_ranges(&self, mask: &RowIdMask) -> Vec<Range<u64>> {
367        let mut offset = 0;
368        let mut ranges = Vec::new();
369        for segment in &self.0 {
370            match segment {
371                U64Segment::Range(range) => {
372                    let mut ids = RowIdTreeMap::from(range.clone());
373                    ids.mask(mask);
374                    ranges.extend(GroupingIterator::new(
375                        unsafe { ids.into_id_iter() }.map(|id| id - range.start + offset),
376                    ));
377                    offset += range.end - range.start;
378                }
379                U64Segment::RangeWithHoles { range, holes } => {
380                    let offset_start = offset;
381                    let mut ids = RowIdTreeMap::from(range.clone());
382                    offset += range.end - range.start;
383                    for hole in holes.iter() {
384                        if ids.remove(hole) {
385                            offset -= 1;
386                        }
387                    }
388                    ids.mask(mask);
389
390                    // Sadly we can't just subtract the offset because of the holes
391                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
392                    sorted_holes.sort_unstable();
393                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
394                    let mut holes_passed = 0;
395                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
396                        |id| {
397                            while let Some(next_hole) = next_holes_iter.peek() {
398                                if *next_hole < id {
399                                    next_holes_iter.next();
400                                    holes_passed += 1;
401                                } else {
402                                    break;
403                                }
404                            }
405                            id - range.start + offset_start - holes_passed
406                        },
407                    )));
408                }
409                U64Segment::RangeWithBitmap { range, bitmap } => {
410                    let mut ids = RowIdTreeMap::from(range.clone());
411                    let offset_start = offset;
412                    offset += range.end - range.start;
413                    for (i, val) in range.clone().enumerate() {
414                        if !bitmap.get(i) && ids.remove(val) {
415                            offset -= 1;
416                        }
417                    }
418                    ids.mask(mask);
419                    let mut bitmap_iter = bitmap.iter();
420                    let mut bitmap_iter_pos = 0;
421                    let mut holes_passed = 0;
422                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
423                        |id| {
424                            let offset_no_holes = id - range.start + offset_start;
425                            while bitmap_iter_pos < offset_no_holes {
426                                if !bitmap_iter.next().unwrap() {
427                                    holes_passed += 1;
428                                }
429                                bitmap_iter_pos += 1;
430                            }
431                            offset_no_holes - holes_passed
432                        },
433                    )));
434                }
435                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
436                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
437                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
438                        |(off, id)| {
439                            if mask.selected(id) {
440                                Some(off as u64 + offset)
441                            } else {
442                                None
443                            }
444                        },
445                    )));
446                    offset += array.len() as u64;
447                }
448            }
449        }
450        ranges
451    }
452}
453
/// Adapter that collapses runs of consecutive ids into half-open ranges.
///
/// For example, given an input iterator of [1, 2, 3, 5, 6, 7, 10, 11, 12]
/// this will return an iterator of [(1..4), (5..8), (10..13)]
struct GroupingIterator<I: Iterator<Item = u64>> {
    /// Source of ids.
    iter: I,
    /// The run currently being built, if any id has been seen yet.
    cur_range: Option<Range<u64>>,
}

impl<I: Iterator<Item = u64>> GroupingIterator<I> {
    /// Wraps `iter`; no id is pulled until the first call to `next`.
    fn new(iter: I) -> Self {
        let cur_range = None;
        Self { iter, cur_range }
    }
}

impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
    type Item = Range<u64>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Input exhausted: hand out the pending run (if any).
            let Some(id) = self.iter.next() else {
                return self.cur_range.take();
            };

            // The id continues the pending run: grow it and keep going.
            if let Some(run) = self.cur_range.as_mut() {
                if run.end == id {
                    run.end += 1;
                    continue;
                }
            }

            // The id starts a new run; emit the previous one when there is one.
            if let Some(finished) = self.cur_range.replace(id..id + 1) {
                return Some(finished);
            }
        }
    }
}
492
493impl From<&RowIdSequence> for RowIdTreeMap {
494    fn from(row_ids: &RowIdSequence) -> Self {
495        let mut tree_map = Self::new();
496        for segment in &row_ids.0 {
497            match segment {
498                U64Segment::Range(range) => {
499                    tree_map.insert_range(range.clone());
500                }
501                U64Segment::RangeWithBitmap { range, bitmap } => {
502                    tree_map.insert_range(range.clone());
503                    for (i, val) in range.clone().enumerate() {
504                        if !bitmap.get(i) {
505                            tree_map.remove(val);
506                        }
507                    }
508                }
509                U64Segment::RangeWithHoles { range, holes } => {
510                    tree_map.insert_range(range.clone());
511                    for hole in holes.iter() {
512                        tree_map.remove(hole);
513                    }
514                }
515                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
516                    for val in array.iter() {
517                        tree_map.insert(val);
518                    }
519                }
520            }
521        }
522        tree_map
523    }
524}
525
526#[derive(Debug)]
527pub struct RowIdSeqSlice<'a> {
528    /// Current slice of the segments we cover
529    segments: &'a [U64Segment],
530    /// Offset into the first segment to start iterating from
531    offset_start: usize,
532    /// Offset into the last segment to stop iterating at
533    offset_last: usize,
534}
535
536impl RowIdSeqSlice<'_> {
537    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
538        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
539        known_size -= self.offset_start;
540        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
541
542        let end = self.segments.len().saturating_sub(1);
543        self.segments
544            .iter()
545            .enumerate()
546            .flat_map(move |(i, segment)| {
547                match i {
548                    0 if self.segments.len() == 1 => {
549                        let len = self.offset_last - self.offset_start;
550                        // TODO: Optimize this so we don't have to use skip
551                        // (take is probably fine though.)
552                        Box::new(segment.iter().skip(self.offset_start).take(len))
553                            as Box<dyn Iterator<Item = u64>>
554                    }
555                    0 => Box::new(segment.iter().skip(self.offset_start)),
556                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
557                    _ => Box::new(segment.iter()),
558                }
559            })
560            .exact_size(known_size)
561    }
562}
563
564/// Re-chunk a sequences of row ids into chunks of a given size.
565///
566/// # Errors
567///
568/// Will return an error if the sum of the chunk sizes is not equal to the total
569/// number of row ids in the sequences.
570pub fn rechunk_sequences(
571    sequences: impl IntoIterator<Item = RowIdSequence>,
572    chunk_sizes: impl IntoIterator<Item = u64>,
573) -> Result<Vec<RowIdSequence>> {
574    // TODO: return an iterator. (with a good size hint?)
575    let chunk_size_iter = chunk_sizes.into_iter();
576    let mut chunked_sequences = Vec::with_capacity(chunk_size_iter.size_hint().0);
577    let mut segment_iter = sequences
578        .into_iter()
579        .flat_map(|sequence| sequence.0.into_iter())
580        .peekable();
581
582    let mut segment_offset = 0_u64;
583    for chunk_size in chunk_size_iter {
584        let mut sequence = RowIdSequence(Vec::new());
585        let mut remaining = chunk_size;
586
587        let too_many_segments_error = || {
588            Error::invalid_input(
589                "Got too many segments for the provided chunk lengths",
590                location!(),
591            )
592        };
593
594        while remaining > 0 {
595            let remaining_in_segment = segment_iter
596                .peek()
597                .map_or(0, |segment| segment.len() as u64 - segment_offset);
598            match (remaining_in_segment.cmp(&remaining), remaining_in_segment) {
599                (std::cmp::Ordering::Greater, _) => {
600                    // Can only push part of the segment, we are done with this chunk.
601                    let segment = segment_iter
602                        .peek()
603                        .ok_or_else(too_many_segments_error)?
604                        .slice(segment_offset as usize, remaining as usize);
605                    sequence.extend(RowIdSequence(vec![segment]));
606                    segment_offset += remaining;
607                    remaining = 0;
608                }
609                (_, 0) => {
610                    // Can push the entire segment.
611                    let segment = segment_iter.next().ok_or_else(too_many_segments_error)?;
612                    sequence.extend(RowIdSequence(vec![segment]));
613                    remaining = 0;
614                }
615                (_, _) => {
616                    // Push remaining segment
617                    let segment = segment_iter
618                        .next()
619                        .ok_or_else(too_many_segments_error)?
620                        .slice(segment_offset as usize, remaining_in_segment as usize);
621                    sequence.extend(RowIdSequence(vec![segment]));
622                    segment_offset = 0;
623                    remaining -= remaining_in_segment;
624                }
625            }
626        }
627
628        chunked_sequences.push(sequence);
629    }
630
631    if segment_iter.peek().is_some() {
632        return Err(Error::invalid_input(
633            "Got too few segments for the provided chunk lengths",
634            location!(),
635        ));
636    }
637
638    Ok(chunked_sequences)
639}
640
641/// Selects the row ids from a sequence based on the provided offsets.
642pub fn select_row_ids<'a>(
643    sequence: &'a RowIdSequence,
644    offsets: &'a ReadBatchParams,
645) -> Result<Vec<u64>> {
646    let out_of_bounds_err = |offset: u32| {
647        Error::invalid_input(
648            format!(
649                "Index out of bounds: {} for sequence of length {}",
650                offset,
651                sequence.len()
652            ),
653            location!(),
654        )
655    };
656
657    match offsets {
658        // TODO: Optimize this if indices are sorted, which is a common case.
659        ReadBatchParams::Indices(indices) => indices
660            .values()
661            .iter()
662            .map(|index| {
663                sequence
664                    .get(*index as usize)
665                    .ok_or_else(|| out_of_bounds_err(*index))
666            })
667            .collect(),
668        ReadBatchParams::Range(range) => {
669            if range.end > sequence.len() as usize {
670                return Err(out_of_bounds_err(range.end as u32));
671            }
672            let sequence = sequence.slice(range.start, range.end - range.start);
673            Ok(sequence.iter().collect())
674        }
675        ReadBatchParams::Ranges(ranges) => {
676            let num_rows = ranges
677                .iter()
678                .map(|r| (r.end - r.start) as usize)
679                .sum::<usize>();
680            let mut result = Vec::with_capacity(num_rows);
681            for range in ranges.as_ref() {
682                if range.end > sequence.len() {
683                    return Err(out_of_bounds_err(range.end as u32));
684                }
685                let sequence =
686                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
687                result.extend(sequence.iter());
688            }
689            Ok(result)
690        }
691
692        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
693        ReadBatchParams::RangeTo(to) => {
694            if to.end > sequence.len() as usize {
695                return Err(out_of_bounds_err(to.end as u32));
696            }
697            let len = to.end;
698            let sequence = sequence.slice(0, len);
699            Ok(sequence.iter().collect())
700        }
701        ReadBatchParams::RangeFrom(from) => {
702            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
703            Ok(sequence.iter().collect())
704        }
705    }
706}
707
708#[cfg(test)]
709mod test {
710    use super::*;
711
712    use pretty_assertions::assert_eq;
713    use test::bitmap::Bitmap;
714
715    #[test]
716    fn test_row_id_sequence_from_range() {
717        let sequence = RowIdSequence::from(0..10);
718        assert_eq!(sequence.len(), 10);
719        assert_eq!(sequence.is_empty(), false);
720
721        let iter = sequence.iter();
722        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
723    }
724
725    #[test]
726    fn test_row_id_sequence_extend() {
727        let mut sequence = RowIdSequence::from(0..10);
728        sequence.extend(RowIdSequence::from(10..20));
729        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
730
731        let mut sequence = RowIdSequence::from(0..10);
732        sequence.extend(RowIdSequence::from(20..30));
733        assert_eq!(
734            sequence.0,
735            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
736        );
737    }
738
739    #[test]
740    fn test_row_id_sequence_delete() {
741        let mut sequence = RowIdSequence::from(0..10);
742        sequence.delete(vec![1, 3, 5, 7, 9]);
743        let mut expected_bitmap = Bitmap::new_empty(9);
744        for i in [0, 2, 4, 6, 8] {
745            expected_bitmap.set(i as usize);
746        }
747        assert_eq!(
748            sequence.0,
749            vec![U64Segment::RangeWithBitmap {
750                range: 0..9,
751                bitmap: expected_bitmap
752            },]
753        );
754
755        let mut sequence = RowIdSequence::from(0..10);
756        sequence.extend(RowIdSequence::from(12..20));
757        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
758        assert_eq!(
759            sequence.0,
760            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
761        );
762
763        let mut sequence = RowIdSequence::from(0..10);
764        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
765        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
766    }
767
768    #[test]
769    fn test_row_id_slice() {
770        // The type of sequence isn't that relevant to the implementation, so
771        // we can just have a single one with all the segment types.
772        let sequence = RowIdSequence(vec![
773            U64Segment::Range(30..35), // 5
774            U64Segment::RangeWithHoles {
775                // 8
776                range: 50..60,
777                holes: vec![53, 54].into(),
778            },
779            U64Segment::SortedArray(vec![7, 9].into()), // 2
780            U64Segment::RangeWithBitmap {
781                range: 0..5,
782                bitmap: [true, false, true, false, true].as_slice().into(),
783            },
784            U64Segment::Array(vec![35, 39].into()),
785            U64Segment::Range(40..50),
786        ]);
787
788        // All possible offsets and lengths
789        for offset in 0..sequence.len() as usize {
790            for len in 0..sequence.len() as usize {
791                if offset + len > sequence.len() as usize {
792                    continue;
793                }
794                let slice = sequence.slice(offset, len);
795
796                let actual = slice.iter().collect::<Vec<_>>();
797                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
798                assert_eq!(
799                    actual, expected,
800                    "Failed for offset {} and len {}",
801                    offset, len
802                );
803
804                let (claimed_size, claimed_max) = slice.iter().size_hint();
805                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
806                assert_eq!(claimed_size, actual.len()); // Correct size hint
807            }
808        }
809    }
810
811    #[test]
812    fn test_row_id_slice_empty() {
813        let sequence = RowIdSequence::from(0..10);
814        let slice = sequence.slice(10, 0);
815        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
816    }
817
818    #[test]
819    fn test_row_id_sequence_rechunk() {
820        fn assert_rechunked(
821            input: Vec<RowIdSequence>,
822            chunk_sizes: Vec<u64>,
823            expected: Vec<RowIdSequence>,
824        ) {
825            let chunked = rechunk_sequences(input, chunk_sizes).unwrap();
826            assert_eq!(chunked, expected);
827        }
828
829        // Small pieces to larger ones
830        let many_segments = vec![
831            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
832            RowIdSequence::from(10..18),
833            RowIdSequence::from(18..28),
834            RowIdSequence::from(28..30),
835        ];
836        let fewer_segments = vec![
837            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
838            RowIdSequence::from(10..30),
839        ];
840        assert_rechunked(
841            many_segments.clone(),
842            fewer_segments.iter().map(|seq| seq.len()).collect(),
843            fewer_segments.clone(),
844        );
845
846        // Large pieces to smaller ones
847        assert_rechunked(
848            fewer_segments,
849            many_segments.iter().map(|seq| seq.len()).collect(),
850            many_segments.clone(),
851        );
852
853        // Equal pieces
854        assert_rechunked(
855            many_segments.clone(),
856            many_segments.iter().map(|seq| seq.len()).collect(),
857            many_segments.clone(),
858        );
859
860        // Too few segments -> error
861        let result = rechunk_sequences(many_segments.clone(), vec![100]);
862        assert!(result.is_err());
863
864        // Too many segments -> error
865        let result = rechunk_sequences(many_segments, vec![5]);
866        assert!(result.is_err());
867    }
868
869    #[test]
870    fn test_select_row_ids() {
871        // All forms of offsets
872        let offsets = [
873            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
874            ReadBatchParams::Range(2..8),
875            ReadBatchParams::RangeFull,
876            ReadBatchParams::RangeTo(..5),
877            ReadBatchParams::RangeFrom(5..),
878            ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
879        ];
880
881        // Sequences with all segment types. These have at least 10 elements,
882        // so they are valid for all the above offsets.
883        let sequences = [
884            RowIdSequence(vec![
885                U64Segment::Range(0..5),
886                U64Segment::RangeWithHoles {
887                    range: 50..60,
888                    holes: vec![53, 54].into(),
889                },
890                U64Segment::SortedArray(vec![7, 9].into()),
891            ]),
892            RowIdSequence(vec![
893                U64Segment::RangeWithBitmap {
894                    range: 0..5,
895                    bitmap: [true, false, true, false, true].as_slice().into(),
896                },
897                U64Segment::Array(vec![30, 20, 10].into()),
898                U64Segment::Range(40..50),
899            ]),
900        ];
901
902        for params in offsets {
903            for sequence in &sequences {
904                let row_ids = select_row_ids(sequence, &params).unwrap();
905                let flat_sequence = sequence.iter().collect::<Vec<_>>();
906
907                // Transform params into bounded ones
908                let selection: Vec<usize> = match &params {
909                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
910                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
911                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
912                    ReadBatchParams::Range(range) => range.clone().collect(),
913                    ReadBatchParams::Ranges(ranges) => ranges
914                        .iter()
915                        .flat_map(|r| r.start as usize..r.end as usize)
916                        .collect(),
917                    ReadBatchParams::Indices(indices) => {
918                        indices.values().iter().map(|i| *i as usize).collect()
919                    }
920                };
921
922                let expected = selection
923                    .into_iter()
924                    .map(|i| flat_sequence[i])
925                    .collect::<Vec<_>>();
926                assert_eq!(
927                    row_ids, expected,
928                    "Failed for params {:?} on the sequence {:?}",
929                    &params, sequence
930                );
931            }
932        }
933    }
934
935    #[test]
936    fn test_select_row_ids_out_of_bounds() {
937        let offsets = [
938            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
939            ReadBatchParams::Range(2..1000),
940            ReadBatchParams::RangeTo(..1000),
941        ];
942
943        let sequence = RowIdSequence::from(0..10);
944
945        for params in offsets {
946            let result = select_row_ids(&sequence, &params);
947            assert!(result.is_err());
948            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
949        }
950    }
951
952    #[test]
953    fn test_row_id_sequence_to_treemap() {
954        let sequence = RowIdSequence(vec![
955            U64Segment::Range(0..5),
956            U64Segment::RangeWithHoles {
957                range: 50..60,
958                holes: vec![53, 54].into(),
959            },
960            U64Segment::SortedArray(vec![7, 9].into()),
961            U64Segment::RangeWithBitmap {
962                range: 10..15,
963                bitmap: [true, false, true, false, true].as_slice().into(),
964            },
965            U64Segment::Array(vec![35, 39].into()),
966            U64Segment::Range(40..50),
967        ]);
968
969        let tree_map = RowIdTreeMap::from(&sequence);
970        let expected = vec![
971            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
972            51, 52, 55, 56, 57, 58, 59,
973        ]
974        .into_iter()
975        .collect::<RowIdTreeMap>();
976        assert_eq!(tree_map, expected);
977    }
978
979    #[test]
980    fn test_row_id_mask() {
981        // 0, 1, 2, 3, 4
982        // 50, 51, 52, 55, 56, 57, 58, 59
983        // 7, 9
984        // 10, 12, 14
985        // 35, 39
986        let sequence = RowIdSequence(vec![
987            U64Segment::Range(0..5),
988            U64Segment::RangeWithHoles {
989                range: 50..60,
990                holes: vec![53, 54].into(),
991            },
992            U64Segment::SortedArray(vec![7, 9].into()),
993            U64Segment::RangeWithBitmap {
994                range: 10..15,
995                bitmap: [true, false, true, false, true].as_slice().into(),
996            },
997            U64Segment::Array(vec![35, 39].into()),
998        ]);
999
1000        // Masking one in each segment
1001        let values_to_remove = [4, 55, 7, 12, 39];
1002        let positions_to_remove = sequence
1003            .iter()
1004            .enumerate()
1005            .filter_map(|(i, val)| {
1006                if values_to_remove.contains(&val) {
1007                    Some(i as u32)
1008                } else {
1009                    None
1010                }
1011            })
1012            .collect::<Vec<_>>();
1013        let mut sequence = sequence;
1014        sequence.mask(positions_to_remove).unwrap();
1015        let expected = RowIdSequence(vec![
1016            U64Segment::Range(0..4),
1017            U64Segment::RangeWithBitmap {
1018                range: 50..60,
1019                bitmap: [
1020                    true, true, true, false, false, false, true, true, true, true,
1021                ]
1022                .as_slice()
1023                .into(),
1024            },
1025            U64Segment::Range(9..10),
1026            U64Segment::RangeWithBitmap {
1027                range: 10..15,
1028                bitmap: [true, false, false, false, true].as_slice().into(),
1029            },
1030            U64Segment::Array(vec![35].into()),
1031        ]);
1032        assert_eq!(sequence, expected);
1033    }
1034
1035    #[test]
1036    fn test_row_id_mask_everything() {
1037        let mut sequence = RowIdSequence(vec![
1038            U64Segment::Range(0..5),
1039            U64Segment::SortedArray(vec![7, 9].into()),
1040        ]);
1041        sequence.mask(0..sequence.len() as u32).unwrap();
1042        let expected = RowIdSequence(vec![]);
1043        assert_eq!(sequence, expected);
1044    }
1045
1046    #[test]
1047    fn test_selection() {
1048        let sequence = RowIdSequence(vec![
1049            U64Segment::Range(0..5),
1050            U64Segment::Range(10..15),
1051            U64Segment::Range(20..25),
1052        ]);
1053        let selection = sequence.select(vec![2, 4, 13, 14, 57].into_iter());
1054        assert_eq!(selection.collect::<Vec<_>>(), vec![2, 4, 23, 24]);
1055    }
1056
1057    #[test]
1058    #[should_panic]
1059    fn test_selection_unsorted() {
1060        let sequence = RowIdSequence(vec![
1061            U64Segment::Range(0..5),
1062            U64Segment::Range(10..15),
1063            U64Segment::Range(20..25),
1064        ]);
1065        let _ = sequence
1066            .select(vec![2, 4, 3].into_iter())
1067            .collect::<Vec<_>>();
1068    }
1069
1070    #[test]
1071    fn test_mask_to_offset_ranges() {
1072        // Tests with a simple range segment
1073        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1074        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1075        let ranges = sequence.mask_to_offset_ranges(&mask);
1076        assert_eq!(ranges, vec![0..1, 2..3, 4..5, 6..7, 8..9]);
1077
1078        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1079        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[54]));
1080        let ranges = sequence.mask_to_offset_ranges(&mask);
1081        assert_eq!(ranges, vec![14..15]);
1082
1083        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1084        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[54]));
1085        let ranges = sequence.mask_to_offset_ranges(&mask);
1086        assert_eq!(ranges, vec![0..14, 15..20]);
1087
1088        // Test with a range segment with holes
1089        // 0, 1, 3, 4, 5, 7, 8, 9
1090        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1091            range: 0..10,
1092            holes: vec![2, 6].into(),
1093        }]);
1094        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1095        let ranges = sequence.mask_to_offset_ranges(&mask);
1096        assert_eq!(ranges, vec![0..1, 3..4, 6..7]);
1097
1098        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1099            range: 40..60,
1100            holes: vec![47, 43].into(),
1101        }]);
1102        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1103        let ranges = sequence.mask_to_offset_ranges(&mask);
1104        assert_eq!(ranges, vec![3..4]);
1105
1106        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1107            range: 40..60,
1108            holes: vec![47, 43].into(),
1109        }]);
1110        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1111        let ranges = sequence.mask_to_offset_ranges(&mask);
1112        assert_eq!(ranges, vec![0..3, 4..18]);
1113
1114        // Test with a range segment with bitmap
1115        // 0, 1, 4, 5, 6, 7
1116        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1117            range: 0..10,
1118            bitmap: [
1119                true, true, false, false, true, true, true, true, false, false,
1120            ]
1121            .as_slice()
1122            .into(),
1123        }]);
1124        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1125        let ranges = sequence.mask_to_offset_ranges(&mask);
1126        assert_eq!(ranges, vec![0..1, 2..3, 4..5]);
1127
1128        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1129            range: 40..45,
1130            bitmap: [true, true, false, false, true].as_slice().into(),
1131        }]);
1132        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1133        let ranges = sequence.mask_to_offset_ranges(&mask);
1134        assert_eq!(ranges, vec![2..3]);
1135
1136        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1137            range: 40..45,
1138            bitmap: [true, true, false, false, true].as_slice().into(),
1139        }]);
1140        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1141        let ranges = sequence.mask_to_offset_ranges(&mask);
1142        assert_eq!(ranges, vec![0..2]);
1143
1144        // Test with a sorted array segment
1145        let sequence = RowIdSequence(vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())]);
1146        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1147        let ranges = sequence.mask_to_offset_ranges(&mask);
1148        assert_eq!(ranges, vec![0..1, 3..5]);
1149
1150        let sequence = RowIdSequence(vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())]);
1151        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1152        let ranges = sequence.mask_to_offset_ranges(&mask);
1153        assert_eq!(ranges, vec![0..1, 2..4]);
1154
1155        // Test with multiple segments
1156        // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
1157        // *, -, *, -, -, ***, ---, ---, ***, --, **, --
1158        // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
1159        let sequence = RowIdSequence(vec![
1160            U64Segment::Range(0..5),
1161            U64Segment::RangeWithHoles {
1162                range: 100..105,
1163                holes: vec![103].into(),
1164            },
1165            U64Segment::SortedArray(vec![44, 46, 78].into()),
1166        ]);
1167        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 46, 100, 104]));
1168        let ranges = sequence.mask_to_offset_ranges(&mask);
1169        assert_eq!(ranges, vec![0..1, 2..3, 5..6, 8..9, 10..11]);
1170
1171        // Test with empty mask (should select everything)
1172        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1173        let mask = RowIdMask::default();
1174        let ranges = sequence.mask_to_offset_ranges(&mask);
1175        assert_eq!(ranges, vec![0..10]);
1176
1177        // Test with allow nothing mask
1178        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1179        let mask = RowIdMask::allow_nothing();
1180        let ranges = sequence.mask_to_offset_ranges(&mask);
1181        assert_eq!(ranges, vec![]);
1182    }
1183}