lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::Range;
15// TODO: replace this with Arrow BooleanBuffer.
16
17// These are all internal data structures, and are private.
18mod bitmap;
19mod encoded_array;
20mod index;
21pub mod segment;
22mod serde;
23
24use deepsize::DeepSizeOf;
25// These are the public API.
26pub use index::FragmentRowIdIndex;
27pub use index::RowIdIndex;
28use lance_core::{
29    utils::mask::{RowIdMask, RowIdTreeMap},
30    Error, Result,
31};
32use lance_io::ReadBatchParams;
33pub use serde::{read_row_ids, write_row_ids};
34
35use snafu::location;
36
37use crate::utils::LanceIteratorExtension;
38use segment::U64Segment;
39use tracing::instrument;
40
41/// A sequence of row ids.
42///
43/// Row ids are u64s that:
44///
45/// 1. Are **unique** within a table (except for tombstones)
46/// 2. Are *often* but not always sorted and/or contiguous.
47///
48/// This sequence of row ids is optimized to be compact when the row ids are
49/// contiguous and sorted. However, it does not require that the row ids are
50/// contiguous or sorted.
51///
52/// We can make optimizations that assume uniqueness.
53#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq, Default)]
54pub struct RowIdSequence(Vec<U64Segment>);
55
56impl std::fmt::Display for RowIdSequence {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        let mut iter = self.iter();
59        let mut first_10 = Vec::new();
60        let mut last_10 = Vec::new();
61        for row_id in iter.by_ref() {
62            first_10.push(row_id);
63            if first_10.len() > 10 {
64                break;
65            }
66        }
67
68        while let Some(row_id) = iter.next_back() {
69            last_10.push(row_id);
70            if last_10.len() > 10 {
71                break;
72            }
73        }
74        last_10.reverse();
75
76        let theres_more = iter.next().is_some();
77
78        write!(f, "[")?;
79        for row_id in first_10 {
80            write!(f, "{}", row_id)?;
81        }
82        if theres_more {
83            write!(f, ", ...")?;
84        }
85        for row_id in last_10 {
86            write!(f, ", {}", row_id)?;
87        }
88        write!(f, "]")
89    }
90}
91
92impl From<Range<u64>> for RowIdSequence {
93    fn from(range: Range<u64>) -> Self {
94        Self(vec![U64Segment::Range(range)])
95    }
96}
97
98impl From<&[u64]> for RowIdSequence {
99    fn from(row_ids: &[u64]) -> Self {
100        Self(vec![U64Segment::from_slice(row_ids)])
101    }
102}
103
104impl RowIdSequence {
105    pub fn new() -> Self {
106        Self::default()
107    }
108
109    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
110        self.0.iter().flat_map(|segment| segment.iter())
111    }
112
113    pub fn len(&self) -> u64 {
114        self.0.iter().map(|segment| segment.len() as u64).sum()
115    }
116
117    pub fn is_empty(&self) -> bool {
118        self.0.is_empty()
119    }
120
121    /// Combines this row id sequence with another row id sequence.
122    pub fn extend(&mut self, other: Self) {
123        // If the last element of this sequence and the first element of next
124        // sequence are ranges, we might be able to combine them into a single
125        // range.
126        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
127            (self.0.last(), other.0.first())
128        {
129            if range1.end == range2.start {
130                let new_range = U64Segment::Range(range1.start..range2.end);
131                self.0.pop();
132                self.0.push(new_range);
133                self.0.extend(other.0.into_iter().skip(1));
134                return;
135            }
136        }
137        // TODO: add other optimizations, such as combining two RangeWithHoles.
138        self.0.extend(other.0);
139    }
140
141    /// Remove a set of row ids from the sequence.
142    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
143        // Order the row ids by position in which they appear in the sequence.
144        let (row_ids, offsets) = self.find_ids(row_ids);
145
146        let capacity = self.0.capacity();
147        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
148        let mut remaining_segments = old_segments.as_slice();
149
150        for (segment_idx, range) in offsets {
151            let segments_handled = old_segments.len() - remaining_segments.len();
152            let segments_to_add = segment_idx - segments_handled;
153            self.0
154                .extend_from_slice(&remaining_segments[..segments_to_add]);
155            remaining_segments = &remaining_segments[segments_to_add..];
156
157            let segment;
158            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
159
160            let segment_ids = &row_ids[range];
161            self.0.push(segment.delete(segment_ids));
162        }
163
164        // Add the remaining segments.
165        self.0.extend_from_slice(remaining_segments);
166    }
167
168    /// Delete row ids by position.
169    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
170        let mut local_positions = Vec::new();
171        let mut positions_iter = positions.into_iter();
172        let mut curr_position = positions_iter.next();
173        let mut offset = 0;
174        let mut cutoff = 0;
175
176        for segment in &mut self.0 {
177            // Make vector of local positions
178            cutoff += segment.len() as u32;
179            while let Some(position) = curr_position {
180                if position >= cutoff {
181                    break;
182                }
183                local_positions.push(position - offset);
184                curr_position = positions_iter.next();
185            }
186
187            if !local_positions.is_empty() {
188                segment.mask(&local_positions);
189                local_positions.clear();
190            }
191            offset = cutoff;
192        }
193
194        self.0.retain(|segment| !segment.is_empty());
195
196        Ok(())
197    }
198
199    /// Find the row ids in the sequence.
200    ///
201    /// Returns the row ids sorted by their appearance in the sequence.
202    /// Also returns the segment index and the range where that segment's
203    /// row id matches are found in the returned row id vector.
204    fn find_ids(
205        &self,
206        row_ids: impl IntoIterator<Item = u64>,
207    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
208        // Often, the row ids will already be provided in the order they appear.
209        // So the optimal way to search will be to cycle through rather than
210        // restarting the search from the beginning each time.
211        let mut segment_iter = self.0.iter().enumerate().cycle();
212
213        let mut segment_matches = vec![Vec::new(); self.0.len()];
214
215        row_ids.into_iter().for_each(|row_id| {
216            let mut i = 0;
217            // If we've cycled through all segments, we know the row id is not in the sequence.
218            while i < self.0.len() {
219                let (segment_idx, segment) = segment_iter.next().unwrap();
220                if segment.range().is_some_and(|range| range.contains(&row_id)) {
221                    if let Some(offset) = segment.position(row_id) {
222                        segment_matches.get_mut(segment_idx).unwrap().push(offset);
223                    }
224                    // The row id was not found it the segment. It might be in a later segment.
225                }
226                i += 1;
227            }
228        });
229        for matches in &mut segment_matches {
230            matches.sort_unstable();
231        }
232
233        let mut offset = 0;
234        let segment_ranges = segment_matches
235            .iter()
236            .enumerate()
237            .filter(|(_, matches)| !matches.is_empty())
238            .map(|(segment_idx, matches)| {
239                let range = offset..offset + matches.len();
240                offset += matches.len();
241                (segment_idx, range)
242            })
243            .collect();
244        let row_ids = segment_matches
245            .into_iter()
246            .enumerate()
247            .flat_map(|(segment_idx, offset)| {
248                offset
249                    .into_iter()
250                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
251            })
252            .collect();
253
254        (row_ids, segment_ranges)
255    }
256
257    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
258        if len == 0 {
259            return RowIdSeqSlice {
260                segments: &[],
261                offset_start: 0,
262                offset_last: 0,
263            };
264        }
265
266        // Find the starting position
267        let mut offset_start = offset;
268        let mut segment_offset = 0;
269        for segment in &self.0 {
270            let segment_len = segment.len();
271            if offset_start < segment_len {
272                break;
273            }
274            offset_start -= segment_len;
275            segment_offset += 1;
276        }
277
278        // Find the ending position
279        let mut offset_last = offset_start + len;
280        let mut segment_offset_last = segment_offset;
281        for segment in &self.0[segment_offset..] {
282            let segment_len = segment.len();
283            if offset_last <= segment_len {
284                break;
285            }
286            offset_last -= segment_len;
287            segment_offset_last += 1;
288        }
289
290        RowIdSeqSlice {
291            segments: &self.0[segment_offset..=segment_offset_last],
292            offset_start,
293            offset_last,
294        }
295    }
296
297    /// Get the row id at the given index.
298    ///
299    /// If the index is out of bounds, this will return None.
300    pub fn get(&self, index: usize) -> Option<u64> {
301        let mut offset = 0;
302        for segment in &self.0 {
303            let segment_len = segment.len();
304            if index < offset + segment_len {
305                return segment.get(index - offset);
306            }
307            offset += segment_len;
308        }
309        None
310    }
311
312    /// Get row ids from the sequence based on the provided _sorted_ offsets
313    ///
314    /// Any out of bounds offsets will be ignored
315    ///
316    /// # Panics
317    ///
318    /// If the input selection is not sorted, this function will panic
319    pub fn select<'a>(
320        &'a self,
321        selection: impl Iterator<Item = usize> + 'a,
322    ) -> impl Iterator<Item = u64> + 'a {
323        let mut seg_iter = self.0.iter();
324        let mut cur_seg = seg_iter.next();
325        let mut rows_passed = 0;
326        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
327        let mut last_index = 0;
328        selection.filter_map(move |index| {
329            if index < last_index {
330                panic!("Selection is not sorted");
331            }
332            last_index = index;
333
334            cur_seg?;
335
336            while (index - rows_passed) >= cur_seg_len {
337                rows_passed += cur_seg_len;
338                cur_seg = seg_iter.next();
339                if let Some(cur_seg) = cur_seg {
340                    cur_seg_len = cur_seg.len();
341                } else {
342                    return None;
343                }
344            }
345
346            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
347        })
348    }
349
350    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
351    /// in the sequence.
352    ///
353    /// For example, given a mask that selects all even ids and a sequence that is
354    /// [80..85, 86..90, 14]
355    ///
356    /// this will return [0, 2, 4, 5, 7, 9]
357    /// because the range expands to
358    ///
359    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
360    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
361    ///
362    /// This function is useful when determining which row offsets to read from a fragment given
363    /// a mask.
364    #[instrument(level = "debug", skip_all)]
365    pub fn mask_to_offset_ranges(&self, mask: &RowIdMask) -> Vec<Range<u64>> {
366        let mut offset = 0;
367        let mut ranges = Vec::new();
368        for segment in &self.0 {
369            match segment {
370                U64Segment::Range(range) => {
371                    let mut ids = RowIdTreeMap::from(range.clone());
372                    ids.mask(mask);
373                    ranges.extend(GroupingIterator::new(
374                        unsafe { ids.into_id_iter() }.map(|id| id - range.start + offset),
375                    ));
376                    offset += range.end - range.start;
377                }
378                U64Segment::RangeWithHoles { range, holes } => {
379                    let offset_start = offset;
380                    let mut ids = RowIdTreeMap::from(range.clone());
381                    offset += range.end - range.start;
382                    for hole in holes.iter() {
383                        if ids.remove(hole) {
384                            offset -= 1;
385                        }
386                    }
387                    ids.mask(mask);
388
389                    // Sadly we can't just subtract the offset because of the holes
390                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
391                    sorted_holes.sort_unstable();
392                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
393                    let mut holes_passed = 0;
394                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
395                        |id| {
396                            while let Some(next_hole) = next_holes_iter.peek() {
397                                if *next_hole < id {
398                                    next_holes_iter.next();
399                                    holes_passed += 1;
400                                } else {
401                                    break;
402                                }
403                            }
404                            id - range.start + offset_start - holes_passed
405                        },
406                    )));
407                }
408                U64Segment::RangeWithBitmap { range, bitmap } => {
409                    let mut ids = RowIdTreeMap::from(range.clone());
410                    let offset_start = offset;
411                    offset += range.end - range.start;
412                    for (i, val) in range.clone().enumerate() {
413                        if !bitmap.get(i) && ids.remove(val) {
414                            offset -= 1;
415                        }
416                    }
417                    ids.mask(mask);
418                    let mut bitmap_iter = bitmap.iter();
419                    let mut bitmap_iter_pos = 0;
420                    let mut holes_passed = 0;
421                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
422                        |id| {
423                            let offset_no_holes = id - range.start + offset_start;
424                            while bitmap_iter_pos < offset_no_holes {
425                                if !bitmap_iter.next().unwrap() {
426                                    holes_passed += 1;
427                                }
428                                bitmap_iter_pos += 1;
429                            }
430                            offset_no_holes - holes_passed
431                        },
432                    )));
433                }
434                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
435                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
436                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
437                        |(off, id)| {
438                            if mask.selected(id) {
439                                Some(off as u64 + offset)
440                            } else {
441                                None
442                            }
443                        },
444                    )));
445                    offset += array.len() as u64;
446                }
447            }
448        }
449        ranges
450    }
451}
452
/// Adapter that collapses runs of consecutive ids into half-open ranges.
///
/// For example, the input `[1, 2, 3, 5, 6, 7, 10, 11, 12]` is turned into
/// `[1..4, 5..8, 10..13]`. An id only extends the current range when it is
/// exactly one past the last id seen; anything else starts a new range.
struct GroupingIterator<I: Iterator<Item = u64>> {
    /// Source of ids.
    iter: I,
    /// Range that is still being grown, if any.
    cur_range: Option<Range<u64>>,
}

