lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::Range;
15// TODO: replace this with Arrow BooleanBuffer.
16
// Most of these are internal data structures and are private; `segment` and
// `version` are exposed as public modules.
18mod bitmap;
19mod encoded_array;
20mod index;
21pub mod segment;
22mod serde;
23pub mod version;
24
25use deepsize::DeepSizeOf;
26// These are the public API.
27pub use index::FragmentRowIdIndex;
28pub use index::RowIdIndex;
29use lance_core::{
30    utils::mask::{RowIdMask, RowIdTreeMap},
31    Error, Result,
32};
33use lance_io::ReadBatchParams;
34pub use serde::{read_row_ids, write_row_ids};
35
36use snafu::location;
37
38use crate::utils::LanceIteratorExtension;
39use segment::U64Segment;
40use tracing::instrument;
41
/// A sequence of row ids.
///
/// Row ids are u64s that:
///
/// 1. Are **unique** within a table (except for tombstones)
/// 2. Are *often* but not always sorted and/or contiguous.
///
/// This sequence of row ids is optimized to be compact when the row ids are
/// contiguous and sorted. However, it does not require that the row ids are
/// contiguous or sorted.
///
/// We can make optimizations that assume uniqueness.
///
/// Internally the sequence is an ordered list of [U64Segment]s; iterating the
/// sequence yields the ids of each segment in turn.
#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq, Default)]
pub struct RowIdSequence(Vec<U64Segment>);
56
57impl std::fmt::Display for RowIdSequence {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        let mut iter = self.iter();
60        let mut first_10 = Vec::new();
61        let mut last_10 = Vec::new();
62        for row_id in iter.by_ref() {
63            first_10.push(row_id);
64            if first_10.len() > 10 {
65                break;
66            }
67        }
68
69        while let Some(row_id) = iter.next_back() {
70            last_10.push(row_id);
71            if last_10.len() > 10 {
72                break;
73            }
74        }
75        last_10.reverse();
76
77        let theres_more = iter.next().is_some();
78
79        write!(f, "[")?;
80        for row_id in first_10 {
81            write!(f, "{}", row_id)?;
82        }
83        if theres_more {
84            write!(f, ", ...")?;
85        }
86        for row_id in last_10 {
87            write!(f, ", {}", row_id)?;
88        }
89        write!(f, "]")
90    }
91}
92
93impl From<Range<u64>> for RowIdSequence {
94    fn from(range: Range<u64>) -> Self {
95        Self(vec![U64Segment::Range(range)])
96    }
97}
98
99impl From<&[u64]> for RowIdSequence {
100    fn from(row_ids: &[u64]) -> Self {
101        Self(vec![U64Segment::from_slice(row_ids)])
102    }
103}
104
105impl RowIdSequence {
106    pub fn new() -> Self {
107        Self::default()
108    }
109
110    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
111        self.0.iter().flat_map(|segment| segment.iter())
112    }
113
114    pub fn len(&self) -> u64 {
115        self.0.iter().map(|segment| segment.len() as u64).sum()
116    }
117
118    pub fn is_empty(&self) -> bool {
119        self.0.is_empty()
120    }
121
122    /// Combines this row id sequence with another row id sequence.
123    pub fn extend(&mut self, other: Self) {
124        // If the last element of this sequence and the first element of next
125        // sequence are ranges, we might be able to combine them into a single
126        // range.
127        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
128            (self.0.last(), other.0.first())
129        {
130            if range1.end == range2.start {
131                let new_range = U64Segment::Range(range1.start..range2.end);
132                self.0.pop();
133                self.0.push(new_range);
134                self.0.extend(other.0.into_iter().skip(1));
135                return;
136            }
137        }
138        // TODO: add other optimizations, such as combining two RangeWithHoles.
139        self.0.extend(other.0);
140    }
141
142    /// Remove a set of row ids from the sequence.
143    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
144        // Order the row ids by position in which they appear in the sequence.
145        let (row_ids, offsets) = self.find_ids(row_ids);
146
147        let capacity = self.0.capacity();
148        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
149        let mut remaining_segments = old_segments.as_slice();
150
151        for (segment_idx, range) in offsets {
152            let segments_handled = old_segments.len() - remaining_segments.len();
153            let segments_to_add = segment_idx - segments_handled;
154            self.0
155                .extend_from_slice(&remaining_segments[..segments_to_add]);
156            remaining_segments = &remaining_segments[segments_to_add..];
157
158            let segment;
159            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
160
161            let segment_ids = &row_ids[range];
162            self.0.push(segment.delete(segment_ids));
163        }
164
165        // Add the remaining segments.
166        self.0.extend_from_slice(remaining_segments);
167    }
168
169    /// Delete row ids by position.
170    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
171        let mut local_positions = Vec::new();
172        let mut positions_iter = positions.into_iter();
173        let mut curr_position = positions_iter.next();
174        let mut offset = 0;
175        let mut cutoff = 0;
176
177        for segment in &mut self.0 {
178            // Make vector of local positions
179            cutoff += segment.len() as u32;
180            while let Some(position) = curr_position {
181                if position >= cutoff {
182                    break;
183                }
184                local_positions.push(position - offset);
185                curr_position = positions_iter.next();
186            }
187
188            if !local_positions.is_empty() {
189                segment.mask(&local_positions);
190                local_positions.clear();
191            }
192            offset = cutoff;
193        }
194
195        self.0.retain(|segment| !segment.is_empty());
196
197        Ok(())
198    }
199
    /// Find the row ids in the sequence.
    ///
    /// Returns the row ids sorted by their appearance in the sequence.
    /// Also returns the segment index and the range where that segment's
    /// row id matches are found in the returned row id vector.
    ///
    /// Row ids that are not found in any segment are left out of the result,
    /// and segments without a match do not appear in the returned ranges.
    fn find_ids(
        &self,
        row_ids: impl IntoIterator<Item = u64>,
    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
        // Often, the row ids will already be provided in the order they appear.
        // So the optimal way to search will be to cycle through rather than
        // restarting the search from the beginning each time.
        let mut segment_iter = self.0.iter().enumerate().cycle();

        // One bucket per segment, holding the in-segment offsets of the
        // requested row ids that were found there.
        let mut segment_matches = vec![Vec::new(); self.0.len()];

        row_ids.into_iter().for_each(|row_id| {
            let mut i = 0;
            // If we've cycled through all segments, we know the row id is not in the sequence.
            // Note that every segment is visited once per row id: there is no
            // early exit after a match.
            while i < self.0.len() {
                let (segment_idx, segment) = segment_iter.next().unwrap();
                // Cheap pre-check on the segment's bounds before the actual lookup.
                if segment.range().is_some_and(|range| range.contains(&row_id)) {
                    if let Some(offset) = segment.position(row_id) {
                        segment_matches.get_mut(segment_idx).unwrap().push(offset);
                    }
                    // The row id was not found in the segment. It might be in a later segment.
                }
                i += 1;
            }
        });
        // Order the matches of each segment by their position in that segment.
        for matches in &mut segment_matches {
            matches.sort_unstable();
        }

        // For every segment with at least one match, compute which slice of
        // the flattened `row_ids` vector (built below) belongs to it.
        let mut offset = 0;
        let segment_ranges = segment_matches
            .iter()
            .enumerate()
            .filter(|(_, matches)| !matches.is_empty())
            .map(|(segment_idx, matches)| {
                let range = offset..offset + matches.len();
                offset += matches.len();
                (segment_idx, range)
            })
            .collect();
        // Turn the sorted in-segment offsets back into row ids, segment by segment.
        let row_ids = segment_matches
            .into_iter()
            .enumerate()
            .flat_map(|(segment_idx, offset)| {
                offset
                    .into_iter()
                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
            })
            .collect();

        (row_ids, segment_ranges)
    }
