lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::Range;
15// TODO: replace this with Arrow BooleanBuffer.
16
17// These are all internal data structures, and are private.
18mod bitmap;
19mod encoded_array;
20mod index;
21pub mod segment;
22mod serde;
23
24use deepsize::DeepSizeOf;
25// These are the public API.
26pub use index::RowIdIndex;
27use lance_core::{
28    utils::mask::{RowIdMask, RowIdTreeMap},
29    Error, Result,
30};
31use lance_io::ReadBatchParams;
32pub use serde::{read_row_ids, write_row_ids};
33
34use snafu::location;
35
36use crate::utils::LanceIteratorExtension;
37use segment::U64Segment;
38use tracing::instrument;
39
40/// A sequence of row ids.
41///
42/// Row ids are u64s that:
43///
44/// 1. Are **unique** within a table (except for tombstones)
45/// 2. Are *often* but not always sorted and/or contiguous.
46///
47/// This sequence of row ids is optimized to be compact when the row ids are
48/// contiguous and sorted. However, it does not require that the row ids are
49/// contiguous or sorted.
50///
51/// We can make optimizations that assume uniqueness.
52#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq, Default)]
53pub struct RowIdSequence(Vec<U64Segment>);
54
55impl std::fmt::Display for RowIdSequence {
56    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
57        let mut iter = self.iter();
58        let mut first_10 = Vec::new();
59        let mut last_10 = Vec::new();
60        for row_id in iter.by_ref() {
61            first_10.push(row_id);
62            if first_10.len() > 10 {
63                break;
64            }
65        }
66
67        while let Some(row_id) = iter.next_back() {
68            last_10.push(row_id);
69            if last_10.len() > 10 {
70                break;
71            }
72        }
73        last_10.reverse();
74
75        let theres_more = iter.next().is_some();
76
77        write!(f, "[")?;
78        for row_id in first_10 {
79            write!(f, "{}", row_id)?;
80        }
81        if theres_more {
82            write!(f, ", ...")?;
83        }
84        for row_id in last_10 {
85            write!(f, ", {}", row_id)?;
86        }
87        write!(f, "]")
88    }
89}
90
91impl From<Range<u64>> for RowIdSequence {
92    fn from(range: Range<u64>) -> Self {
93        Self(vec![U64Segment::Range(range)])
94    }
95}
96
97impl From<&[u64]> for RowIdSequence {
98    fn from(row_ids: &[u64]) -> Self {
99        Self(vec![U64Segment::from_slice(row_ids)])
100    }
101}
102
103impl RowIdSequence {
104    pub fn new() -> Self {
105        Self::default()
106    }
107
108    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
109        self.0.iter().flat_map(|segment| segment.iter())
110    }
111
112    pub fn len(&self) -> u64 {
113        self.0.iter().map(|segment| segment.len() as u64).sum()
114    }
115
116    pub fn is_empty(&self) -> bool {
117        self.0.is_empty()
118    }
119
120    /// Combines this row id sequence with another row id sequence.
121    pub fn extend(&mut self, other: Self) {
122        // If the last element of this sequence and the first element of next
123        // sequence are ranges, we might be able to combine them into a single
124        // range.
125        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
126            (self.0.last(), other.0.first())
127        {
128            if range1.end == range2.start {
129                let new_range = U64Segment::Range(range1.start..range2.end);
130                self.0.pop();
131                self.0.push(new_range);
132                self.0.extend(other.0.into_iter().skip(1));
133                return;
134            }
135        }
136        // TODO: add other optimizations, such as combining two RangeWithHoles.
137        self.0.extend(other.0);
138    }
139
140    /// Remove a set of row ids from the sequence.
141    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
142        // Order the row ids by position in which they appear in the sequence.
143        let (row_ids, offsets) = self.find_ids(row_ids);
144
145        let capacity = self.0.capacity();
146        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
147        let mut remaining_segments = old_segments.as_slice();
148
149        for (segment_idx, range) in offsets {
150            let segments_handled = old_segments.len() - remaining_segments.len();
151            let segments_to_add = segment_idx - segments_handled;
152            self.0
153                .extend_from_slice(&remaining_segments[..segments_to_add]);
154            remaining_segments = &remaining_segments[segments_to_add..];
155
156            let segment;
157            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
158
159            let segment_ids = &row_ids[range];
160            self.0.push(segment.delete(segment_ids));
161        }
162
163        // Add the remaining segments.
164        self.0.extend_from_slice(remaining_segments);
165    }
166
167    /// Delete row ids by position.
168    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
169        let mut local_positions = Vec::new();
170        let mut positions_iter = positions.into_iter();
171        let mut curr_position = positions_iter.next();
172        let mut offset = 0;
173        let mut cutoff = 0;
174
175        for segment in &mut self.0 {
176            // Make vector of local positions
177            cutoff += segment.len() as u32;
178            while let Some(position) = curr_position {
179                if position >= cutoff {
180                    break;
181                }
182                local_positions.push(position - offset);
183                curr_position = positions_iter.next();
184            }
185
186            if !local_positions.is_empty() {
187                segment.mask(&local_positions);
188                local_positions.clear();
189            }
190            offset = cutoff;
191        }
192
193        self.0.retain(|segment| !segment.is_empty());
194
195        Ok(())
196    }
197
198    /// Find the row ids in the sequence.
199    ///
200    /// Returns the row ids sorted by their appearance in the sequence.
201    /// Also returns the segment index and the range where that segment's
202    /// row id matches are found in the returned row id vector.
203    fn find_ids(
204        &self,
205        row_ids: impl IntoIterator<Item = u64>,
206    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
207        // Often, the row ids will already be provided in the order they appear.
208        // So the optimal way to search will be to cycle through rather than
209        // restarting the search from the beginning each time.
210        let mut segment_iter = self.0.iter().enumerate().cycle();
211
212        let mut segment_matches = vec![Vec::new(); self.0.len()];
213
214        row_ids.into_iter().for_each(|row_id| {
215            let mut i = 0;
216            // If we've cycled through all segments, we know the row id is not in the sequence.
217            while i < self.0.len() {
218                let (segment_idx, segment) = segment_iter.next().unwrap();
219                if segment.range().is_some_and(|range| range.contains(&row_id)) {
220                    if let Some(offset) = segment.position(row_id) {
221                        segment_matches.get_mut(segment_idx).unwrap().push(offset);
222                    }
223                    // The row id was not found it the segment. It might be in a later segment.
224                }
225                i += 1;
226            }
227        });
228        for matches in &mut segment_matches {
229            matches.sort_unstable();
230        }
231
232        let mut offset = 0;
233        let segment_ranges = segment_matches
234            .iter()
235            .enumerate()
236            .filter(|(_, matches)| !matches.is_empty())
237            .map(|(segment_idx, matches)| {
238                let range = offset..offset + matches.len();
239                offset += matches.len();
240                (segment_idx, range)
241            })
242            .collect();
243        let row_ids = segment_matches
244            .into_iter()
245            .enumerate()
246            .flat_map(|(segment_idx, offset)| {
247                offset
248                    .into_iter()
249                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
250            })
251            .collect();
252
253        (row_ids, segment_ranges)
254    }
255
256    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
257        if len == 0 {
258            return RowIdSeqSlice {
259                segments: &[],
260                offset_start: 0,
261                offset_last: 0,
262            };
263        }
264
265        // Find the starting position
266        let mut offset_start = offset;
267        let mut segment_offset = 0;
268        for segment in &self.0 {
269            let segment_len = segment.len();
270            if offset_start < segment_len {
271                break;
272            }
273            offset_start -= segment_len;
274            segment_offset += 1;
275        }
276
277        // Find the ending position
278        let mut offset_last = offset_start + len;
279        let mut segment_offset_last = segment_offset;
280        for segment in &self.0[segment_offset..] {
281            let segment_len = segment.len();
282            if offset_last <= segment_len {
283                break;
284            }
285            offset_last -= segment_len;
286            segment_offset_last += 1;
287        }
288
289        RowIdSeqSlice {
290            segments: &self.0[segment_offset..=segment_offset_last],
291            offset_start,
292            offset_last,
293        }
294    }
295
296    /// Get the row id at the given index.
297    ///
298    /// If the index is out of bounds, this will return None.
299    pub fn get(&self, index: usize) -> Option<u64> {
300        let mut offset = 0;
301        for segment in &self.0 {
302            let segment_len = segment.len();
303            if index < offset + segment_len {
304                return segment.get(index - offset);
305            }
306            offset += segment_len;
307        }
308        None
309    }
310
311    /// Get row ids from the sequence based on the provided _sorted_ offsets
312    ///
313    /// Any out of bounds offsets will be ignored
314    ///
315    /// # Panics
316    ///
317    /// If the input selection is not sorted, this function will panic
318    pub fn select<'a>(
319        &'a self,
320        selection: impl Iterator<Item = usize> + 'a,
321    ) -> impl Iterator<Item = u64> + 'a {
322        let mut seg_iter = self.0.iter();
323        let mut cur_seg = seg_iter.next();
324        let mut rows_passed = 0;
325        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
326        let mut last_index = 0;
327        selection.filter_map(move |index| {
328            if index < last_index {
329                panic!("Selection is not sorted");
330            }
331            last_index = index;
332
333            cur_seg?;
334
335            while (index - rows_passed) >= cur_seg_len {
336                rows_passed += cur_seg_len;
337                cur_seg = seg_iter.next();
338                if let Some(cur_seg) = cur_seg {
339                    cur_seg_len = cur_seg.len();
340                } else {
341                    return None;
342                }
343            }
344
345            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
346        })
347    }
348
349    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
350    /// in the sequence.
351    ///
352    /// For example, given a mask that selects all even ids and a sequence that is
353    /// [80..85, 86..90, 14]
354    ///
355    /// this will return [0, 2, 4, 5, 7, 9]
356    /// because the range expands to
357    ///
358    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
359    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
360    ///
361    /// This function is useful when determining which row offsets to read from a fragment given
362    /// a mask.
363    #[instrument(level = "debug", skip_all)]
364    pub fn mask_to_offset_ranges(&self, mask: &RowIdMask) -> Vec<Range<u64>> {
365        let mut offset = 0;
366        let mut ranges = Vec::new();
367        for segment in &self.0 {
368            match segment {
369                U64Segment::Range(range) => {
370                    let mut ids = RowIdTreeMap::from(range.clone());
371                    ids.mask(mask);
372                    ranges.extend(GroupingIterator::new(
373                        unsafe { ids.into_id_iter() }.map(|id| id - range.start + offset),
374                    ));
375                    offset += range.end - range.start;
376                }
377                U64Segment::RangeWithHoles { range, holes } => {
378                    let offset_start = offset;
379                    let mut ids = RowIdTreeMap::from(range.clone());
380                    offset += range.end - range.start;
381                    for hole in holes.iter() {
382                        if ids.remove(hole) {
383                            offset -= 1;
384                        }
385                    }
386                    ids.mask(mask);
387
388                    // Sadly we can't just subtract the offset because of the holes
389                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
390                    sorted_holes.sort_unstable();
391                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
392                    let mut holes_passed = 0;
393                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
394                        |id| {
395                            while let Some(next_hole) = next_holes_iter.peek() {
396                                if *next_hole < id {
397                                    next_holes_iter.next();
398                                    holes_passed += 1;
399                                } else {
400                                    break;
401                                }
402                            }
403                            id - range.start + offset_start - holes_passed
404                        },
405                    )));
406                }
407                U64Segment::RangeWithBitmap { range, bitmap } => {
408                    let mut ids = RowIdTreeMap::from(range.clone());
409                    let offset_start = offset;
410                    offset += range.end - range.start;
411                    for (i, val) in range.clone().enumerate() {
412                        if !bitmap.get(i) && ids.remove(val) {
413                            offset -= 1;
414                        }
415                    }
416                    ids.mask(mask);
417                    let mut bitmap_iter = bitmap.iter();
418                    let mut bitmap_iter_pos = 0;
419                    let mut holes_passed = 0;
420                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
421                        |id| {
422                            let offset_no_holes = id - range.start + offset_start;
423                            while bitmap_iter_pos < offset_no_holes {
424                                if !bitmap_iter.next().unwrap() {
425                                    holes_passed += 1;
426                                }
427                                bitmap_iter_pos += 1;
428                            }
429                            offset_no_holes - holes_passed
430                        },
431                    )));
432                }
433                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
434                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
435                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
436                        |(off, id)| {
437                            if mask.selected(id) {
438                                Some(off as u64 + offset)
439                            } else {
440                                None
441                            }
442                        },
443                    )));
444                    offset += array.len() as u64;
445                }
446            }
447        }
448        ranges
449    }
450}
451
/// Adapter that collapses runs of consecutive ids into half-open ranges.
///
/// For example, the input [1, 2, 3, 5, 6, 7, 10, 11, 12] is turned into
/// [(1..4), (5..8), (10..13)]. An id only extends the current run when it is
/// exactly one past the previous id; anything else starts a new run.
struct GroupingIterator<I: Iterator<Item = u64>> {
    /// Source of ids.
    iter: I,
    /// The run being accumulated, if any.
    cur_range: Option<Range<u64>>,
}