impl<I: Iterator<Item = u64>> GroupingIterator<I> {
    /// Wraps `iter`; no range is open initially.
    fn new(iter: I) -> Self {
        let cur_range = None;
        Self { iter, cur_range }
    }
}

impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
    type Item = Range<u64>;

    /// Returns the next completed range, or `None` once the source is
    /// exhausted and the last open range has been handed out.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let Some(id) = self.iter.next() else {
                // Source is drained: flush the range still open, if any.
                return self.cur_range.take();
            };
            match self.cur_range.as_mut() {
                // The id continues the open range.
                Some(open) if open.end == id => open.end += 1,
                // Gap: emit the open range and start a fresh one at `id`.
                Some(open) => return Some(std::mem::replace(open, id..id + 1)),
                // Very first id.
                None => self.cur_range = Some(id..id + 1),
            }
        }
    }
}
491
492impl From<&RowIdSequence> for RowIdTreeMap {
493    fn from(row_ids: &RowIdSequence) -> Self {
494        let mut tree_map = Self::new();
495        for segment in &row_ids.0 {
496            match segment {
497                U64Segment::Range(range) => {
498                    tree_map.insert_range(range.clone());
499                }
500                U64Segment::RangeWithBitmap { range, bitmap } => {
501                    tree_map.insert_range(range.clone());
502                    for (i, val) in range.clone().enumerate() {
503                        if !bitmap.get(i) {
504                            tree_map.remove(val);
505                        }
506                    }
507                }
508                U64Segment::RangeWithHoles { range, holes } => {
509                    tree_map.insert_range(range.clone());
510                    for hole in holes.iter() {
511                        tree_map.remove(hole);
512                    }
513                }
514                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
515                    for val in array.iter() {
516                        tree_map.insert(val);
517                    }
518                }
519            }
520        }
521        tree_map
522    }
523}
524
525#[derive(Debug)]
526pub struct RowIdSeqSlice<'a> {
527    /// Current slice of the segments we cover
528    segments: &'a [U64Segment],
529    /// Offset into the first segment to start iterating from
530    offset_start: usize,
531    /// Offset into the last segment to stop iterating at
532    offset_last: usize,
533}
534
535impl RowIdSeqSlice<'_> {
536    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
537        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
538        known_size -= self.offset_start;
539        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
540
541        let end = self.segments.len().saturating_sub(1);
542        self.segments
543            .iter()
544            .enumerate()
545            .flat_map(move |(i, segment)| {
546                match i {
547                    0 if self.segments.len() == 1 => {
548                        let len = self.offset_last - self.offset_start;
549                        // TODO: Optimize this so we don't have to use skip
550                        // (take is probably fine though.)
551                        Box::new(segment.iter().skip(self.offset_start).take(len))
552                            as Box<dyn Iterator<Item = u64>>
553                    }
554                    0 => Box::new(segment.iter().skip(self.offset_start)),
555                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
556                    _ => Box::new(segment.iter()),
557                }
558            })
559            .exact_size(known_size)
560    }
561}
562
563/// Re-chunk a sequences of row ids into chunks of a given size.
564///
565/// The sequences may less than chunk sizes, because the sequences only
566/// contains the row ids that we want to keep, they come from the updates records.
567/// But the chunk sizes are based on the fragment physical rows(may contain inserted records).
568/// So if the sequences are smaller than the chunk sizes, we need to
569/// assign the incremental row ids in the further step. This behavior is controlled by the
570/// `allow_incomplete` parameter.
571///
572/// # Errors
573///
574/// If `allow_incomplete` is false, will return an error if the sum of the chunk sizes
575/// is not equal to the total number of row ids in the sequences.
576pub fn rechunk_sequences(
577    sequences: impl IntoIterator<Item = RowIdSequence>,
578    chunk_sizes: impl IntoIterator<Item = u64>,
579    allow_incomplete: bool,
580) -> Result<Vec<RowIdSequence>> {
581    // TODO: return an iterator. (with a good size hint?)
582    let chunk_size_iter = chunk_sizes.into_iter();
583    let mut chunked_sequences = Vec::with_capacity(chunk_size_iter.size_hint().0);
584    let mut segment_iter = sequences
585        .into_iter()
586        .flat_map(|sequence| sequence.0.into_iter())
587        .peekable();
588
589    let mut segment_offset = 0_u64;
590    for chunk_size in chunk_size_iter {
591        let mut sequence = RowIdSequence(Vec::new());
592        let mut remaining = chunk_size;
593
594        let too_many_segments_error = || {
595            Error::invalid_input(
596                "Got too many segments for the provided chunk lengths",
597                location!(),
598            )
599        };
600
601        while remaining > 0 {
602            let remaining_in_segment = segment_iter
603                .peek()
604                .map_or(0, |segment| segment.len() as u64 - segment_offset);
605            match (remaining_in_segment.cmp(&remaining), remaining_in_segment) {
606                (std::cmp::Ordering::Greater, _) => {
607                    // Can only push part of the segment, we are done with this chunk.
608                    let segment = segment_iter
609                        .peek()
610                        .ok_or_else(too_many_segments_error)?
611                        .slice(segment_offset as usize, remaining as usize);
612                    sequence.extend(RowIdSequence(vec![segment]));
613                    segment_offset += remaining;
614                    remaining = 0;
615                }
616                (_, 0) => {
617                    // Can push the entire segment.
618                    if let Some(segment) = segment_iter.next() {
619                        sequence.extend(RowIdSequence(vec![segment]));
620                        remaining = 0;
621                    } else if allow_incomplete {
622                        remaining = 0;
623                    } else {
624                        return Err(too_many_segments_error());
625                    }
626                }
627                (_, _) => {
628                    // Push remaining segment
629                    let segment = segment_iter
630                        .next()
631                        .ok_or_else(too_many_segments_error)?
632                        .slice(segment_offset as usize, remaining_in_segment as usize);
633                    sequence.extend(RowIdSequence(vec![segment]));
634                    segment_offset = 0;
635                    remaining -= remaining_in_segment;
636                }
637            }
638        }
639
640        chunked_sequences.push(sequence);
641    }
642
643    if segment_iter.peek().is_some() {
644        return Err(Error::invalid_input(
645            "Got too few segments for the provided chunk lengths",
646            location!(),
647        ));
648    }
649
650    Ok(chunked_sequences)
651}
652
653/// Selects the row ids from a sequence based on the provided offsets.
654pub fn select_row_ids<'a>(
655    sequence: &'a RowIdSequence,
656    offsets: &'a ReadBatchParams,
657) -> Result<Vec<u64>> {
658    let out_of_bounds_err = |offset: u32| {
659        Error::invalid_input(
660            format!(
661                "Index out of bounds: {} for sequence of length {}",
662                offset,
663                sequence.len()
664            ),
665            location!(),
666        )
667    };
668
669    match offsets {
670        // TODO: Optimize this if indices are sorted, which is a common case.
671        ReadBatchParams::Indices(indices) => indices
672            .values()
673            .iter()
674            .map(|index| {
675                sequence
676                    .get(*index as usize)
677                    .ok_or_else(|| out_of_bounds_err(*index))
678            })
679            .collect(),
680        ReadBatchParams::Range(range) => {
681            if range.end > sequence.len() as usize {
682                return Err(out_of_bounds_err(range.end as u32));
683            }
684            let sequence = sequence.slice(range.start, range.end - range.start);
685            Ok(sequence.iter().collect())
686        }
687        ReadBatchParams::Ranges(ranges) => {
688            let num_rows = ranges
689                .iter()
690                .map(|r| (r.end - r.start) as usize)
691                .sum::<usize>();
692            let mut result = Vec::with_capacity(num_rows);
693            for range in ranges.as_ref() {
694                if range.end > sequence.len() {
695                    return Err(out_of_bounds_err(range.end as u32));
696                }
697                let sequence =
698                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
699                result.extend(sequence.iter());
700            }
701            Ok(result)
702        }
703
704        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
705        ReadBatchParams::RangeTo(to) => {
706            if to.end > sequence.len() as usize {
707                return Err(out_of_bounds_err(to.end as u32));
708            }
709            let len = to.end;
710            let sequence = sequence.slice(0, len);
711            Ok(sequence.iter().collect())
712        }
713        ReadBatchParams::RangeFrom(from) => {
714            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
715            Ok(sequence.iter().collect())
716        }
717    }
718}
719
720#[cfg(test)]
721mod test {
722    use super::*;
723
724    use pretty_assertions::assert_eq;
725    use test::bitmap::Bitmap;
726
727    #[test]
728    fn test_row_id_sequence_from_range() {
729        let sequence = RowIdSequence::from(0..10);
730        assert_eq!(sequence.len(), 10);
731        assert_eq!(sequence.is_empty(), false);
732
733        let iter = sequence.iter();
734        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
735    }
736
737    #[test]
738    fn test_row_id_sequence_extend() {
739        let mut sequence = RowIdSequence::from(0..10);
740        sequence.extend(RowIdSequence::from(10..20));
741        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
742
743        let mut sequence = RowIdSequence::from(0..10);
744        sequence.extend(RowIdSequence::from(20..30));
745        assert_eq!(
746            sequence.0,
747            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
748        );
749    }
750
751    #[test]
752    fn test_row_id_sequence_delete() {
753        let mut sequence = RowIdSequence::from(0..10);
754        sequence.delete(vec![1, 3, 5, 7, 9]);
755        let mut expected_bitmap = Bitmap::new_empty(9);
756        for i in [0, 2, 4, 6, 8] {
757            expected_bitmap.set(i as usize);
758        }
759        assert_eq!(
760            sequence.0,
761            vec![U64Segment::RangeWithBitmap {
762                range: 0..9,
763                bitmap: expected_bitmap
764            },]
765        );
766
767        let mut sequence = RowIdSequence::from(0..10);
768        sequence.extend(RowIdSequence::from(12..20));
769        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
770        assert_eq!(
771            sequence.0,
772            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
773        );
774
775        let mut sequence = RowIdSequence::from(0..10);
776        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
777        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
778    }
779
780    #[test]
781    fn test_row_id_slice() {
782        // The type of sequence isn't that relevant to the implementation, so
783        // we can just have a single one with all the segment types.
784        let sequence = RowIdSequence(vec![
785            U64Segment::Range(30..35), // 5
786            U64Segment::RangeWithHoles {
787                // 8
788                range: 50..60,
789                holes: vec![53, 54].into(),
790            },
791            U64Segment::SortedArray(vec![7, 9].into()), // 2
792            U64Segment::RangeWithBitmap {
793                range: 0..5,
794                bitmap: [true, false, true, false, true].as_slice().into(),
795            },
796            U64Segment::Array(vec![35, 39].into()),
797            U64Segment::Range(40..50),
798        ]);
799
800        // All possible offsets and lengths
801        for offset in 0..sequence.len() as usize {
802            for len in 0..sequence.len() as usize {
803                if offset + len > sequence.len() as usize {
804                    continue;
805                }
806                let slice = sequence.slice(offset, len);
807
808                let actual = slice.iter().collect::<Vec<_>>();
809                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
810                assert_eq!(
811                    actual, expected,
812                    "Failed for offset {} and len {}",
813                    offset, len
814                );
815
816                let (claimed_size, claimed_max) = slice.iter().size_hint();
817                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
818                assert_eq!(claimed_size, actual.len()); // Correct size hint
819            }
820        }
821    }
822
#[test]
fn test_row_id_slice_empty() {
    // A zero-length slice that starts right at the end of the sequence is
    // valid and must simply yield nothing.
    let sequence = RowIdSequence::from(0..10);
    let collected: Vec<u64> = sequence.slice(10, 0).iter().collect();
    assert_eq!(collected, Vec::<u64>::new());
}
829
#[test]
fn test_row_id_sequence_rechunk() {
    // Rechunks `input` into pieces holding `chunk_sizes` rows each and
    // compares the outcome against `expected`.
    fn check_rechunk(
        input: Vec<RowIdSequence>,
        chunk_sizes: Vec<u64>,
        expected: Vec<RowIdSequence>,
    ) {
        let actual = rechunk_sequences(input, chunk_sizes, false).unwrap();
        assert_eq!(actual, expected);
    }

    // Row count of every sequence, i.e. the chunk sizes that reproduce them.
    fn row_counts(sequences: &[RowIdSequence]) -> Vec<u64> {
        sequences.iter().map(|sequence| sequence.len()).collect()
    }

    // The same 30 row ids, split into four pieces...
    let fine_grained = vec![
        RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
        RowIdSequence::from(10..18),
        RowIdSequence::from(18..28),
        RowIdSequence::from(28..30),
    ];
    // ...and into two pieces.
    let coarse_grained = vec![
        RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
        RowIdSequence::from(10..30),
    ];

    // Merging small pieces into larger ones.
    check_rechunk(
        fine_grained.clone(),
        row_counts(&coarse_grained),
        coarse_grained.clone(),
    );

    // Splitting large pieces into smaller ones.
    check_rechunk(
        coarse_grained,
        row_counts(&fine_grained),
        fine_grained.clone(),
    );

    // Chunk sizes that already match the input leave it untouched.
    check_rechunk(
        fine_grained.clone(),
        row_counts(&fine_grained),
        fine_grained.clone(),
    );

    // Asking for more rows than the input holds is an error.
    let too_many_rows = rechunk_sequences(fine_grained.clone(), vec![100], false);
    assert!(too_many_rows.is_err());

    // Asking for fewer rows than the input holds is an error as well.
    let too_few_rows = rechunk_sequences(fine_grained, vec![5], false);
    assert!(too_few_rows.is_err());
}
880
#[test]
fn test_select_row_ids() {
    // One example for each `ReadBatchParams` variant matched below.
    let all_params = [
        ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
        ReadBatchParams::Range(2..8),
        ReadBatchParams::RangeFull,
        ReadBatchParams::RangeTo(..5),
        ReadBatchParams::RangeFrom(5..),
        ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
    ];

    // Between them these cover every segment type. Each sequence holds at
    // least 10 row ids, so all of the params above stay in bounds.
    let sequences = [
        RowIdSequence(vec![
            U64Segment::Range(0..5),
            U64Segment::RangeWithHoles {
                range: 50..60,
                holes: vec![53, 54].into(),
            },
            U64Segment::SortedArray(vec![7, 9].into()),
        ]),
        RowIdSequence(vec![
            U64Segment::RangeWithBitmap {
                range: 0..5,
                bitmap: [true, false, true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![30, 20, 10].into()),
            U64Segment::Range(40..50),
        ]),
    ];

    for params in all_params {
        for sequence in &sequences {
            let selected = select_row_ids(sequence, &params).unwrap();

            // Reference answer: flatten the sequence, then index into it
            // with the explicit positions that the params describe.
            let flattened = sequence.iter().collect::<Vec<_>>();
            let total = flattened.len();
            let positions: Vec<usize> = match &params {
                ReadBatchParams::Indices(indices) => indices
                    .values()
                    .iter()
                    .map(|&index| index as usize)
                    .collect(),
                ReadBatchParams::Range(range) => (range.start..range.end).collect(),
                ReadBatchParams::Ranges(ranges) => ranges
                    .iter()
                    .flat_map(|range| range.start as usize..range.end as usize)
                    .collect(),
                ReadBatchParams::RangeFull => (0..total).collect(),
                ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
                ReadBatchParams::RangeFrom(from) => (from.start..total).collect(),
            };
            let expected: Vec<_> = positions
                .iter()
                .map(|&position| flattened[position])
                .collect();

            assert_eq!(
                selected, expected,
                "Failed for params {:?} on the sequence {:?}",
                &params, sequence
            );
        }
    }
}
946
#[test]
fn test_select_row_ids_out_of_bounds() {
    // Only ten row ids are available, so each of the params below reaches
    // past the end of the sequence.
    let sequence = RowIdSequence::from(0..10);
    let out_of_range = [
        ReadBatchParams::Indices(vec![1, 1000, 4].into()),
        ReadBatchParams::Range(2..1000),
        ReadBatchParams::RangeTo(..1000),
    ];

    for params in out_of_range {
        // The selection must be rejected with an invalid-input error.
        let outcome = select_row_ids(&sequence, &params);
        assert!(matches!(outcome, Err(Error::InvalidInput { .. })));
    }
}
963
#[test]
fn test_row_id_sequence_to_treemap() {
    // One segment of every kind; the segments are deliberately not arranged
    // in ascending row id order.
    let sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::RangeWithHoles {
            range: 50..60,
            holes: vec![53, 54].into(),
        },
        U64Segment::SortedArray(vec![7, 9].into()),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, true, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35, 39].into()),
        U64Segment::Range(40..50),
    ]);

    // The tree map must hold exactly the row ids stored in the segments.
    // They are listed here in ascending order, one chained piece per run.
    let expected: RowIdTreeMap = (0..5u64)
        .chain([7, 9])
        .chain([10, 12, 14])
        .chain([35, 39])
        .chain(40..50)
        // 50..60 without the holes 53 and 54.
        .chain(50..53)
        .chain(55..60)
        .collect();

    assert_eq!(RowIdTreeMap::from(&sequence), expected);
}
990
#[test]
fn test_row_id_mask() {
    // Row ids held by each segment, in sequence order:
    //   Range:           0, 1, 2, 3, 4
    //   RangeWithHoles:  50, 51, 52, 55, 56, 57, 58, 59
    //   SortedArray:     7, 9
    //   RangeWithBitmap: 10, 12, 14
    //   Array:           35, 39
    let mut sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::RangeWithHoles {
            range: 50..60,
            holes: vec![53, 54].into(),
        },
        U64Segment::SortedArray(vec![7, 9].into()),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, true, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35, 39].into()),
    ]);

    // Drop exactly one row id from every segment. `mask` is given positions
    // within the sequence, so the ids are translated to their offsets first.
    let doomed_ids = [4, 55, 7, 12, 39];
    let doomed_positions: Vec<u32> = sequence
        .iter()
        .enumerate()
        .filter(|(_, row_id)| doomed_ids.contains(row_id))
        .map(|(position, _)| position as u32)
        .collect();
    sequence.mask(doomed_positions).unwrap();

    // Each segment lost one value; some end up with a different encoding.
    let remaining = RowIdSequence(vec![
        U64Segment::Range(0..4),
        U64Segment::RangeWithBitmap {
            range: 50..60,
            bitmap: [
                true, true, true, false, false, false, true, true, true, true,
            ]
            .as_slice()
            .into(),
        },
        U64Segment::Range(9..10),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, false, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35].into()),
    ]);
    assert_eq!(sequence, remaining);
}
1046
#[test]
fn test_row_id_mask_everything() {
    let mut sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::SortedArray(vec![7, 9].into()),
    ]);

    // Masking every position must leave a sequence with no segments at all.
    let position_count = sequence.len() as u32;
    sequence.mask(0..position_count).unwrap();

    assert_eq!(sequence, RowIdSequence(Vec::new()));
}
1057
#[test]
fn test_selection() {
    // Three segments of five row ids each: offsets 0-4, 5-9 and 10-14.
    let sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::Range(10..15),
        U64Segment::Range(20..25),
    ]);

    // Offsets 2 and 4 fall in the first segment, 13 and 14 in the last one,
    // and 57 is past the end, so it contributes nothing to the output.
    let offsets = vec![2, 4, 13, 14, 57];
    let picked = sequence.select(offsets.into_iter()).collect::<Vec<_>>();
    assert_eq!(picked, vec![2, 4, 23, 24]);
}
1068
#[test]
#[should_panic]
fn test_selection_unsorted() {
    let sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::Range(10..15),
        U64Segment::Range(20..25),
    ]);

    // The offsets go backwards (4 is followed by 3); creating or draining
    // the selection is expected to panic.
    let unsorted_offsets = vec![2, 4, 3];
    let _drained: Vec<_> = sequence.select(unsorted_offsets.into_iter()).collect();
}
1081
#[test]
fn test_mask_to_offset_ranges() {
    // Offset ranges that `mask` selects in a sequence built from `segments`.
    let offset_ranges = |segments: Vec<U64Segment>, mask: RowIdMask| {
        RowIdSequence(segments).mask_to_offset_ranges(&mask)
    };
    // Mask that only lets the listed row ids through.
    let allow = |ids: &[u64]| RowIdMask::from_allowed(RowIdTreeMap::from_iter(ids));
    // Mask that rejects the listed row ids and lets everything else through.
    let block = |ids: &[u64]| RowIdMask::from_block(RowIdTreeMap::from_iter(ids));

    // --- Plain range segment ---
    assert_eq!(
        offset_ranges(vec![U64Segment::Range(0..10)], allow(&[0, 2, 4, 6, 8])),
        vec![0..1, 2..3, 4..5, 6..7, 8..9]
    );
    assert_eq!(
        offset_ranges(vec![U64Segment::Range(40..60)], allow(&[54])),
        vec![14..15]
    );
    assert_eq!(
        offset_ranges(vec![U64Segment::Range(40..60)], block(&[54])),
        vec![0..14, 15..20]
    );

    // --- Range segment with holes ---
    // Row ids present: 0, 1, 3, 4, 5, 7, 8, 9
    assert_eq!(
        offset_ranges(
            vec![U64Segment::RangeWithHoles {
                range: 0..10,
                holes: vec![2, 6].into(),
            }],
            allow(&[0, 2, 4, 6, 8]),
        ),
        vec![0..1, 3..4, 6..7]
    );
    assert_eq!(
        offset_ranges(
            vec![U64Segment::RangeWithHoles {
                range: 40..60,
                holes: vec![47, 43].into(),
            }],
            allow(&[44]),
        ),
        vec![3..4]
    );
    assert_eq!(
        offset_ranges(
            vec![U64Segment::RangeWithHoles {
                range: 40..60,
                holes: vec![47, 43].into(),
            }],
            block(&[44]),
        ),
        vec![0..3, 4..18]
    );

    // --- Range segment with bitmap ---
    // Row ids present: 0, 1, 4, 5, 6, 7
    assert_eq!(
        offset_ranges(
            vec![U64Segment::RangeWithBitmap {
                range: 0..10,
                bitmap: [
                    true, true, false, false, true, true, true, true, false, false,
                ]
                .as_slice()
                .into(),
            }],
            allow(&[0, 2, 4, 6, 8]),
        ),
        vec![0..1, 2..3, 4..5]
    );
    assert_eq!(
        offset_ranges(
            vec![U64Segment::RangeWithBitmap {
                range: 40..45,
                bitmap: [true, true, false, false, true].as_slice().into(),
            }],
            allow(&[44]),
        ),
        vec![2..3]
    );
    assert_eq!(
        offset_ranges(
            vec![U64Segment::RangeWithBitmap {
                range: 40..45,
                bitmap: [true, true, false, false, true].as_slice().into(),
            }],
            block(&[44]),
        ),
        vec![0..2]
    );

    // --- Sorted array segment ---
    assert_eq!(
        offset_ranges(
            vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())],
            allow(&[0, 6, 8]),
        ),
        vec![0..1, 3..5]
    );

    // --- Unsorted array segment ---
    assert_eq!(
        offset_ranges(
            vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())],
            allow(&[0, 6, 8]),
        ),
        vec![0..1, 2..4]
    );

    // --- Several segments in one sequence ---
    // row id:  0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
    // allowed: *, -, *, -, -, ***, ---, ---, ***, --, **, --
    // offset:  0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
    assert_eq!(
        offset_ranges(
            vec![
                U64Segment::Range(0..5),
                U64Segment::RangeWithHoles {
                    range: 100..105,
                    holes: vec![103].into(),
                },
                U64Segment::SortedArray(vec![44, 46, 78].into()),
            ],
            allow(&[0, 2, 46, 100, 104]),
        ),
        vec![0..1, 2..3, 5..6, 8..9, 10..11]
    );

    // --- Default mask: every offset is selected ---
    assert_eq!(
        offset_ranges(vec![U64Segment::Range(0..10)], RowIdMask::default()),
        vec![0..10]
    );

    // --- Allow-nothing mask: no offset is selected ---
    assert_eq!(
        offset_ranges(vec![U64Segment::Range(0..10)], RowIdMask::allow_nothing()),
        vec![]
    );
}
1195}