257
258    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
259        if len == 0 {
260            return RowIdSeqSlice {
261                segments: &[],
262                offset_start: 0,
263                offset_last: 0,
264            };
265        }
266
267        // Find the starting position
268        let mut offset_start = offset;
269        let mut segment_offset = 0;
270        for segment in &self.0 {
271            let segment_len = segment.len();
272            if offset_start < segment_len {
273                break;
274            }
275            offset_start -= segment_len;
276            segment_offset += 1;
277        }
278
279        // Find the ending position
280        let mut offset_last = offset_start + len;
281        let mut segment_offset_last = segment_offset;
282        for segment in &self.0[segment_offset..] {
283            let segment_len = segment.len();
284            if offset_last <= segment_len {
285                break;
286            }
287            offset_last -= segment_len;
288            segment_offset_last += 1;
289        }
290
291        RowIdSeqSlice {
292            segments: &self.0[segment_offset..=segment_offset_last],
293            offset_start,
294            offset_last,
295        }
296    }
297
298    /// Get the row id at the given index.
299    ///
300    /// If the index is out of bounds, this will return None.
301    pub fn get(&self, index: usize) -> Option<u64> {
302        let mut offset = 0;
303        for segment in &self.0 {
304            let segment_len = segment.len();
305            if index < offset + segment_len {
306                return segment.get(index - offset);
307            }
308            offset += segment_len;
309        }
310        None
311    }
312
313    /// Get row ids from the sequence based on the provided _sorted_ offsets
314    ///
315    /// Any out of bounds offsets will be ignored
316    ///
317    /// # Panics
318    ///
319    /// If the input selection is not sorted, this function will panic
320    pub fn select<'a>(
321        &'a self,
322        selection: impl Iterator<Item = usize> + 'a,
323    ) -> impl Iterator<Item = u64> + 'a {
324        let mut seg_iter = self.0.iter();
325        let mut cur_seg = seg_iter.next();
326        let mut rows_passed = 0;
327        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
328        let mut last_index = 0;
329        selection.filter_map(move |index| {
330            if index < last_index {
331                panic!("Selection is not sorted");
332            }
333            last_index = index;
334
335            cur_seg?;
336
337            while (index - rows_passed) >= cur_seg_len {
338                rows_passed += cur_seg_len;
339                cur_seg = seg_iter.next();
340                if let Some(cur_seg) = cur_seg {
341                    cur_seg_len = cur_seg.len();
342                } else {
343                    return None;
344                }
345            }
346
347            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
348        })
349    }
350
351    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
352    /// in the sequence.
353    ///
354    /// For example, given a mask that selects all even ids and a sequence that is
355    /// [80..85, 86..90, 14]
356    ///
357    /// this will return [0, 2, 4, 5, 7, 9]
358    /// because the range expands to
359    ///
360    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
361    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
362    ///
363    /// This function is useful when determining which row offsets to read from a fragment given
364    /// a mask.
365    #[instrument(level = "debug", skip_all)]
366    pub fn mask_to_offset_ranges(&self, mask: &RowIdMask) -> Vec<Range<u64>> {
367        let mut offset = 0;
368        let mut ranges = Vec::new();
369        for segment in &self.0 {
370            match segment {
371                U64Segment::Range(range) => {
372                    let mut ids = RowIdTreeMap::from(range.clone());
373                    ids.mask(mask);
374                    ranges.extend(GroupingIterator::new(
375                        unsafe { ids.into_addr_iter() }.map(|addr| addr - range.start + offset),
376                    ));
377                    offset += range.end - range.start;
378                }
379                U64Segment::RangeWithHoles { range, holes } => {
380                    let offset_start = offset;
381                    let mut ids = RowIdTreeMap::from(range.clone());
382                    offset += range.end - range.start;
383                    for hole in holes.iter() {
384                        if ids.remove(hole) {
385                            offset -= 1;
386                        }
387                    }
388                    ids.mask(mask);
389
390                    // Sadly we can't just subtract the offset because of the holes
391                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
392                    sorted_holes.sort_unstable();
393                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
394                    let mut holes_passed = 0;
395                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
396                        |addr| {
397                            while let Some(next_hole) = next_holes_iter.peek() {
398                                if *next_hole < addr {
399                                    next_holes_iter.next();
400                                    holes_passed += 1;
401                                } else {
402                                    break;
403                                }
404                            }
405                            addr - range.start + offset_start - holes_passed
406                        },
407                    )));
408                }
409                U64Segment::RangeWithBitmap { range, bitmap } => {
410                    let mut ids = RowIdTreeMap::from(range.clone());
411                    let offset_start = offset;
412                    offset += range.end - range.start;
413                    for (i, val) in range.clone().enumerate() {
414                        if !bitmap.get(i) && ids.remove(val) {
415                            offset -= 1;
416                        }
417                    }
418                    ids.mask(mask);
419                    let mut bitmap_iter = bitmap.iter();
420                    let mut bitmap_iter_pos = 0;
421                    let mut holes_passed = 0;
422                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
423                        |addr| {
424                            let offset_no_holes = addr - range.start + offset_start;
425                            while bitmap_iter_pos < offset_no_holes {
426                                if !bitmap_iter.next().unwrap() {
427                                    holes_passed += 1;
428                                }
429                                bitmap_iter_pos += 1;
430                            }
431                            offset_no_holes - holes_passed
432                        },
433                    )));
434                }
435                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
436                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
437                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
438                        |(off, id)| {
439                            if mask.selected(id) {
440                                Some(off as u64 + offset)
441                            } else {
442                                None
443                            }
444                        },
445                    )));
446                    offset += array.len() as u64;
447                }
448            }
449        }
450        ranges
451    }
452}
453
/// Adapter that collapses runs of consecutive ids into half-open ranges.
///
/// For example, given an input iterator of [1, 2, 3, 5, 6, 7, 10, 11, 12]
/// this will return an iterator of [(1..4), (5..8), (10..13)]
struct GroupingIterator<I: Iterator<Item = u64>> {
    /// Source of ids.
    iter: I,
    /// Run that is still being extended, if any.
    cur_range: Option<Range<u64>>,
}

