lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::Range;
15
16// TODO: replace this with Arrow BooleanBuffer.
17
18// These are all internal data structures, and are private.
19mod bitmap;
20mod encoded_array;
21mod index;
22mod segment;
23mod serde;
24
25use deepsize::DeepSizeOf;
26// These are the public API.
27pub use index::RowIdIndex;
28use lance_core::{
29    utils::mask::{RowIdMask, RowIdTreeMap},
30    Error, Result,
31};
32use lance_io::ReadBatchParams;
33pub use serde::{read_row_ids, write_row_ids};
34
35use snafu::location;
36
37use segment::U64Segment;
38
39use crate::utils::LanceIteratorExtension;
40
/// A sequence of row ids.
///
/// Row ids are u64s that:
///
/// 1. Are **unique** within a table (except for tombstones)
/// 2. Are *often* but not always sorted and/or contiguous.
///
/// This sequence of row ids is optimized to be compact when the row ids are
/// contiguous and sorted. However, it does not require that the row ids are
/// contiguous or sorted.
///
/// We can make optimizations that assume uniqueness.
///
/// Internally the sequence is an ordered list of `U64Segment`s. Visiting the
/// segments in order, and the ids of each segment in order, yields the row
/// ids in sequence order.
#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq)]
pub struct RowIdSequence(Vec<U64Segment>);
55
56impl std::fmt::Display for RowIdSequence {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        let mut iter = self.iter();
59        let mut first_10 = Vec::new();
60        let mut last_10 = Vec::new();
61        for row_id in iter.by_ref() {
62            first_10.push(row_id);
63            if first_10.len() > 10 {
64                break;
65            }
66        }
67
68        while let Some(row_id) = iter.next_back() {
69            last_10.push(row_id);
70            if last_10.len() > 10 {
71                break;
72            }
73        }
74        last_10.reverse();
75
76        let theres_more = iter.next().is_some();
77
78        write!(f, "[")?;
79        for row_id in first_10 {
80            write!(f, "{}", row_id)?;
81        }
82        if theres_more {
83            write!(f, ", ...")?;
84        }
85        for row_id in last_10 {
86            write!(f, ", {}", row_id)?;
87        }
88        write!(f, "]")
89    }
90}
91
92impl From<Range<u64>> for RowIdSequence {
93    fn from(range: Range<u64>) -> Self {
94        Self(vec![U64Segment::Range(range)])
95    }
96}
97
98impl RowIdSequence {
99    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
100        self.0.iter().flat_map(|segment| segment.iter())
101    }
102
103    pub fn len(&self) -> u64 {
104        self.0.iter().map(|segment| segment.len() as u64).sum()
105    }
106
107    pub fn is_empty(&self) -> bool {
108        self.0.is_empty()
109    }
110
111    /// Combines this row id sequence with another row id sequence.
112    pub fn extend(&mut self, other: Self) {
113        // If the last element of this sequence and the first element of next
114        // sequence are ranges, we might be able to combine them into a single
115        // range.
116        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
117            (self.0.last(), other.0.first())
118        {
119            if range1.end == range2.start {
120                let new_range = U64Segment::Range(range1.start..range2.end);
121                self.0.pop();
122                self.0.push(new_range);
123                self.0.extend(other.0.into_iter().skip(1));
124                return;
125            }
126        }
127        // TODO: add other optimizations, such as combining two RangeWithHoles.
128        self.0.extend(other.0);
129    }
130
131    /// Remove a set of row ids from the sequence.
132    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
133        // Order the row ids by position in which they appear in the sequence.
134        let (row_ids, offsets) = self.find_ids(row_ids);
135
136        let capacity = self.0.capacity();
137        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
138        let mut remaining_segments = old_segments.as_slice();
139
140        for (segment_idx, range) in offsets {
141            let segments_handled = old_segments.len() - remaining_segments.len();
142            let segments_to_add = segment_idx - segments_handled;
143            self.0
144                .extend_from_slice(&remaining_segments[..segments_to_add]);
145            remaining_segments = &remaining_segments[segments_to_add..];
146
147            let segment;
148            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
149
150            let segment_ids = &row_ids[range];
151            self.0.push(segment.delete(segment_ids));
152        }
153
154        // Add the remaining segments.
155        self.0.extend_from_slice(remaining_segments);
156    }
157
158    /// Delete row ids by position.
159    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
160        let mut local_positions = Vec::new();
161        let mut positions_iter = positions.into_iter();
162        let mut curr_position = positions_iter.next();
163        let mut offset = 0;
164        let mut cutoff = 0;
165
166        for segment in &mut self.0 {
167            // Make vector of local positions
168            cutoff += segment.len() as u32;
169            while let Some(position) = curr_position {
170                if position >= cutoff {
171                    break;
172                }
173                local_positions.push(position - offset);
174                curr_position = positions_iter.next();
175            }
176
177            if !local_positions.is_empty() {
178                segment.mask(&local_positions);
179                local_positions.clear();
180            }
181            offset = cutoff;
182        }
183
184        self.0.retain(|segment| segment.len() != 0);
185
186        Ok(())
187    }
188
189    /// Find the row ids in the sequence.
190    ///
191    /// Returns the row ids sorted by their appearance in the sequence.
192    /// Also returns the segment index and the range where that segment's
193    /// row id matches are found in the returned row id vector.
194    fn find_ids(
195        &self,
196        row_ids: impl IntoIterator<Item = u64>,
197    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
198        // Often, the row ids will already be provided in the order they appear.
199        // So the optimal way to search will be to cycle through rather than
200        // restarting the search from the beginning each time.
201        let mut segment_iter = self.0.iter().enumerate().cycle();
202
203        let mut segment_matches = vec![Vec::new(); self.0.len()];
204
205        row_ids.into_iter().for_each(|row_id| {
206            let mut i = 0;
207            // If we've cycled through all segments, we know the row id is not in the sequence.
208            while i < self.0.len() {
209                let (segment_idx, segment) = segment_iter.next().unwrap();
210                if segment.range().is_some_and(|range| range.contains(&row_id)) {
211                    if let Some(offset) = segment.position(row_id) {
212                        segment_matches.get_mut(segment_idx).unwrap().push(offset);
213                    }
214                    // The row id was not found it the segment. It might be in a later segment.
215                }
216                i += 1;
217            }
218        });
219        for matches in &mut segment_matches {
220            matches.sort_unstable();
221        }
222
223        let mut offset = 0;
224        let segment_ranges = segment_matches
225            .iter()
226            .enumerate()
227            .filter(|(_, matches)| !matches.is_empty())
228            .map(|(segment_idx, matches)| {
229                let range = offset..offset + matches.len();
230                offset += matches.len();
231                (segment_idx, range)
232            })
233            .collect();
234        let row_ids = segment_matches
235            .into_iter()
236            .enumerate()
237            .flat_map(|(segment_idx, offset)| {
238                offset
239                    .into_iter()
240                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
241            })
242            .collect();
243
244        (row_ids, segment_ranges)
245    }
246
247    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
248        if len == 0 {
249            return RowIdSeqSlice {
250                segments: &[],
251                offset_start: 0,
252                offset_last: 0,
253            };
254        }
255
256        // Find the starting position
257        let mut offset_start = offset;
258        let mut segment_offset = 0;
259        for segment in &self.0 {
260            let segment_len = segment.len();
261            if offset_start < segment_len {
262                break;
263            }
264            offset_start -= segment_len;
265            segment_offset += 1;
266        }
267
268        // Find the ending position
269        let mut offset_last = offset_start + len;
270        let mut segment_offset_last = segment_offset;
271        for segment in &self.0[segment_offset..] {
272            let segment_len = segment.len();
273            if offset_last <= segment_len {
274                break;
275            }
276            offset_last -= segment_len;
277            segment_offset_last += 1;
278        }
279
280        RowIdSeqSlice {
281            segments: &self.0[segment_offset..=segment_offset_last],
282            offset_start,
283            offset_last,
284        }
285    }
286
287    /// Get the row id at the given index.
288    ///
289    /// If the index is out of bounds, this will return None.
290    pub fn get(&self, index: usize) -> Option<u64> {
291        let mut offset = 0;
292        for segment in &self.0 {
293            let segment_len = segment.len();
294            if index < offset + segment_len {
295                return segment.get(index - offset);
296            }
297            offset += segment_len;
298        }
299        None
300    }
301
302    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
303    /// in the sequence.
304    ///
305    /// For example, given a mask that selects all even ids and a sequence that is
306    /// [80..85, 86..90, 14]
307    ///
308    /// this will return [0, 2, 4, 5, 7, 9]
309    /// because the range expands to
310    ///
311    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
312    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
313    ///
314    /// This function is useful when determining which row offsets to read from a fragment given
315    /// a mask.
316    pub fn mask_to_offset_ranges(&self, mask: &RowIdMask) -> Vec<Range<u64>> {
317        let mut offset = 0;
318        let mut ranges = Vec::new();
319        for segment in &self.0 {
320            match segment {
321                U64Segment::Range(range) => {
322                    let mut ids = RowIdTreeMap::from(range.clone());
323                    ids.mask(mask);
324                    ranges.extend(GroupingIterator::new(
325                        unsafe { ids.into_id_iter() }.map(|id| id - range.start + offset),
326                    ));
327                    offset += range.end - range.start;
328                }
329                U64Segment::RangeWithHoles { range, holes } => {
330                    let offset_start = offset;
331                    let mut ids = RowIdTreeMap::from(range.clone());
332                    offset += range.end - range.start;
333                    for hole in holes.iter() {
334                        if ids.remove(hole) {
335                            offset -= 1;
336                        }
337                    }
338                    ids.mask(mask);
339
340                    // Sadly we can't just subtract the offset because of the holes
341                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
342                    sorted_holes.sort_unstable();
343                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
344                    let mut holes_passed = 0;
345                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
346                        |id| {
347                            while let Some(next_hole) = next_holes_iter.peek() {
348                                if *next_hole < id {
349                                    next_holes_iter.next();
350                                    holes_passed += 1;
351                                } else {
352                                    break;
353                                }
354                            }
355                            id - range.start + offset_start - holes_passed
356                        },
357                    )));
358                }
359                U64Segment::RangeWithBitmap { range, bitmap } => {
360                    let mut ids = RowIdTreeMap::from(range.clone());
361                    let offset_start = offset;
362                    offset += range.end - range.start;
363                    for (i, val) in range.clone().enumerate() {
364                        if !bitmap.get(i) && ids.remove(val) {
365                            offset -= 1;
366                        }
367                    }
368                    ids.mask(mask);
369                    let mut bitmap_iter = bitmap.iter();
370                    let mut bitmap_iter_pos = 0;
371                    let mut holes_passed = 0;
372                    ranges.extend(GroupingIterator::new(unsafe { ids.into_id_iter() }.map(
373                        |id| {
374                            let offset_no_holes = id - range.start + offset_start;
375                            while bitmap_iter_pos < offset_no_holes {
376                                if !bitmap_iter.next().unwrap() {
377                                    holes_passed += 1;
378                                }
379                                bitmap_iter_pos += 1;
380                            }
381                            offset_no_holes - holes_passed
382                        },
383                    )));
384                }
385                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
386                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
387                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
388                        |(off, id)| {
389                            if mask.selected(id) {
390                                Some(off as u64 + offset)
391                            } else {
392                                None
393                            }
394                        },
395                    )));
396                    offset += array.len() as u64;
397                }
398            }
399        }
400        ranges
401    }
402}
403
/// Adapter that collapses runs of consecutive ids into half-open ranges.
///
/// Feeding it [1, 2, 3, 5, 6, 7, 10, 11, 12] produces
/// [(1..4), (5..8), (10..13)]. A run is only extended when the next id is
/// exactly one past the previous id; any other id starts a new range.
struct GroupingIterator<I: Iterator<Item = u64>> {
    /// Source of ids still to be grouped.
    iter: I,
    /// The run currently being built, if any id has been seen for it yet.
    cur_range: Option<Range<u64>>,
}