impl<I: Iterator<Item = u64>> GroupingIterator<I> {
    /// Wraps `iter`; no ids are consumed until the first call to `next`.
    fn new(iter: I) -> Self {
        let cur_range = None;
        Self { iter, cur_range }
    }
}

impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
    type Item = Range<u64>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Input exhausted: hand out the pending run (if there is one).
            let Some(id) = self.iter.next() else {
                return self.cur_range.take();
            };

            match self.cur_range.take() {
                // `id` continues the pending run.
                Some(run) if run.end == id => {
                    self.cur_range = Some(run.start..id + 1);
                }
                // `id` breaks the pending run: emit it and start a new one.
                Some(run) => {
                    self.cur_range = Some(id..id + 1);
                    return Some(run);
                }
                // Very first id.
                None => {
                    self.cur_range = Some(id..id + 1);
                }
            }
        }
    }
}
490
491impl From<&RowIdSequence> for RowIdTreeMap {
492    fn from(row_ids: &RowIdSequence) -> Self {
493        let mut tree_map = Self::new();
494        for segment in &row_ids.0 {
495            match segment {
496                U64Segment::Range(range) => {
497                    tree_map.insert_range(range.clone());
498                }
499                U64Segment::RangeWithBitmap { range, bitmap } => {
500                    tree_map.insert_range(range.clone());
501                    for (i, val) in range.clone().enumerate() {
502                        if !bitmap.get(i) {
503                            tree_map.remove(val);
504                        }
505                    }
506                }
507                U64Segment::RangeWithHoles { range, holes } => {
508                    tree_map.insert_range(range.clone());
509                    for hole in holes.iter() {
510                        tree_map.remove(hole);
511                    }
512                }
513                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
514                    for val in array.iter() {
515                        tree_map.insert(val);
516                    }
517                }
518            }
519        }
520        tree_map
521    }
522}
523
524#[derive(Debug)]
525pub struct RowIdSeqSlice<'a> {
526    /// Current slice of the segments we cover
527    segments: &'a [U64Segment],
528    /// Offset into the first segment to start iterating from
529    offset_start: usize,
530    /// Offset into the last segment to stop iterating at
531    offset_last: usize,
532}
533
534impl RowIdSeqSlice<'_> {
535    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
536        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
537        known_size -= self.offset_start;
538        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
539
540        let end = self.segments.len().saturating_sub(1);
541        self.segments
542            .iter()
543            .enumerate()
544            .flat_map(move |(i, segment)| {
545                match i {
546                    0 if self.segments.len() == 1 => {
547                        let len = self.offset_last - self.offset_start;
548                        // TODO: Optimize this so we don't have to use skip
549                        // (take is probably fine though.)
550                        Box::new(segment.iter().skip(self.offset_start).take(len))
551                            as Box<dyn Iterator<Item = u64>>
552                    }
553                    0 => Box::new(segment.iter().skip(self.offset_start)),
554                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
555                    _ => Box::new(segment.iter()),
556                }
557            })
558            .exact_size(known_size)
559    }
560}
561
562/// Re-chunk a sequences of row ids into chunks of a given size.
563///
564/// The sequences may less than chunk sizes, because the sequences only
565/// contains the row ids that we want to keep, they come from the updates records.
566/// But the chunk sizes are based on the fragment physical rows(may contain inserted records).
567/// So if the sequences are smaller than the chunk sizes, we need to
568/// assign the incremental row ids in the further step. This behavior is controlled by the
569/// `allow_incomplete` parameter.
570///
571/// # Errors
572///
573/// If `allow_incomplete` is false, will return an error if the sum of the chunk sizes
574/// is not equal to the total number of row ids in the sequences.
575pub fn rechunk_sequences(
576    sequences: impl IntoIterator<Item = RowIdSequence>,
577    chunk_sizes: impl IntoIterator<Item = u64>,
578    allow_incomplete: bool,
579) -> Result<Vec<RowIdSequence>> {
580    // TODO: return an iterator. (with a good size hint?)
581    let chunk_size_iter = chunk_sizes.into_iter();
582    let mut chunked_sequences = Vec::with_capacity(chunk_size_iter.size_hint().0);
583    let mut segment_iter = sequences
584        .into_iter()
585        .flat_map(|sequence| sequence.0.into_iter())
586        .peekable();
587
588    let mut segment_offset = 0_u64;
589    for chunk_size in chunk_size_iter {
590        let mut sequence = RowIdSequence(Vec::new());
591        let mut remaining = chunk_size;
592
593        let too_many_segments_error = || {
594            Error::invalid_input(
595                "Got too many segments for the provided chunk lengths",
596                location!(),
597            )
598        };
599
600        while remaining > 0 {
601            let remaining_in_segment = segment_iter
602                .peek()
603                .map_or(0, |segment| segment.len() as u64 - segment_offset);
604            match (remaining_in_segment.cmp(&remaining), remaining_in_segment) {
605                (std::cmp::Ordering::Greater, _) => {
606                    // Can only push part of the segment, we are done with this chunk.
607                    let segment = segment_iter
608                        .peek()
609                        .ok_or_else(too_many_segments_error)?
610                        .slice(segment_offset as usize, remaining as usize);
611                    sequence.extend(RowIdSequence(vec![segment]));
612                    segment_offset += remaining;
613                    remaining = 0;
614                }
615                (_, 0) => {
616                    // Can push the entire segment.
617                    if let Some(segment) = segment_iter.next() {
618                        sequence.extend(RowIdSequence(vec![segment]));
619                        remaining = 0;
620                    } else if allow_incomplete {
621                        remaining = 0;
622                    } else {
623                        return Err(too_many_segments_error());
624                    }
625                }
626                (_, _) => {
627                    // Push remaining segment
628                    let segment = segment_iter
629                        .next()
630                        .ok_or_else(too_many_segments_error)?
631                        .slice(segment_offset as usize, remaining_in_segment as usize);
632                    sequence.extend(RowIdSequence(vec![segment]));
633                    segment_offset = 0;
634                    remaining -= remaining_in_segment;
635                }
636            }
637        }
638
639        chunked_sequences.push(sequence);
640    }
641
642    if segment_iter.peek().is_some() {
643        return Err(Error::invalid_input(
644            "Got too few segments for the provided chunk lengths",
645            location!(),
646        ));
647    }
648
649    Ok(chunked_sequences)
650}
651
652/// Selects the row ids from a sequence based on the provided offsets.
653pub fn select_row_ids<'a>(
654    sequence: &'a RowIdSequence,
655    offsets: &'a ReadBatchParams,
656) -> Result<Vec<u64>> {
657    let out_of_bounds_err = |offset: u32| {
658        Error::invalid_input(
659            format!(
660                "Index out of bounds: {} for sequence of length {}",
661                offset,
662                sequence.len()
663            ),
664            location!(),
665        )
666    };
667
668    match offsets {
669        // TODO: Optimize this if indices are sorted, which is a common case.
670        ReadBatchParams::Indices(indices) => indices
671            .values()
672            .iter()
673            .map(|index| {
674                sequence
675                    .get(*index as usize)
676                    .ok_or_else(|| out_of_bounds_err(*index))
677            })
678            .collect(),
679        ReadBatchParams::Range(range) => {
680            if range.end > sequence.len() as usize {
681                return Err(out_of_bounds_err(range.end as u32));
682            }
683            let sequence = sequence.slice(range.start, range.end - range.start);
684            Ok(sequence.iter().collect())
685        }
686        ReadBatchParams::Ranges(ranges) => {
687            let num_rows = ranges
688                .iter()
689                .map(|r| (r.end - r.start) as usize)
690                .sum::<usize>();
691            let mut result = Vec::with_capacity(num_rows);
692            for range in ranges.as_ref() {
693                if range.end > sequence.len() {
694                    return Err(out_of_bounds_err(range.end as u32));
695                }
696                let sequence =
697                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
698                result.extend(sequence.iter());
699            }
700            Ok(result)
701        }
702
703        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
704        ReadBatchParams::RangeTo(to) => {
705            if to.end > sequence.len() as usize {
706                return Err(out_of_bounds_err(to.end as u32));
707            }
708            let len = to.end;
709            let sequence = sequence.slice(0, len);
710            Ok(sequence.iter().collect())
711        }
712        ReadBatchParams::RangeFrom(from) => {
713            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
714            Ok(sequence.iter().collect())
715        }
716    }
717}
718
719#[cfg(test)]
720mod test {
721    use super::*;
722
723    use pretty_assertions::assert_eq;
724    use test::bitmap::Bitmap;
725
726    #[test]
727    fn test_row_id_sequence_from_range() {
728        let sequence = RowIdSequence::from(0..10);
729        assert_eq!(sequence.len(), 10);
730        assert_eq!(sequence.is_empty(), false);
731
732        let iter = sequence.iter();
733        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
734    }
735
736    #[test]
737    fn test_row_id_sequence_extend() {
738        let mut sequence = RowIdSequence::from(0..10);
739        sequence.extend(RowIdSequence::from(10..20));
740        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
741
742        let mut sequence = RowIdSequence::from(0..10);
743        sequence.extend(RowIdSequence::from(20..30));
744        assert_eq!(
745            sequence.0,
746            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
747        );
748    }
749
750    #[test]
751    fn test_row_id_sequence_delete() {
752        let mut sequence = RowIdSequence::from(0..10);
753        sequence.delete(vec![1, 3, 5, 7, 9]);
754        let mut expected_bitmap = Bitmap::new_empty(9);
755        for i in [0, 2, 4, 6, 8] {
756            expected_bitmap.set(i as usize);
757        }
758        assert_eq!(
759            sequence.0,
760            vec![U64Segment::RangeWithBitmap {
761                range: 0..9,
762                bitmap: expected_bitmap
763            },]
764        );
765
766        let mut sequence = RowIdSequence::from(0..10);
767        sequence.extend(RowIdSequence::from(12..20));
768        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
769        assert_eq!(
770            sequence.0,
771            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
772        );
773
774        let mut sequence = RowIdSequence::from(0..10);
775        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
776        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
777    }
778
779    #[test]
780    fn test_row_id_slice() {
781        // The type of sequence isn't that relevant to the implementation, so
782        // we can just have a single one with all the segment types.
783        let sequence = RowIdSequence(vec![
784            U64Segment::Range(30..35), // 5
785            U64Segment::RangeWithHoles {
786                // 8
787                range: 50..60,
788                holes: vec![53, 54].into(),
789            },
790            U64Segment::SortedArray(vec![7, 9].into()), // 2
791            U64Segment::RangeWithBitmap {
792                range: 0..5,
793                bitmap: [true, false, true, false, true].as_slice().into(),
794            },
795            U64Segment::Array(vec![35, 39].into()),
796            U64Segment::Range(40..50),
797        ]);
798
799        // All possible offsets and lengths
800        for offset in 0..sequence.len() as usize {
801            for len in 0..sequence.len() as usize {
802                if offset + len > sequence.len() as usize {
803                    continue;
804                }
805                let slice = sequence.slice(offset, len);
806
807                let actual = slice.iter().collect::<Vec<_>>();
808                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
809                assert_eq!(
810                    actual, expected,
811                    "Failed for offset {} and len {}",
812                    offset, len
813                );
814
815                let (claimed_size, claimed_max) = slice.iter().size_hint();
816                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
817                assert_eq!(claimed_size, actual.len()); // Correct size hint
818            }
819        }
820    }
821
822    #[test]
823    fn test_row_id_slice_empty() {
824        let sequence = RowIdSequence::from(0..10);
825        let slice = sequence.slice(10, 0);
826        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
827    }
828
829    #[test]
830    fn test_row_id_sequence_rechunk() {
831        fn assert_rechunked(
832            input: Vec<RowIdSequence>,
833            chunk_sizes: Vec<u64>,
834            expected: Vec<RowIdSequence>,
835        ) {
836            let chunked = rechunk_sequences(input, chunk_sizes, false).unwrap();
837            assert_eq!(chunked, expected);
838        }
839
840        // Small pieces to larger ones
841        let many_segments = vec![
842            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
843            RowIdSequence::from(10..18),
844            RowIdSequence::from(18..28),
845            RowIdSequence::from(28..30),
846        ];
847        let fewer_segments = vec![
848            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
849            RowIdSequence::from(10..30),
850        ];
851        assert_rechunked(
852            many_segments.clone(),
853            fewer_segments.iter().map(|seq| seq.len()).collect(),
854            fewer_segments.clone(),
855        );
856
857        // Large pieces to smaller ones
858        assert_rechunked(
859            fewer_segments,
860            many_segments.iter().map(|seq| seq.len()).collect(),
861            many_segments.clone(),
862        );
863
864        // Equal pieces
865        assert_rechunked(
866            many_segments.clone(),
867            many_segments.iter().map(|seq| seq.len()).collect(),
868            many_segments.clone(),
869        );
870
871        // Too few segments -> error
872        let result = rechunk_sequences(many_segments.clone(), vec![100], false);
873        assert!(result.is_err());
874
875        // Too many segments -> error
876        let result = rechunk_sequences(many_segments, vec![5], false);
877        assert!(result.is_err());
878    }
879
880    #[test]
881    fn test_select_row_ids() {
882        // All forms of offsets
883        let offsets = [
884            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
885            ReadBatchParams::Range(2..8),
886            ReadBatchParams::RangeFull,
887            ReadBatchParams::RangeTo(..5),
888            ReadBatchParams::RangeFrom(5..),
889            ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
890        ];
891
892        // Sequences with all segment types. These have at least 10 elements,
893        // so they are valid for all the above offsets.
894        let sequences = [
895            RowIdSequence(vec![
896                U64Segment::Range(0..5),
897                U64Segment::RangeWithHoles {
898                    range: 50..60,
899                    holes: vec![53, 54].into(),
900                },
901                U64Segment::SortedArray(vec![7, 9].into()),
902            ]),
903            RowIdSequence(vec![
904                U64Segment::RangeWithBitmap {
905                    range: 0..5,
906                    bitmap: [true, false, true, false, true].as_slice().into(),
907                },
908                U64Segment::Array(vec![30, 20, 10].into()),
909                U64Segment::Range(40..50),
910            ]),
911        ];
912
913        for params in offsets {
914            for sequence in &sequences {
915                let row_ids = select_row_ids(sequence, &params).unwrap();
916                let flat_sequence = sequence.iter().collect::<Vec<_>>();
917
918                // Transform params into bounded ones
919                let selection: Vec<usize> = match &params {
920                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
921                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
922                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
923                    ReadBatchParams::Range(range) => range.clone().collect(),
924                    ReadBatchParams::Ranges(ranges) => ranges
925                        .iter()
926                        .flat_map(|r| r.start as usize..r.end as usize)
927                        .collect(),
928                    ReadBatchParams::Indices(indices) => {
929                        indices.values().iter().map(|i| *i as usize).collect()
930                    }
931                };
932
933                let expected = selection
934                    .into_iter()
935                    .map(|i| flat_sequence[i])
936                    .collect::<Vec<_>>();
937                assert_eq!(
938                    row_ids, expected,
939                    "Failed for params {:?} on the sequence {:?}",
940                    &params, sequence
941                );
942            }
943        }
944    }
945
946    #[test]
947    fn test_select_row_ids_out_of_bounds() {
948        let offsets = [
949            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
950            ReadBatchParams::Range(2..1000),
951            ReadBatchParams::RangeTo(..1000),
952        ];
953
954        let sequence = RowIdSequence::from(0..10);
955
956        for params in offsets {
957            let result = select_row_ids(&sequence, &params);
958            assert!(result.is_err());
959            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
960        }
961    }
962
963    #[test]
964    fn test_row_id_sequence_to_treemap() {
965        let sequence = RowIdSequence(vec![
966            U64Segment::Range(0..5),
967            U64Segment::RangeWithHoles {
968                range: 50..60,
969                holes: vec![53, 54].into(),
970            },
971            U64Segment::SortedArray(vec![7, 9].into()),
972            U64Segment::RangeWithBitmap {
973                range: 10..15,
974                bitmap: [true, false, true, false, true].as_slice().into(),
975            },
976            U64Segment::Array(vec![35, 39].into()),
977            U64Segment::Range(40..50),
978        ]);
979
980        let tree_map = RowIdTreeMap::from(&sequence);
981        let expected = vec![
982            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
983            51, 52, 55, 56, 57, 58, 59,
984        ]
985        .into_iter()
986        .collect::<RowIdTreeMap>();
987        assert_eq!(tree_map, expected);
988    }
989
990    #[test]
991    fn test_row_id_mask() {
992        // 0, 1, 2, 3, 4
993        // 50, 51, 52, 55, 56, 57, 58, 59
994        // 7, 9
995        // 10, 12, 14
996        // 35, 39
997        let sequence = RowIdSequence(vec![
998            U64Segment::Range(0..5),
999            U64Segment::RangeWithHoles {
1000                range: 50..60,
1001                holes: vec![53, 54].into(),
1002            },
1003            U64Segment::SortedArray(vec![7, 9].into()),
1004            U64Segment::RangeWithBitmap {
1005                range: 10..15,
1006                bitmap: [true, false, true, false, true].as_slice().into(),
1007            },
1008            U64Segment::Array(vec![35, 39].into()),
1009        ]);
1010
1011        // Masking one in each segment
1012        let values_to_remove = [4, 55, 7, 12, 39];
1013        let positions_to_remove = sequence
1014            .iter()
1015            .enumerate()
1016            .filter_map(|(i, val)| {
1017                if values_to_remove.contains(&val) {
1018                    Some(i as u32)
1019                } else {
1020                    None
1021                }
1022            })
1023            .collect::<Vec<_>>();
1024        let mut sequence = sequence;
1025        sequence.mask(positions_to_remove).unwrap();
1026        let expected = RowIdSequence(vec![
1027            U64Segment::Range(0..4),
1028            U64Segment::RangeWithBitmap {
1029                range: 50..60,
1030                bitmap: [
1031                    true, true, true, false, false, false, true, true, true, true,
1032                ]
1033                .as_slice()
1034                .into(),
1035            },
1036            U64Segment::Range(9..10),
1037            U64Segment::RangeWithBitmap {
1038                range: 10..15,
1039                bitmap: [true, false, false, false, true].as_slice().into(),
1040            },
1041            U64Segment::Array(vec![35].into()),
1042        ]);
1043        assert_eq!(sequence, expected);
1044    }
1045
1046    #[test]
1047    fn test_row_id_mask_everything() {
1048        let mut sequence = RowIdSequence(vec![
1049            U64Segment::Range(0..5),
1050            U64Segment::SortedArray(vec![7, 9].into()),
1051        ]);
1052        sequence.mask(0..sequence.len() as u32).unwrap();
1053        let expected = RowIdSequence(vec![]);
1054        assert_eq!(sequence, expected);
1055    }
1056
1057    #[test]
1058    fn test_selection() {
1059        let sequence = RowIdSequence(vec![
1060            U64Segment::Range(0..5),
1061            U64Segment::Range(10..15),
1062            U64Segment::Range(20..25),
1063        ]);
1064        let selection = sequence.select(vec![2, 4, 13, 14, 57].into_iter());
1065        assert_eq!(selection.collect::<Vec<_>>(), vec![2, 4, 23, 24]);
1066    }
1067
1068    #[test]
1069    #[should_panic]
1070    fn test_selection_unsorted() {
1071        let sequence = RowIdSequence(vec![
1072            U64Segment::Range(0..5),
1073            U64Segment::Range(10..15),
1074            U64Segment::Range(20..25),
1075        ]);
1076        let _ = sequence
1077            .select(vec![2, 4, 3].into_iter())
1078            .collect::<Vec<_>>();
1079    }
1080
1081    #[test]
1082    fn test_mask_to_offset_ranges() {
1083        // Tests with a simple range segment
1084        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1085        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1086        let ranges = sequence.mask_to_offset_ranges(&mask);
1087        assert_eq!(ranges, vec![0..1, 2..3, 4..5, 6..7, 8..9]);
1088
1089        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1090        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[54]));
1091        let ranges = sequence.mask_to_offset_ranges(&mask);
1092        assert_eq!(ranges, vec![14..15]);
1093
1094        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1095        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[54]));
1096        let ranges = sequence.mask_to_offset_ranges(&mask);
1097        assert_eq!(ranges, vec![0..14, 15..20]);
1098
1099        // Test with a range segment with holes
1100        // 0, 1, 3, 4, 5, 7, 8, 9
1101        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1102            range: 0..10,
1103            holes: vec![2, 6].into(),
1104        }]);
1105        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1106        let ranges = sequence.mask_to_offset_ranges(&mask);
1107        assert_eq!(ranges, vec![0..1, 3..4, 6..7]);
1108
1109        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1110            range: 40..60,
1111            holes: vec![47, 43].into(),
1112        }]);
1113        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1114        let ranges = sequence.mask_to_offset_ranges(&mask);
1115        assert_eq!(ranges, vec![3..4]);
1116
1117        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1118            range: 40..60,
1119            holes: vec![47, 43].into(),
1120        }]);
1121        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1122        let ranges = sequence.mask_to_offset_ranges(&mask);
1123        assert_eq!(ranges, vec![0..3, 4..18]);
1124
1125        // Test with a range segment with bitmap
1126        // 0, 1, 4, 5, 6, 7
1127        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1128            range: 0..10,
1129            bitmap: [
1130                true, true, false, false, true, true, true, true, false, false,
1131            ]
1132            .as_slice()
1133            .into(),
1134        }]);
1135        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1136        let ranges = sequence.mask_to_offset_ranges(&mask);
1137        assert_eq!(ranges, vec![0..1, 2..3, 4..5]);
1138
1139        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1140            range: 40..45,
1141            bitmap: [true, true, false, false, true].as_slice().into(),
1142        }]);
1143        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1144        let ranges = sequence.mask_to_offset_ranges(&mask);
1145        assert_eq!(ranges, vec![2..3]);
1146
1147        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1148            range: 40..45,
1149            bitmap: [true, true, false, false, true].as_slice().into(),
1150        }]);
1151        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1152        let ranges = sequence.mask_to_offset_ranges(&mask);
1153        assert_eq!(ranges, vec![0..2]);
1154
1155        // Test with a sorted array segment
1156        let sequence = RowIdSequence(vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())]);
1157        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1158        let ranges = sequence.mask_to_offset_ranges(&mask);
1159        assert_eq!(ranges, vec![0..1, 3..5]);
1160
1161        let sequence = RowIdSequence(vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())]);
1162        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1163        let ranges = sequence.mask_to_offset_ranges(&mask);
1164        assert_eq!(ranges, vec![0..1, 2..4]);
1165
1166        // Test with multiple segments
1167        // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
1168        // *, -, *, -, -, ***, ---, ---, ***, --, **, --
1169        // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
1170        let sequence = RowIdSequence(vec![
1171            U64Segment::Range(0..5),
1172            U64Segment::RangeWithHoles {
1173                range: 100..105,
1174                holes: vec![103].into(),
1175            },
1176            U64Segment::SortedArray(vec![44, 46, 78].into()),
1177        ]);
1178        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 46, 100, 104]));
1179        let ranges = sequence.mask_to_offset_ranges(&mask);
1180        assert_eq!(ranges, vec![0..1, 2..3, 5..6, 8..9, 10..11]);
1181
1182        // Test with empty mask (should select everything)
1183        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1184        let mask = RowIdMask::default();
1185        let ranges = sequence.mask_to_offset_ranges(&mask);
1186        assert_eq!(ranges, vec![0..10]);
1187
1188        // Test with allow nothing mask
1189        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1190        let mask = RowIdMask::allow_nothing();
1191        let ranges = sequence.mask_to_offset_ranges(&mask);
1192        assert_eq!(ranges, vec![]);
1193    }
1194}