impl<I: Iterator<Item = u64>> GroupingIterator<I> {
    /// Wraps `iter`; no run is open until the first id has been read.
    fn new(iter: I) -> Self {
        let cur_range = None;
        Self { iter, cur_range }
    }
}

impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
    type Item = Range<u64>;

    fn next(&mut self) -> Option<Self::Item> {
        for id in self.iter.by_ref() {
            match self.cur_range.as_mut() {
                // The id continues the open run: grow it by one.
                Some(open) if open.end == id => open.end = id + 1,
                // The id breaks the run: emit it and start a new one at `id`.
                Some(open) => {
                    let finished = std::mem::replace(open, id..id + 1);
                    return Some(finished);
                }
                // Very first id: open the first run.
                None => self.cur_range = Some(id..id + 1),
            }
        }
        // Input exhausted: flush the last open run (if there is one).
        self.cur_range.take()
    }
}
492
493impl From<&RowIdSequence> for RowIdTreeMap {
494    fn from(row_ids: &RowIdSequence) -> Self {
495        let mut tree_map = Self::new();
496        for segment in &row_ids.0 {
497            match segment {
498                U64Segment::Range(range) => {
499                    tree_map.insert_range(range.clone());
500                }
501                U64Segment::RangeWithBitmap { range, bitmap } => {
502                    tree_map.insert_range(range.clone());
503                    for (i, val) in range.clone().enumerate() {
504                        if !bitmap.get(i) {
505                            tree_map.remove(val);
506                        }
507                    }
508                }
509                U64Segment::RangeWithHoles { range, holes } => {
510                    tree_map.insert_range(range.clone());
511                    for hole in holes.iter() {
512                        tree_map.remove(hole);
513                    }
514                }
515                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
516                    for val in array.iter() {
517                        tree_map.insert(val);
518                    }
519                }
520            }
521        }
522        tree_map
523    }
524}
525
/// A borrowed view over a contiguous part of a [RowIdSequence], as produced by
/// [RowIdSequence::slice].
#[derive(Debug)]
pub struct RowIdSeqSlice<'a> {
    /// Current slice of the segments we cover
    segments: &'a [U64Segment],
    /// Offset into the first segment to start iterating from
    offset_start: usize,
    /// Offset into the last segment to stop iterating at
    /// (exclusive: this many ids of the last segment are visited at most)
    offset_last: usize,
}
535
536impl RowIdSeqSlice<'_> {
537    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
538        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
539        known_size -= self.offset_start;
540        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
541
542        let end = self.segments.len().saturating_sub(1);
543        self.segments
544            .iter()
545            .enumerate()
546            .flat_map(move |(i, segment)| {
547                match i {
548                    0 if self.segments.len() == 1 => {
549                        let len = self.offset_last - self.offset_start;
550                        // TODO: Optimize this so we don't have to use skip
551                        // (take is probably fine though.)
552                        Box::new(segment.iter().skip(self.offset_start).take(len))
553                            as Box<dyn Iterator<Item = u64>>
554                    }
555                    0 => Box::new(segment.iter().skip(self.offset_start)),
556                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
557                    _ => Box::new(segment.iter()),
558                }
559            })
560            .exact_size(known_size)
561    }
562}
563
/// Re-chunk a series of row id sequences into chunks of the given sizes.
///
/// The sequences may hold fewer row ids than the chunk sizes add up to: the
/// sequences only contain the row ids that we want to keep (they come from
/// the updated records), while the chunk sizes are based on the fragments'
/// physical rows (which may include inserted records). When that happens the
/// missing row ids have to be assigned in a later step. Whether this is
/// accepted is controlled by the `allow_incomplete` parameter.
///
/// # Errors
///
/// If `allow_incomplete` is false, will return an error if the sum of the chunk sizes
/// is not equal to the total number of row ids in the sequences.
/// Regardless of `allow_incomplete`, an error is returned when segments are
/// left over after all chunks have been filled.
pub fn rechunk_sequences(
    sequences: impl IntoIterator<Item = RowIdSequence>,
    chunk_sizes: impl IntoIterator<Item = u64>,
    allow_incomplete: bool,
) -> Result<Vec<RowIdSequence>> {
    // TODO: return an iterator. (with a good size hint?)
    let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
    let total_chunks = chunk_sizes_vec.len();
    let mut chunked_sequences = Vec::with_capacity(total_chunks);
    // All input segments, flattened into one stream across sequence boundaries.
    let mut segment_iter = sequences
        .into_iter()
        .flat_map(|sequence| sequence.0.into_iter())
        .peekable();

    let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
        Error::invalid_input(
            format!(
                "Got too few segments for chunk {}. Expected chunk size: {}, remaining needed: {}",
                chunk_index, expected_chunk_size, remaining
            ),
            location!(),
        )
    };

    let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
        Error::invalid_input(
            format!(
                "Got too many segments for the provided chunk lengths. Processed {} chunks out of {} expected",
                processed_chunks, total_chunk_sizes
            ),
            location!(),
        )
    };

    // Number of ids of the peeked (front) segment that earlier chunks have
    // already consumed.
    let mut segment_offset = 0_u64;

    for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
        let chunk_size = *chunk_size;
        let mut sequence = RowIdSequence(Vec::new());
        // Number of ids this chunk still needs.
        let mut remaining = chunk_size;

        while remaining > 0 {
            // Zero both when the front segment is used up (or empty) and when
            // there is no segment left at all.
            let remaining_in_segment = segment_iter
                .peek()
                .map_or(0, |segment| segment.len() as u64 - segment_offset);

            // Step 1: Handle segment remaining to be empty(also empty seg) - skip and continue
            if remaining_in_segment == 0 {
                if segment_iter.next().is_some() {
                    segment_offset = 0;
                    continue;
                } else {
                    // No more segments available
                    if allow_incomplete {
                        break;
                    } else {
                        return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
                    }
                }
            }

            // Step 2: Handle still remaining segment based on size comparison
            match remaining_in_segment.cmp(&remaining) {
                std::cmp::Ordering::Greater => {
                    // Segment is larger than remaining space - slice it
                    // The segment is only peeked, so that the next chunk
                    // continues from `segment_offset` within it.
                    let segment = segment_iter
                        .peek()
                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
                        .slice(segment_offset as usize, remaining as usize);
                    sequence.extend(RowIdSequence(vec![segment]));
                    segment_offset += remaining;
                    remaining = 0;
                }
                std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
                    // UNIFIED HANDLING: Both equal and less cases subtract from remaining
                    // Equal case: remaining -= remaining_in_segment (remaining becomes 0)
                    // Less case: remaining -= remaining_in_segment (remaining becomes positive)
                    // The rest of the segment is taken whole, so it is
                    // removed from the stream.
                    let segment = segment_iter
                        .next()
                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
                        .slice(segment_offset as usize, remaining_in_segment as usize);
                    sequence.extend(RowIdSequence(vec![segment]));
                    segment_offset = 0;
                    remaining -= remaining_in_segment;
                }
            }
        }

        chunked_sequences.push(sequence);
    }

    // Any segment still in the stream (including a partially consumed one)
    // means the input did not fit into the requested chunks.
    if segment_iter.peek().is_some() {
        return Err(too_many_segments_error(
            chunked_sequences.len(),
            total_chunks,
        ));
    }

    Ok(chunked_sequences)
}
677
678/// Selects the row ids from a sequence based on the provided offsets.
679pub fn select_row_ids<'a>(
680    sequence: &'a RowIdSequence,
681    offsets: &'a ReadBatchParams,
682) -> Result<Vec<u64>> {
683    let out_of_bounds_err = |offset: u32| {
684        Error::invalid_input(
685            format!(
686                "Index out of bounds: {} for sequence of length {}",
687                offset,
688                sequence.len()
689            ),
690            location!(),
691        )
692    };
693
694    match offsets {
695        // TODO: Optimize this if indices are sorted, which is a common case.
696        ReadBatchParams::Indices(indices) => indices
697            .values()
698            .iter()
699            .map(|index| {
700                sequence
701                    .get(*index as usize)
702                    .ok_or_else(|| out_of_bounds_err(*index))
703            })
704            .collect(),
705        ReadBatchParams::Range(range) => {
706            if range.end > sequence.len() as usize {
707                return Err(out_of_bounds_err(range.end as u32));
708            }
709            let sequence = sequence.slice(range.start, range.end - range.start);
710            Ok(sequence.iter().collect())
711        }
712        ReadBatchParams::Ranges(ranges) => {
713            let num_rows = ranges
714                .iter()
715                .map(|r| (r.end - r.start) as usize)
716                .sum::<usize>();
717            let mut result = Vec::with_capacity(num_rows);
718            for range in ranges.as_ref() {
719                if range.end > sequence.len() {
720                    return Err(out_of_bounds_err(range.end as u32));
721                }
722                let sequence =
723                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
724                result.extend(sequence.iter());
725            }
726            Ok(result)
727        }
728
729        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
730        ReadBatchParams::RangeTo(to) => {
731            if to.end > sequence.len() as usize {
732                return Err(out_of_bounds_err(to.end as u32));
733            }
734            let len = to.end;
735            let sequence = sequence.slice(0, len);
736            Ok(sequence.iter().collect())
737        }
738        ReadBatchParams::RangeFrom(from) => {
739            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
740            Ok(sequence.iter().collect())
741        }
742    }
743}
744
745#[cfg(test)]
746mod test {
747    use super::*;
748
749    use pretty_assertions::assert_eq;
750    use test::bitmap::Bitmap;
751
752    #[test]
753    fn test_row_id_sequence_from_range() {
754        let sequence = RowIdSequence::from(0..10);
755        assert_eq!(sequence.len(), 10);
756        assert_eq!(sequence.is_empty(), false);
757
758        let iter = sequence.iter();
759        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
760    }
761
762    #[test]
763    fn test_row_id_sequence_extend() {
764        let mut sequence = RowIdSequence::from(0..10);
765        sequence.extend(RowIdSequence::from(10..20));
766        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
767
768        let mut sequence = RowIdSequence::from(0..10);
769        sequence.extend(RowIdSequence::from(20..30));
770        assert_eq!(
771            sequence.0,
772            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
773        );
774    }
775
776    #[test]
777    fn test_row_id_sequence_delete() {
778        let mut sequence = RowIdSequence::from(0..10);
779        sequence.delete(vec![1, 3, 5, 7, 9]);
780        let mut expected_bitmap = Bitmap::new_empty(9);
781        for i in [0, 2, 4, 6, 8] {
782            expected_bitmap.set(i as usize);
783        }
784        assert_eq!(
785            sequence.0,
786            vec![U64Segment::RangeWithBitmap {
787                range: 0..9,
788                bitmap: expected_bitmap
789            },]
790        );
791
792        let mut sequence = RowIdSequence::from(0..10);
793        sequence.extend(RowIdSequence::from(12..20));
794        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
795        assert_eq!(
796            sequence.0,
797            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
798        );
799
800        let mut sequence = RowIdSequence::from(0..10);
801        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
802        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
803    }
804
805    #[test]
806    fn test_row_id_slice() {
807        // The type of sequence isn't that relevant to the implementation, so
808        // we can just have a single one with all the segment types.
809        let sequence = RowIdSequence(vec![
810            U64Segment::Range(30..35), // 5
811            U64Segment::RangeWithHoles {
812                // 8
813                range: 50..60,
814                holes: vec![53, 54].into(),
815            },
816            U64Segment::SortedArray(vec![7, 9].into()), // 2
817            U64Segment::RangeWithBitmap {
818                range: 0..5,
819                bitmap: [true, false, true, false, true].as_slice().into(),
820            },
821            U64Segment::Array(vec![35, 39].into()),
822            U64Segment::Range(40..50),
823        ]);
824
825        // All possible offsets and lengths
826        for offset in 0..sequence.len() as usize {
827            for len in 0..sequence.len() as usize {
828                if offset + len > sequence.len() as usize {
829                    continue;
830                }
831                let slice = sequence.slice(offset, len);
832
833                let actual = slice.iter().collect::<Vec<_>>();
834                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
835                assert_eq!(
836                    actual, expected,
837                    "Failed for offset {} and len {}",
838                    offset, len
839                );
840
841                let (claimed_size, claimed_max) = slice.iter().size_hint();
842                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
843                assert_eq!(claimed_size, actual.len()); // Correct size hint
844            }
845        }
846    }
847
848    #[test]
849    fn test_row_id_slice_empty() {
850        let sequence = RowIdSequence::from(0..10);
851        let slice = sequence.slice(10, 0);
852        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
853    }
854
855    #[test]
856    fn test_row_id_sequence_rechunk() {
857        fn assert_rechunked(
858            input: Vec<RowIdSequence>,
859            chunk_sizes: Vec<u64>,
860            expected: Vec<RowIdSequence>,
861        ) {
862            let chunked = rechunk_sequences(input, chunk_sizes, false).unwrap();
863            assert_eq!(chunked, expected);
864        }
865
866        // Small pieces to larger ones
867        let many_segments = vec![
868            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
869            RowIdSequence::from(10..18),
870            RowIdSequence::from(18..28),
871            RowIdSequence::from(28..30),
872        ];
873        let fewer_segments = vec![
874            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
875            RowIdSequence::from(10..30),
876        ];
877        assert_rechunked(
878            many_segments.clone(),
879            fewer_segments.iter().map(|seq| seq.len()).collect(),
880            fewer_segments.clone(),
881        );
882
883        // Large pieces to smaller ones
884        assert_rechunked(
885            fewer_segments,
886            many_segments.iter().map(|seq| seq.len()).collect(),
887            many_segments.clone(),
888        );
889
890        // Equal pieces
891        assert_rechunked(
892            many_segments.clone(),
893            many_segments.iter().map(|seq| seq.len()).collect(),
894            many_segments.clone(),
895        );
896
897        // Too few segments -> error
898        let result = rechunk_sequences(many_segments.clone(), vec![100], false);
899        assert!(result.is_err());
900
901        // Too many segments -> error
902        let result = rechunk_sequences(many_segments, vec![5], false);
903        assert!(result.is_err());
904    }
905
906    #[test]
907    fn test_select_row_ids() {
908        // All forms of offsets
909        let offsets = [
910            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
911            ReadBatchParams::Range(2..8),
912            ReadBatchParams::RangeFull,
913            ReadBatchParams::RangeTo(..5),
914            ReadBatchParams::RangeFrom(5..),
915            ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
916        ];
917
918        // Sequences with all segment types. These have at least 10 elements,
919        // so they are valid for all the above offsets.
920        let sequences = [
921            RowIdSequence(vec![
922                U64Segment::Range(0..5),
923                U64Segment::RangeWithHoles {
924                    range: 50..60,
925                    holes: vec![53, 54].into(),
926                },
927                U64Segment::SortedArray(vec![7, 9].into()),
928            ]),
929            RowIdSequence(vec![
930                U64Segment::RangeWithBitmap {
931                    range: 0..5,
932                    bitmap: [true, false, true, false, true].as_slice().into(),
933                },
934                U64Segment::Array(vec![30, 20, 10].into()),
935                U64Segment::Range(40..50),
936            ]),
937        ];
938
939        for params in offsets {
940            for sequence in &sequences {
941                let row_ids = select_row_ids(sequence, &params).unwrap();
942                let flat_sequence = sequence.iter().collect::<Vec<_>>();
943
944                // Transform params into bounded ones
945                let selection: Vec<usize> = match &params {
946                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
947                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
948                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
949                    ReadBatchParams::Range(range) => range.clone().collect(),
950                    ReadBatchParams::Ranges(ranges) => ranges
951                        .iter()
952                        .flat_map(|r| r.start as usize..r.end as usize)
953                        .collect(),
954                    ReadBatchParams::Indices(indices) => {
955                        indices.values().iter().map(|i| *i as usize).collect()
956                    }
957                };
958
959                let expected = selection
960                    .into_iter()
961                    .map(|i| flat_sequence[i])
962                    .collect::<Vec<_>>();
963                assert_eq!(
964                    row_ids, expected,
965                    "Failed for params {:?} on the sequence {:?}",
966                    &params, sequence
967                );
968            }
969        }
970    }
971
972    #[test]
973    fn test_select_row_ids_out_of_bounds() {
974        let offsets = [
975            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
976            ReadBatchParams::Range(2..1000),
977            ReadBatchParams::RangeTo(..1000),
978        ];
979
980        let sequence = RowIdSequence::from(0..10);
981
982        for params in offsets {
983            let result = select_row_ids(&sequence, &params);
984            assert!(result.is_err());
985            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
986        }
987    }
988
989    #[test]
990    fn test_row_id_sequence_to_treemap() {
991        let sequence = RowIdSequence(vec![
992            U64Segment::Range(0..5),
993            U64Segment::RangeWithHoles {
994                range: 50..60,
995                holes: vec![53, 54].into(),
996            },
997            U64Segment::SortedArray(vec![7, 9].into()),
998            U64Segment::RangeWithBitmap {
999                range: 10..15,
1000                bitmap: [true, false, true, false, true].as_slice().into(),
1001            },
1002            U64Segment::Array(vec![35, 39].into()),
1003            U64Segment::Range(40..50),
1004        ]);
1005
1006        let tree_map = RowIdTreeMap::from(&sequence);
1007        let expected = vec![
1008            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
1009            51, 52, 55, 56, 57, 58, 59,
1010        ]
1011        .into_iter()
1012        .collect::<RowIdTreeMap>();
1013        assert_eq!(tree_map, expected);
1014    }
1015
1016    #[test]
1017    fn test_row_id_mask() {
1018        // 0, 1, 2, 3, 4
1019        // 50, 51, 52, 55, 56, 57, 58, 59
1020        // 7, 9
1021        // 10, 12, 14
1022        // 35, 39
1023        let sequence = RowIdSequence(vec![
1024            U64Segment::Range(0..5),
1025            U64Segment::RangeWithHoles {
1026                range: 50..60,
1027                holes: vec![53, 54].into(),
1028            },
1029            U64Segment::SortedArray(vec![7, 9].into()),
1030            U64Segment::RangeWithBitmap {
1031                range: 10..15,
1032                bitmap: [true, false, true, false, true].as_slice().into(),
1033            },
1034            U64Segment::Array(vec![35, 39].into()),
1035        ]);
1036
1037        // Masking one in each segment
1038        let values_to_remove = [4, 55, 7, 12, 39];
1039        let positions_to_remove = sequence
1040            .iter()
1041            .enumerate()
1042            .filter_map(|(i, val)| {
1043                if values_to_remove.contains(&val) {
1044                    Some(i as u32)
1045                } else {
1046                    None
1047                }
1048            })
1049            .collect::<Vec<_>>();
1050        let mut sequence = sequence;
1051        sequence.mask(positions_to_remove).unwrap();
1052        let expected = RowIdSequence(vec![
1053            U64Segment::Range(0..4),
1054            U64Segment::RangeWithBitmap {
1055                range: 50..60,
1056                bitmap: [
1057                    true, true, true, false, false, false, true, true, true, true,
1058                ]
1059                .as_slice()
1060                .into(),
1061            },
1062            U64Segment::Range(9..10),
1063            U64Segment::RangeWithBitmap {
1064                range: 10..15,
1065                bitmap: [true, false, false, false, true].as_slice().into(),
1066            },
1067            U64Segment::Array(vec![35].into()),
1068        ]);
1069        assert_eq!(sequence, expected);
1070    }
1071
1072    #[test]
1073    fn test_row_id_mask_everything() {
1074        let mut sequence = RowIdSequence(vec![
1075            U64Segment::Range(0..5),
1076            U64Segment::SortedArray(vec![7, 9].into()),
1077        ]);
1078        sequence.mask(0..sequence.len() as u32).unwrap();
1079        let expected = RowIdSequence(vec![]);
1080        assert_eq!(sequence, expected);
1081    }
1082
1083    #[test]
1084    fn test_selection() {
1085        let sequence = RowIdSequence(vec![
1086            U64Segment::Range(0..5),
1087            U64Segment::Range(10..15),
1088            U64Segment::Range(20..25),
1089        ]);
1090        let selection = sequence.select(vec![2, 4, 13, 14, 57].into_iter());
1091        assert_eq!(selection.collect::<Vec<_>>(), vec![2, 4, 23, 24]);
1092    }
1093
1094    #[test]
1095    #[should_panic]
1096    fn test_selection_unsorted() {
1097        let sequence = RowIdSequence(vec![
1098            U64Segment::Range(0..5),
1099            U64Segment::Range(10..15),
1100            U64Segment::Range(20..25),
1101        ]);
1102        let _ = sequence
1103            .select(vec![2, 4, 3].into_iter())
1104            .collect::<Vec<_>>();
1105    }
1106
1107    #[test]
1108    fn test_mask_to_offset_ranges() {
1109        // Tests with a simple range segment
1110        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1111        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1112        let ranges = sequence.mask_to_offset_ranges(&mask);
1113        assert_eq!(ranges, vec![0..1, 2..3, 4..5, 6..7, 8..9]);
1114
1115        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1116        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[54]));
1117        let ranges = sequence.mask_to_offset_ranges(&mask);
1118        assert_eq!(ranges, vec![14..15]);
1119
1120        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1121        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[54]));
1122        let ranges = sequence.mask_to_offset_ranges(&mask);
1123        assert_eq!(ranges, vec![0..14, 15..20]);
1124
1125        // Test with a range segment with holes
1126        // 0, 1, 3, 4, 5, 7, 8, 9
1127        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1128            range: 0..10,
1129            holes: vec![2, 6].into(),
1130        }]);
1131        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1132        let ranges = sequence.mask_to_offset_ranges(&mask);
1133        assert_eq!(ranges, vec![0..1, 3..4, 6..7]);
1134
1135        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1136            range: 40..60,
1137            holes: vec![47, 43].into(),
1138        }]);
1139        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1140        let ranges = sequence.mask_to_offset_ranges(&mask);
1141        assert_eq!(ranges, vec![3..4]);
1142
1143        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1144            range: 40..60,
1145            holes: vec![47, 43].into(),
1146        }]);
1147        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1148        let ranges = sequence.mask_to_offset_ranges(&mask);
1149        assert_eq!(ranges, vec![0..3, 4..18]);
1150
1151        // Test with a range segment with bitmap
1152        // 0, 1, 4, 5, 6, 7
1153        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1154            range: 0..10,
1155            bitmap: [
1156                true, true, false, false, true, true, true, true, false, false,
1157            ]
1158            .as_slice()
1159            .into(),
1160        }]);
1161        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1162        let ranges = sequence.mask_to_offset_ranges(&mask);
1163        assert_eq!(ranges, vec![0..1, 2..3, 4..5]);
1164
1165        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1166            range: 40..45,
1167            bitmap: [true, true, false, false, true].as_slice().into(),
1168        }]);
1169        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[44]));
1170        let ranges = sequence.mask_to_offset_ranges(&mask);
1171        assert_eq!(ranges, vec![2..3]);
1172
1173        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1174            range: 40..45,
1175            bitmap: [true, true, false, false, true].as_slice().into(),
1176        }]);
1177        let mask = RowIdMask::from_block(RowIdTreeMap::from_iter(&[44]));
1178        let ranges = sequence.mask_to_offset_ranges(&mask);
1179        assert_eq!(ranges, vec![0..2]);
1180
1181        // Test with a sorted array segment
1182        let sequence = RowIdSequence(vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())]);
1183        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1184        let ranges = sequence.mask_to_offset_ranges(&mask);
1185        assert_eq!(ranges, vec![0..1, 3..5]);
1186
1187        let sequence = RowIdSequence(vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())]);
1188        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 6, 8]));
1189        let ranges = sequence.mask_to_offset_ranges(&mask);
1190        assert_eq!(ranges, vec![0..1, 2..4]);
1191
1192        // Test with multiple segments
1193        // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
1194        // *, -, *, -, -, ***, ---, ---, ***, --, **, --
1195        // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
1196        let sequence = RowIdSequence(vec![
1197            U64Segment::Range(0..5),
1198            U64Segment::RangeWithHoles {
1199                range: 100..105,
1200                holes: vec![103].into(),
1201            },
1202            U64Segment::SortedArray(vec![44, 46, 78].into()),
1203        ]);
1204        let mask = RowIdMask::from_allowed(RowIdTreeMap::from_iter(&[0, 2, 46, 100, 104]));
1205        let ranges = sequence.mask_to_offset_ranges(&mask);
1206        assert_eq!(ranges, vec![0..1, 2..3, 5..6, 8..9, 10..11]);
1207
1208        // Test with empty mask (should select everything)
1209        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1210        let mask = RowIdMask::default();
1211        let ranges = sequence.mask_to_offset_ranges(&mask);
1212        assert_eq!(ranges, vec![0..10]);
1213
1214        // Test with allow nothing mask
1215        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1216        let mask = RowIdMask::allow_nothing();
1217        let ranges = sequence.mask_to_offset_ranges(&mask);
1218        assert_eq!(ranges, vec![]);
1219    }
1220
1221    #[test]
1222    fn test_row_id_sequence_rechunk_with_empty_segments() {
1223        // equal case (segment exactly fills remaining space)
1224        let input_sequences = vec![
1225            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1226            RowIdSequence::from(20..23), // [20, 21, 22] - 3 elements
1227        ];
1228        let chunk_sizes = vec![2, 3]; // First chunk wants 2, second wants 3
1229
1230        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1231        assert_eq!(result.len(), 2);
1232        assert_eq!(result[0].len(), 2);
1233        assert_eq!(result[1].len(), 3);
1234
1235        let first_chunk: Vec<u64> = result[0].iter().collect();
1236        let second_chunk: Vec<u64> = result[1].iter().collect();
1237        assert_eq!(first_chunk, vec![0, 1]);
1238        assert_eq!(second_chunk, vec![20, 21, 22]);
1239
1240        // less case (segment smaller than remaining space)
1241        let input_sequences = vec![
1242            RowIdSequence::from(0..2),   // [0, 1] - 2 elements (less than remaining)
1243            RowIdSequence::from(20..21), // [20] - 1 element (less than remaining)
1244            RowIdSequence::from(30..32), // [30, 31] - 2 elements (exactly fills remaining)
1245        ];
1246        let chunk_sizes = vec![5]; // Request 5 elements, have exactly 5
1247
1248        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1249        assert_eq!(result.len(), 1);
1250        assert_eq!(result[0].len(), 5);
1251
1252        let elements: Vec<u64> = result[0].iter().collect();
1253        assert_eq!(elements, vec![0, 1, 20, 30, 31]);
1254
1255        // empty segment in the middle
1256        let input_sequences = vec![
1257            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1258            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1259            RowIdSequence::from(20..22), // [20, 21] - 2 elements
1260        ];
1261        let chunk_sizes = vec![3, 1];
1262        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1263
1264        assert_eq!(result.len(), 2);
1265        assert_eq!(result[0].len(), 3);
1266        assert_eq!(result[1].len(), 1);
1267
1268        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1269        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1270        assert_eq!(first_chunk_elements, vec![0, 1, 20]);
1271        assert_eq!(second_chunk_elements, vec![21]);
1272
1273        // multiple empty segments
1274        let input_sequences = vec![
1275            RowIdSequence::from(0..1),   // [0] - 1 element
1276            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1277            RowIdSequence::from(20..20), // [] - 0 elements (empty)
1278            RowIdSequence::from(30..32), // [30, 31] - 2 elements
1279        ];
1280        let chunk_sizes = vec![3];
1281        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1282
1283        assert_eq!(result.len(), 1);
1284        assert_eq!(result[0].len(), 3);
1285
1286        let elements: Vec<u64> = result[0].iter().collect();
1287        assert_eq!(elements, vec![0, 30, 31]);
1288
1289        // empty segment at chunk boundary
1290        let input_sequences = vec![
1291            RowIdSequence::from(0..3), // [0, 1, 2] - 3 elements (exactly fills first chunk)
1292            RowIdSequence::from(10..10), // [] - 0 elements (empty, at boundary)
1293            RowIdSequence::from(20..22), // [20, 21] - 2 elements (for second chunk)
1294        ];
1295        let chunk_sizes = vec![3, 2];
1296        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1297
1298        assert_eq!(result.len(), 2);
1299        assert_eq!(result[0].len(), 3);
1300        assert_eq!(result[1].len(), 2);
1301
1302        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1303        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1304        assert_eq!(first_chunk_elements, vec![0, 1, 2]);
1305        assert_eq!(second_chunk_elements, vec![20, 21]);
1306
1307        // empty segments with allow_incomplete = true
1308        let input_sequences = vec![
1309            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1310            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1311        ];
1312        let chunk_sizes = vec![5]; // Request more than available
1313        let result = rechunk_sequences(input_sequences, chunk_sizes, true).unwrap();
1314
1315        assert_eq!(result.len(), 1);
1316        assert_eq!(result[0].len(), 2);
1317
1318        let elements: Vec<u64> = result[0].iter().collect();
1319        assert_eq!(elements, vec![0, 1]);
1320    }
1321}