impl<I: Iterator<Item = u64>> GroupingIterator<I> {
    /// Wraps `iter`; no ids are pulled until the first call to `next`.
    fn new(iter: I) -> Self {
        let cur_range = None;
        Self { iter, cur_range }
    }
}

impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
    type Item = Range<u64>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            // Once the source runs dry, hand out the run in progress (if any).
            let Some(id) = self.iter.next() else {
                return self.cur_range.take();
            };

            match self.cur_range.as_mut() {
                // The id continues the current run.
                Some(run) if run.end == id => run.end += 1,
                // The id breaks the run: emit it and start a new one at `id`.
                Some(_) => return self.cur_range.replace(id..id + 1),
                // Very first id: open a run and keep reading.
                None => self.cur_range = Some(id..id + 1),
            }
        }
    }
}
442
443impl From<&RowIdSequence> for RowIdTreeMap {
444    fn from(row_ids: &RowIdSequence) -> Self {
445        let mut tree_map = Self::new();
446        for segment in &row_ids.0 {
447            match segment {
448                U64Segment::Range(range) => {
449                    tree_map.insert_range(range.clone());
450                }
451                U64Segment::RangeWithBitmap { range, bitmap } => {
452                    tree_map.insert_range(range.clone());
453                    for (i, val) in range.clone().enumerate() {
454                        if !bitmap.get(i) {
455                            tree_map.remove(val);
456                        }
457                    }
458                }
459                U64Segment::RangeWithHoles { range, holes } => {
460                    tree_map.insert_range(range.clone());
461                    for hole in holes.iter() {
462                        tree_map.remove(hole);
463                    }
464                }
465                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
466                    for val in array.iter() {
467                        tree_map.insert(val);
468                    }
469                }
470            }
471        }
472        tree_map
473    }
474}
475
476#[derive(Debug)]
477pub struct RowIdSeqSlice<'a> {
478    /// Current slice of the segments we cover
479    segments: &'a [U64Segment],
480    /// Offset into the first segment to start iterating from
481    offset_start: usize,
482    /// Offset into the last segment to stop iterating at
483    offset_last: usize,
484}
485
486impl RowIdSeqSlice<'_> {
487    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
488        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
489        known_size -= self.offset_start;
490        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
491
492        let end = self.segments.len().saturating_sub(1);
493        self.segments
494            .iter()
495            .enumerate()
496            .flat_map(move |(i, segment)| {
497                match i {
498                    0 if self.segments.len() == 1 => {
499                        let len = self.offset_last - self.offset_start;
500                        // TODO: Optimize this so we don't have to use skip
501                        // (take is probably fine though.)
502                        Box::new(segment.iter().skip(self.offset_start).take(len))
503                            as Box<dyn Iterator<Item = u64>>
504                    }
505                    0 => Box::new(segment.iter().skip(self.offset_start)),
506                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
507                    _ => Box::new(segment.iter()),
508                }
509            })
510            .exact_size(known_size)
511    }
512}
513
514/// Re-chunk a sequences of row ids into chunks of a given size.
515///
516/// # Errors
517///
518/// Will return an error if the sum of the chunk sizes is not equal to the total
519/// number of row ids in the sequences.
520pub fn rechunk_sequences(
521    sequences: impl IntoIterator<Item = RowIdSequence>,
522    chunk_sizes: impl IntoIterator<Item = u64>,
523) -> Result<Vec<RowIdSequence>> {
524    // TODO: return an iterator. (with a good size hint?)
525    let chunk_size_iter = chunk_sizes.into_iter();
526    let mut chunked_sequences = Vec::with_capacity(chunk_size_iter.size_hint().0);
527    let mut segment_iter = sequences
528        .into_iter()
529        .flat_map(|sequence| sequence.0.into_iter())
530        .peekable();
531
532    let mut segment_offset = 0_u64;
533    for chunk_size in chunk_size_iter {
534        let mut sequence = RowIdSequence(Vec::new());
535        let mut remaining = chunk_size;
536
537        let too_many_segments_error = || {
538            Error::invalid_input(
539                "Got too many segments for the provided chunk lengths",
540                location!(),
541            )
542        };
543
544        while remaining > 0 {
545            let remaining_in_segment = segment_iter
546                .peek()
547                .map_or(0, |segment| segment.len() as u64 - segment_offset);
548            match (remaining_in_segment.cmp(&remaining), remaining_in_segment) {
549                (std::cmp::Ordering::Greater, _) => {
550                    // Can only push part of the segment, we are done with this chunk.
551                    let segment = segment_iter
552                        .peek()
553                        .ok_or_else(too_many_segments_error)?
554                        .slice(segment_offset as usize, remaining as usize);
555                    sequence.extend(RowIdSequence(vec![segment]));
556                    segment_offset += remaining;
557                    remaining = 0;
558                }
559                (_, 0) => {
560                    // Can push the entire segment.
561                    let segment = segment_iter.next().ok_or_else(too_many_segments_error)?;
562                    sequence.extend(RowIdSequence(vec![segment]));
563                    remaining = 0;
564                }
565                (_, _) => {
566                    // Push remaining segment
567                    let segment = segment_iter
568                        .next()
569                        .ok_or_else(too_many_segments_error)?
570                        .slice(segment_offset as usize, remaining_in_segment as usize);
571                    sequence.extend(RowIdSequence(vec![segment]));
572                    segment_offset = 0;
573                    remaining -= remaining_in_segment;
574                }
575            }
576        }
577
578        chunked_sequences.push(sequence);
579    }
580
581    if segment_iter.peek().is_some() {
582        return Err(Error::invalid_input(
583            "Got too few segments for the provided chunk lengths",
584            location!(),
585        ));
586    }
587
588    Ok(chunked_sequences)
589}
590
591/// Selects the row ids from a sequence based on the provided offsets.
592pub fn select_row_ids<'a>(
593    sequence: &'a RowIdSequence,
594    offsets: &'a ReadBatchParams,
595) -> Result<Vec<u64>> {
596    let out_of_bounds_err = |offset: u32| {
597        Error::invalid_input(
598            format!(
599                "Index out of bounds: {} for sequence of length {}",
600                offset,
601                sequence.len()
602            ),
603            location!(),
604        )
605    };
606
607    match offsets {
608        // TODO: Optimize this if indices are sorted, which is a common case.
609        ReadBatchParams::Indices(indices) => indices
610            .values()
611            .iter()
612            .map(|index| {
613                sequence
614                    .get(*index as usize)
615                    .ok_or_else(|| out_of_bounds_err(*index))
616            })
617            .collect(),
618        ReadBatchParams::Range(range) => {
619            if range.end > sequence.len() as usize {
620                return Err(out_of_bounds_err(range.end as u32));
621            }
622            let sequence = sequence.slice(range.start, range.end - range.start);
623            Ok(sequence.iter().collect())
624        }
625        ReadBatchParams::Ranges(ranges) => {
626            let num_rows = ranges
627                .iter()
628                .map(|r| (r.end - r.start) as usize)
629                .sum::<usize>();
630            let mut result = Vec::with_capacity(num_rows);
631            for range in ranges.as_ref() {
632                if range.end > sequence.len() {
633                    return Err(out_of_bounds_err(range.end as u32));
634                }
635                let sequence =
636                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
637                result.extend(sequence.iter());
638            }
639            Ok(result)
640        }
641
642        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
643        ReadBatchParams::RangeTo(to) => {
644            if to.end > sequence.len() as usize {
645                return Err(out_of_bounds_err(to.end as u32));
646            }
647            let len = to.end;
648            let sequence = sequence.slice(0, len);
649            Ok(sequence.iter().collect())
650        }
651        ReadBatchParams::RangeFrom(from) => {
652            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
653            Ok(sequence.iter().collect())
654        }
655    }
656}
657
658#[cfg(test)]
659mod test {
660    use super::*;
661
662    use pretty_assertions::assert_eq;
663    use test::bitmap::Bitmap;
664
665    #[test]
666    fn test_row_id_sequence_from_range() {
667        let sequence = RowIdSequence::from(0..10);
668        assert_eq!(sequence.len(), 10);
669        assert_eq!(sequence.is_empty(), false);
670
671        let iter = sequence.iter();
672        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
673    }
674
675    #[test]
676    fn test_row_id_sequence_extend() {
677        let mut sequence = RowIdSequence::from(0..10);
678        sequence.extend(RowIdSequence::from(10..20));
679        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
680
681        let mut sequence = RowIdSequence::from(0..10);
682        sequence.extend(RowIdSequence::from(20..30));
683        assert_eq!(
684            sequence.0,
685            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
686        );
687    }
688
689    #[test]
690    fn test_row_id_sequence_delete() {
691        let mut sequence = RowIdSequence::from(0..10);
692        sequence.delete(vec![1, 3, 5, 7, 9]);
693        let mut expected_bitmap = Bitmap::new_empty(9);
694        for i in [0, 2, 4, 6, 8] {
695            expected_bitmap.set(i as usize);
696        }
697        assert_eq!(
698            sequence.0,
699            vec![U64Segment::RangeWithBitmap {
700                range: 0..9,
701                bitmap: expected_bitmap
702            },]
703        );
704
705        let mut sequence = RowIdSequence::from(0..10);
706        sequence.extend(RowIdSequence::from(12..20));
707        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
708        assert_eq!(
709            sequence.0,
710            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
711        );
712
713        let mut sequence = RowIdSequence::from(0..10);
714        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
715        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
716    }
717
718    #[test]
719    fn test_row_id_slice() {
720        // The type of sequence isn't that relevant to the implementation, so
721        // we can just have a single one with all the segment types.
722        let sequence = RowIdSequence(vec![
723            U64Segment::Range(30..35), // 5
724            U64Segment::RangeWithHoles {
725                // 8
726                range: 50..60,
727                holes: vec![53, 54].into(),
728            },
729            U64Segment::SortedArray(vec![7, 9].into()), // 2
730            U64Segment::RangeWithBitmap {
731                range: 0..5,
732                bitmap: [true, false, true, false, true].as_slice().into(),
733            },
734            U64Segment::Array(vec![35, 39].into()),
735            U64Segment::Range(40..50),
736        ]);
737
738        // All possible offsets and lengths
739        for offset in 0..sequence.len() as usize {
740            for len in 0..sequence.len() as usize {
741                if offset + len > sequence.len() as usize {
742                    continue;
743                }
744                let slice = sequence.slice(offset, len);
745
746                let actual = slice.iter().collect::<Vec<_>>();
747                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
748                assert_eq!(
749                    actual, expected,
750                    "Failed for offset {} and len {}",
751                    offset, len
752                );
753
754                let (claimed_size, claimed_max) = slice.iter().size_hint();
755                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
756                assert_eq!(claimed_size, actual.len()); // Correct size hint
757            }
758        }
759    }
760
761    #[test]
762    fn test_row_id_slice_empty() {
763        let sequence = RowIdSequence::from(0..10);
764        let slice = sequence.slice(10, 0);
765        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
766    }
767
768    #[test]
769    fn test_row_id_sequence_rechunk() {
770        fn assert_rechunked(
771            input: Vec<RowIdSequence>,
772            chunk_sizes: Vec<u64>,
773            expected: Vec<RowIdSequence>,
774        ) {
775            let chunked = rechunk_sequences(input, chunk_sizes).unwrap();
776            assert_eq!(chunked, expected);
777        }
778
779        // Small pieces to larger ones
780        let many_segments = vec![
781            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
782            RowIdSequence::from(10..18),
783            RowIdSequence::from(18..28),
784            RowIdSequence::from(28..30),
785        ];
786        let fewer_segments = vec![
787            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
788            RowIdSequence::from(10..30),
789        ];
790        assert_rechunked(
791            many_segments.clone(),
792            fewer_segments.iter().map(|seq| seq.len()).collect(),
793            fewer_segments.clone(),
794        );
795
796        // Large pieces to smaller ones
797        assert_rechunked(
798            fewer_segments,
799            many_segments.iter().map(|seq| seq.len()).collect(),
800            many_segments.clone(),
801        );
802
803        // Equal pieces
804        assert_rechunked(
805            many_segments.clone(),
806            many_segments.iter().map(|seq| seq.len()).collect(),
807            many_segments.clone(),
808        );
809
810        // Too few segments -> error
811        let result = rechunk_sequences(many_segments.clone(), vec![100]);
812        assert!(result.is_err());
813
814        // Too many segments -> error
815        let result = rechunk_sequences(many_segments, vec![5]);
816        assert!(result.is_err());
817    }
818
/// Cross-checks `select_row_ids` against a naive lookup into the flattened
/// sequence, for every form of offsets and every segment type.
#[test]
fn test_select_row_ids() {
    // Between them, the two sequences use all segment types. Each one holds
    // at least 10 row ids, so every offset below stays in bounds.
    let sequences = [
        RowIdSequence(vec![
            U64Segment::Range(0..5),
            U64Segment::RangeWithHoles {
                range: 50..60,
                holes: vec![53, 54].into(),
            },
            U64Segment::SortedArray(vec![7, 9].into()),
        ]),
        RowIdSequence(vec![
            U64Segment::RangeWithBitmap {
                range: 0..5,
                bitmap: [true, false, true, false, true].as_slice().into(),
            },
            U64Segment::Array(vec![30, 20, 10].into()),
            U64Segment::Range(40..50),
        ]),
    ];

    // One value per way of expressing the offsets to select.
    let offsets = [
        ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
        ReadBatchParams::Range(2..8),
        ReadBatchParams::RangeFull,
        ReadBatchParams::RangeTo(..5),
        ReadBatchParams::RangeFrom(5..),
        ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
    ];

    for params in &offsets {
        for sequence in &sequences {
            let actual = select_row_ids(sequence, params).unwrap();

            // Reference model: materialize every row id, then index into it.
            let flat: Vec<_> = sequence.iter().collect();
            let total = flat.len();

            // Resolve the (possibly unbounded) params into explicit positions.
            let positions: Vec<usize> = match params {
                ReadBatchParams::Indices(indices) => {
                    indices.values().iter().map(|&idx| idx as usize).collect()
                }
                ReadBatchParams::Ranges(ranges) => ranges
                    .iter()
                    .flat_map(|r| r.start as usize..r.end as usize)
                    .collect(),
                ReadBatchParams::Range(range) => (range.start..range.end).collect(),
                ReadBatchParams::RangeFrom(from) => (from.start..total).collect(),
                ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
                ReadBatchParams::RangeFull => (0..total).collect(),
            };

            let expected: Vec<_> = positions.iter().map(|&pos| flat[pos]).collect();
            assert_eq!(
                actual, expected,
                "Failed for params {:?} on the sequence {:?}",
                params, sequence
            );
        }
    }
}
884
/// Offsets that reach past the end of the sequence must be rejected with
/// `Error::InvalidInput`.
#[test]
fn test_select_row_ids_out_of_bounds() {
    // Ten row ids, so every offset of 1000 below is out of range.
    let sequence = RowIdSequence::from(0..10);

    let out_of_range = [
        ReadBatchParams::Indices(vec![1, 1000, 4].into()),
        ReadBatchParams::Range(2..1000),
        ReadBatchParams::RangeTo(..1000),
    ];

    for params in &out_of_range {
        let outcome = select_row_ids(&sequence, params);
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(Error::InvalidInput { .. })));
    }
}
901
/// Converting a sequence into a `RowIdTreeMap` must yield exactly the row ids
/// present in its segments, whatever the segment type or order.
#[test]
fn test_row_id_sequence_to_treemap() {
    let sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::RangeWithHoles {
            range: 50..60,
            holes: vec![53, 54].into(),
        },
        U64Segment::SortedArray(vec![7, 9].into()),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, true, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35, 39].into()),
        U64Segment::Range(40..50),
    ]);

    // The same ids in ascending order:
    //   0..5                         first range
    //   7, 9, 10, 12, 14, 35, 39     sorted array, bitmap range, plain array
    //   40..53                       second range, then 50..=52 before the holes
    //   55..60                       rest of the holed range after 53 and 54
    let expected: RowIdTreeMap = (0..5)
        .chain([7, 9, 10, 12, 14, 35, 39])
        .chain(40..53)
        .chain(55..60)
        .collect();

    let tree_map = RowIdTreeMap::from(&sequence);
    assert_eq!(tree_map, expected);
}
928
/// Masks one position out of every segment and checks the resulting segments.
#[test]
fn test_row_id_mask() {
    // Row ids per segment, in sequence order:
    //   Range:           0, 1, 2, 3, 4
    //   RangeWithHoles:  50, 51, 52, 55, 56, 57, 58, 59
    //   SortedArray:     7, 9
    //   RangeWithBitmap: 10, 12, 14
    //   Array:           35, 39
    let mut sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::RangeWithHoles {
            range: 50..60,
            holes: vec![53, 54].into(),
        },
        U64Segment::SortedArray(vec![7, 9].into()),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, true, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35, 39].into()),
    ]);

    // One row id from each segment; `mask` takes positions, so translate
    // the values into their offsets within the sequence first.
    let values_to_remove = [4, 55, 7, 12, 39];
    let positions_to_remove: Vec<u32> = sequence
        .iter()
        .enumerate()
        .filter(|(_, row_id)| values_to_remove.contains(row_id))
        .map(|(position, _)| position as u32)
        .collect();

    sequence.mask(positions_to_remove).unwrap();

    let expected = RowIdSequence(vec![
        U64Segment::Range(0..4),
        U64Segment::RangeWithBitmap {
            range: 50..60,
            bitmap: [
                true, true, true, false, false, false, true, true, true, true,
            ]
            .as_slice()
            .into(),
        },
        U64Segment::Range(9..10),
        U64Segment::RangeWithBitmap {
            range: 10..15,
            bitmap: [true, false, false, false, true].as_slice().into(),
        },
        U64Segment::Array(vec![35].into()),
    ]);
    assert_eq!(sequence, expected);
}
984
/// Masking every position must leave a sequence with no segments at all.
#[test]
fn test_row_id_mask_everything() {
    let mut sequence = RowIdSequence(vec![
        U64Segment::Range(0..5),
        U64Segment::SortedArray(vec![7, 9].into()),
    ]);

    let row_count = sequence.len() as u32;
    sequence.mask(0..row_count).unwrap();

    assert_eq!(sequence, RowIdSequence(vec![]));
}
995
/// Checks `RowIdSequence::mask_to_offset_ranges` for each segment type, for a
/// multi-segment sequence, and for the default and allow-nothing masks.
#[test]
fn test_mask_to_offset_ranges() {
    // Wraps the segments in a sequence and resolves the mask against it.
    let offsets_for = |segments: Vec<U64Segment>, mask: RowIdMask| {
        RowIdSequence(segments).mask_to_offset_ranges(&mask)
    };
    // Shorthands for building a mask out of a list of row ids.
    let allow = |ids: &[u64]| RowIdMask::from_allowed(RowIdTreeMap::from_iter(ids));
    let block = |ids: &[u64]| RowIdMask::from_block(RowIdTreeMap::from_iter(ids));

    // Segments that are checked against more than one mask.
    let holed_40_60 = || U64Segment::RangeWithHoles {
        range: 40..60,
        holes: vec![47, 43].into(),
    };
    let bitmap_40_45 = || U64Segment::RangeWithBitmap {
        range: 40..45,
        bitmap: [true, true, false, false, true].as_slice().into(),
    };

    // Tests with a simple range segment
    assert_eq!(
        offsets_for(vec![U64Segment::Range(0..10)], allow(&[0, 2, 4, 6, 8])),
        vec![0..1, 2..3, 4..5, 6..7, 8..9]
    );
    assert_eq!(
        offsets_for(vec![U64Segment::Range(40..60)], allow(&[54])),
        vec![14..15]
    );
    assert_eq!(
        offsets_for(vec![U64Segment::Range(40..60)], block(&[54])),
        vec![0..14, 15..20]
    );

    // Test with a range segment with holes
    // 0, 1, 3, 4, 5, 7, 8, 9
    let holed_0_10 = U64Segment::RangeWithHoles {
        range: 0..10,
        holes: vec![2, 6].into(),
    };
    assert_eq!(
        offsets_for(vec![holed_0_10], allow(&[0, 2, 4, 6, 8])),
        vec![0..1, 3..4, 6..7]
    );
    assert_eq!(offsets_for(vec![holed_40_60()], allow(&[44])), vec![3..4]);
    assert_eq!(
        offsets_for(vec![holed_40_60()], block(&[44])),
        vec![0..3, 4..18]
    );

    // Test with a range segment with bitmap
    // 0, 1, 4, 5, 6, 7
    let bitmap_0_10 = U64Segment::RangeWithBitmap {
        range: 0..10,
        bitmap: [
            true, true, false, false, true, true, true, true, false, false,
        ]
        .as_slice()
        .into(),
    };
    assert_eq!(
        offsets_for(vec![bitmap_0_10], allow(&[0, 2, 4, 6, 8])),
        vec![0..1, 2..3, 4..5]
    );
    assert_eq!(offsets_for(vec![bitmap_40_45()], allow(&[44])), vec![2..3]);
    assert_eq!(offsets_for(vec![bitmap_40_45()], block(&[44])), vec![0..2]);

    // Test with a sorted array segment
    assert_eq!(
        offsets_for(
            vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())],
            allow(&[0, 6, 8]),
        ),
        vec![0..1, 3..5]
    );

    // Test with an unsorted array segment
    assert_eq!(
        offsets_for(
            vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())],
            allow(&[0, 6, 8]),
        ),
        vec![0..1, 2..4]
    );

    // Test with multiple segments
    // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
    // *, -, *, -, -, ***, ---, ---, ***, --, **, --
    // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
    let mixed_segments = vec![
        U64Segment::Range(0..5),
        U64Segment::RangeWithHoles {
            range: 100..105,
            holes: vec![103].into(),
        },
        U64Segment::SortedArray(vec![44, 46, 78].into()),
    ];
    assert_eq!(
        offsets_for(mixed_segments, allow(&[0, 2, 46, 100, 104])),
        vec![0..1, 2..3, 5..6, 8..9, 10..11]
    );

    // Test with empty mask (should select everything)
    assert_eq!(
        offsets_for(vec![U64Segment::Range(0..10)], RowIdMask::default()),
        vec![0..10]
    );

    // Test with allow nothing mask
    assert_eq!(
        offsets_for(vec![U64Segment::Range(0..10)], RowIdMask::allow_nothing()),
        vec![]
    );
}
1109}