Skip to main content

lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::Range;
15// TODO: replace this with Arrow BooleanBuffer.
16
17// These are all internal data structures, and are private.
18mod bitmap;
19mod encoded_array;
20mod index;
21pub mod segment;
22mod serde;
23pub mod version;
24
25use deepsize::DeepSizeOf;
26// These are the public API.
27pub use index::FragmentRowIdIndex;
28pub use index::RowIdIndex;
29use lance_core::{
30    Error, Result,
31    utils::mask::{RowAddrMask, RowAddrTreeMap},
32};
33use lance_io::ReadBatchParams;
34pub use serde::{read_row_ids, write_row_ids};
35
36use crate::utils::LanceIteratorExtension;
37use lance_core::utils::mask::RowSetOps;
38use segment::U64Segment;
39use tracing::instrument;
40
41/// A sequence of row ids.
42///
43/// Row ids are u64s that:
44///
45/// 1. Are **unique** within a table (except for tombstones)
46/// 2. Are *often* but not always sorted and/or contiguous.
47///
48/// This sequence of row ids is optimized to be compact when the row ids are
49/// contiguous and sorted. However, it does not require that the row ids are
50/// contiguous or sorted.
51///
52/// We can make optimizations that assume uniqueness.
53#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq, Default)]
54pub struct RowIdSequence(Vec<U64Segment>);
55
56impl std::fmt::Display for RowIdSequence {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        let mut iter = self.iter();
59        let mut first_10 = Vec::new();
60        let mut last_10 = Vec::new();
61        for row_id in iter.by_ref() {
62            first_10.push(row_id);
63            if first_10.len() > 10 {
64                break;
65            }
66        }
67
68        while let Some(row_id) = iter.next_back() {
69            last_10.push(row_id);
70            if last_10.len() > 10 {
71                break;
72            }
73        }
74        last_10.reverse();
75
76        let theres_more = iter.next().is_some();
77
78        write!(f, "[")?;
79        for row_id in first_10 {
80            write!(f, "{}", row_id)?;
81        }
82        if theres_more {
83            write!(f, ", ...")?;
84        }
85        for row_id in last_10 {
86            write!(f, ", {}", row_id)?;
87        }
88        write!(f, "]")
89    }
90}
91
92impl From<Range<u64>> for RowIdSequence {
93    fn from(range: Range<u64>) -> Self {
94        Self(vec![U64Segment::Range(range)])
95    }
96}
97
98impl From<&[u64]> for RowIdSequence {
99    fn from(row_ids: &[u64]) -> Self {
100        Self(vec![U64Segment::from_slice(row_ids)])
101    }
102}
103
104impl RowIdSequence {
105    pub fn new() -> Self {
106        Self::default()
107    }
108
109    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
110        self.0.iter().flat_map(|segment| segment.iter())
111    }
112
113    pub fn len(&self) -> u64 {
114        self.0.iter().map(|segment| segment.len() as u64).sum()
115    }
116
117    pub fn is_empty(&self) -> bool {
118        self.0.is_empty()
119    }
120
121    /// Combines this row id sequence with another row id sequence.
122    pub fn extend(&mut self, other: Self) {
123        // If the last element of this sequence and the first element of next
124        // sequence are ranges, we might be able to combine them into a single
125        // range.
126        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
127            (self.0.last(), other.0.first())
128            && range1.end == range2.start
129        {
130            let new_range = U64Segment::Range(range1.start..range2.end);
131            self.0.pop();
132            self.0.push(new_range);
133            self.0.extend(other.0.into_iter().skip(1));
134            return;
135        }
136        // TODO: add other optimizations, such as combining two RangeWithHoles.
137        self.0.extend(other.0);
138    }
139
140    /// Remove a set of row ids from the sequence.
141    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
142        // Order the row ids by position in which they appear in the sequence.
143        let (row_ids, offsets) = self.find_ids(row_ids);
144
145        let capacity = self.0.capacity();
146        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
147        let mut remaining_segments = old_segments.as_slice();
148
149        for (segment_idx, range) in offsets {
150            let segments_handled = old_segments.len() - remaining_segments.len();
151            let segments_to_add = segment_idx - segments_handled;
152            self.0
153                .extend_from_slice(&remaining_segments[..segments_to_add]);
154            remaining_segments = &remaining_segments[segments_to_add..];
155
156            let segment;
157            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
158
159            let segment_ids = &row_ids[range];
160            self.0.push(segment.delete(segment_ids));
161        }
162
163        // Add the remaining segments.
164        self.0.extend_from_slice(remaining_segments);
165    }
166
167    /// Delete row ids by position.
168    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
169        let mut local_positions = Vec::new();
170        let mut positions_iter = positions.into_iter();
171        let mut curr_position = positions_iter.next();
172        let mut offset = 0;
173        let mut cutoff = 0;
174
175        for segment in &mut self.0 {
176            // Make vector of local positions
177            cutoff += segment.len() as u32;
178            while let Some(position) = curr_position {
179                if position >= cutoff {
180                    break;
181                }
182                local_positions.push(position - offset);
183                curr_position = positions_iter.next();
184            }
185
186            if !local_positions.is_empty() {
187                segment.mask(&local_positions);
188                local_positions.clear();
189            }
190            offset = cutoff;
191        }
192
193        self.0.retain(|segment| !segment.is_empty());
194
195        Ok(())
196    }
197
198    /// Find the row ids in the sequence.
199    ///
200    /// Returns the row ids sorted by their appearance in the sequence.
201    /// Also returns the segment index and the range where that segment's
202    /// row id matches are found in the returned row id vector.
203    fn find_ids(
204        &self,
205        row_ids: impl IntoIterator<Item = u64>,
206    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
207        // Often, the row ids will already be provided in the order they appear.
208        // So the optimal way to search will be to cycle through rather than
209        // restarting the search from the beginning each time.
210        let mut segment_iter = self.0.iter().enumerate().cycle();
211
212        let mut segment_matches = vec![Vec::new(); self.0.len()];
213
214        row_ids.into_iter().for_each(|row_id| {
215            let mut i = 0;
216            // If we've cycled through all segments, we know the row id is not in the sequence.
217            while i < self.0.len() {
218                let (segment_idx, segment) = segment_iter.next().unwrap();
219                if segment.range().is_some_and(|range| range.contains(&row_id))
220                    && let Some(offset) = segment.position(row_id)
221                {
222                    segment_matches.get_mut(segment_idx).unwrap().push(offset);
223                    // The row id was not found it the segment. It might be in a later segment.
224                }
225                i += 1;
226            }
227        });
228        for matches in &mut segment_matches {
229            matches.sort_unstable();
230        }
231
232        let mut offset = 0;
233        let segment_ranges = segment_matches
234            .iter()
235            .enumerate()
236            .filter(|(_, matches)| !matches.is_empty())
237            .map(|(segment_idx, matches)| {
238                let range = offset..offset + matches.len();
239                offset += matches.len();
240                (segment_idx, range)
241            })
242            .collect();
243        let row_ids = segment_matches
244            .into_iter()
245            .enumerate()
246            .flat_map(|(segment_idx, offset)| {
247                offset
248                    .into_iter()
249                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
250            })
251            .collect();
252
253        (row_ids, segment_ranges)
254    }
255
256    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
257        if len == 0 {
258            return RowIdSeqSlice {
259                segments: &[],
260                offset_start: 0,
261                offset_last: 0,
262            };
263        }
264
265        // Find the starting position
266        let mut offset_start = offset;
267        let mut segment_offset = 0;
268        for segment in &self.0 {
269            let segment_len = segment.len();
270            if offset_start < segment_len {
271                break;
272            }
273            offset_start -= segment_len;
274            segment_offset += 1;
275        }
276
277        // Find the ending position
278        let mut offset_last = offset_start + len;
279        let mut segment_offset_last = segment_offset;
280        for segment in &self.0[segment_offset..] {
281            let segment_len = segment.len();
282            if offset_last <= segment_len {
283                break;
284            }
285            offset_last -= segment_len;
286            segment_offset_last += 1;
287        }
288
289        RowIdSeqSlice {
290            segments: &self.0[segment_offset..=segment_offset_last],
291            offset_start,
292            offset_last,
293        }
294    }
295
296    /// Get the row id at the given index.
297    ///
298    /// If the index is out of bounds, this will return None.
299    pub fn get(&self, index: usize) -> Option<u64> {
300        let mut offset = 0;
301        for segment in &self.0 {
302            let segment_len = segment.len();
303            if index < offset + segment_len {
304                return segment.get(index - offset);
305            }
306            offset += segment_len;
307        }
308        None
309    }
310
311    /// Get row ids from the sequence based on the provided _sorted_ offsets
312    ///
313    /// Any out of bounds offsets will be ignored
314    ///
315    /// # Panics
316    ///
317    /// If the input selection is not sorted, this function will panic
318    pub fn select<'a>(
319        &'a self,
320        selection: impl Iterator<Item = usize> + 'a,
321    ) -> impl Iterator<Item = u64> + 'a {
322        let mut seg_iter = self.0.iter();
323        let mut cur_seg = seg_iter.next();
324        let mut rows_passed = 0;
325        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
326        let mut last_index = 0;
327        selection.filter_map(move |index| {
328            if index < last_index {
329                panic!("Selection is not sorted");
330            }
331            last_index = index;
332
333            cur_seg?;
334
335            while (index - rows_passed) >= cur_seg_len {
336                rows_passed += cur_seg_len;
337                cur_seg = seg_iter.next();
338                if let Some(cur_seg) = cur_seg {
339                    cur_seg_len = cur_seg.len();
340                } else {
341                    return None;
342                }
343            }
344
345            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
346        })
347    }
348
349    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
350    /// in the sequence.
351    ///
352    /// For example, given a mask that selects all even ids and a sequence that is
353    /// [80..85, 86..90, 14]
354    ///
355    /// this will return [0, 2, 4, 5, 7, 9]
356    /// because the range expands to
357    ///
358    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
359    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
360    ///
361    /// This function is useful when determining which row offsets to read from a fragment given
362    /// a mask.
363    #[instrument(level = "debug", skip_all)]
364    pub fn mask_to_offset_ranges(&self, mask: &RowAddrMask) -> Vec<Range<u64>> {
365        let mut offset = 0;
366        let mut ranges = Vec::new();
367        for segment in &self.0 {
368            match segment {
369                U64Segment::Range(range) => {
370                    let mut ids = RowAddrTreeMap::from(range.clone());
371                    ids.mask(mask);
372                    ranges.extend(GroupingIterator::new(
373                        unsafe { ids.into_addr_iter() }.map(|addr| addr - range.start + offset),
374                    ));
375                    offset += range.end - range.start;
376                }
377                U64Segment::RangeWithHoles { range, holes } => {
378                    let offset_start = offset;
379                    let mut ids = RowAddrTreeMap::from(range.clone());
380                    offset += range.end - range.start;
381                    for hole in holes.iter() {
382                        if ids.remove(hole) {
383                            offset -= 1;
384                        }
385                    }
386                    ids.mask(mask);
387
388                    // Sadly we can't just subtract the offset because of the holes
389                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
390                    sorted_holes.sort_unstable();
391                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
392                    let mut holes_passed = 0;
393                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
394                        |addr| {
395                            while let Some(next_hole) = next_holes_iter.peek() {
396                                if *next_hole < addr {
397                                    next_holes_iter.next();
398                                    holes_passed += 1;
399                                } else {
400                                    break;
401                                }
402                            }
403                            addr - range.start + offset_start - holes_passed
404                        },
405                    )));
406                }
407                U64Segment::RangeWithBitmap { range, bitmap } => {
408                    let mut ids = RowAddrTreeMap::from(range.clone());
409                    let offset_start = offset;
410                    offset += range.end - range.start;
411                    for (i, val) in range.clone().enumerate() {
412                        if !bitmap.get(i) && ids.remove(val) {
413                            offset -= 1;
414                        }
415                    }
416                    ids.mask(mask);
417                    let mut bitmap_iter = bitmap.iter();
418                    let mut bitmap_iter_pos = 0;
419                    let mut holes_passed = 0;
420                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
421                        |addr| {
422                            let offset_no_holes = addr - range.start + offset_start;
423                            while bitmap_iter_pos < offset_no_holes {
424                                if !bitmap_iter.next().unwrap() {
425                                    holes_passed += 1;
426                                }
427                                bitmap_iter_pos += 1;
428                            }
429                            offset_no_holes - holes_passed
430                        },
431                    )));
432                }
433                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
434                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
435                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
436                        |(off, id)| {
437                            if mask.selected(id) {
438                                Some(off as u64 + offset)
439                            } else {
440                                None
441                            }
442                        },
443                    )));
444                    offset += array.len() as u64;
445                }
446            }
447        }
448        ranges
449    }
450}
451
452/// An iterator that groups row ids into ranges
453///
454/// For example, given an input iterator of [1, 2, 3, 5, 6, 7, 10, 11, 12]
455/// this will return an iterator of [(1..4), (5..8), (10..13)]
456struct GroupingIterator<I: Iterator<Item = u64>> {
457    iter: I,
458    cur_range: Option<Range<u64>>,
459}
460
461impl<I: Iterator<Item = u64>> GroupingIterator<I> {
462    fn new(iter: I) -> Self {
463        Self {
464            iter,
465            cur_range: None,
466        }
467    }
468}
469
470impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
471    type Item = Range<u64>;
472
473    fn next(&mut self) -> Option<Self::Item> {
474        for id in self.iter.by_ref() {
475            if let Some(range) = self.cur_range.as_mut() {
476                if range.end == id {
477                    range.end = id + 1;
478                } else {
479                    let ret = Some(range.clone());
480                    self.cur_range = Some(id..id + 1);
481                    return ret;
482                }
483            } else {
484                self.cur_range = Some(id..id + 1);
485            }
486        }
487        self.cur_range.take()
488    }
489}
490
491impl From<&RowIdSequence> for RowAddrTreeMap {
492    fn from(row_ids: &RowIdSequence) -> Self {
493        let mut tree_map = Self::new();
494        for segment in &row_ids.0 {
495            match segment {
496                U64Segment::Range(range) => {
497                    tree_map.insert_range(range.clone());
498                }
499                U64Segment::RangeWithBitmap { range, bitmap } => {
500                    tree_map.insert_range(range.clone());
501                    for (i, val) in range.clone().enumerate() {
502                        if !bitmap.get(i) {
503                            tree_map.remove(val);
504                        }
505                    }
506                }
507                U64Segment::RangeWithHoles { range, holes } => {
508                    tree_map.insert_range(range.clone());
509                    for hole in holes.iter() {
510                        tree_map.remove(hole);
511                    }
512                }
513                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
514                    for val in array.iter() {
515                        tree_map.insert(val);
516                    }
517                }
518            }
519        }
520        tree_map
521    }
522}
523
524#[derive(Debug)]
525pub struct RowIdSeqSlice<'a> {
526    /// Current slice of the segments we cover
527    segments: &'a [U64Segment],
528    /// Offset into the first segment to start iterating from
529    offset_start: usize,
530    /// Offset into the last segment to stop iterating at
531    offset_last: usize,
532}
533
534impl RowIdSeqSlice<'_> {
535    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
536        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
537        known_size -= self.offset_start;
538        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
539
540        let end = self.segments.len().saturating_sub(1);
541        self.segments
542            .iter()
543            .enumerate()
544            .flat_map(move |(i, segment)| {
545                match i {
546                    0 if self.segments.len() == 1 => {
547                        let len = self.offset_last - self.offset_start;
548                        // TODO: Optimize this so we don't have to use skip
549                        // (take is probably fine though.)
550                        Box::new(segment.iter().skip(self.offset_start).take(len))
551                            as Box<dyn Iterator<Item = u64>>
552                    }
553                    0 => Box::new(segment.iter().skip(self.offset_start)),
554                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
555                    _ => Box::new(segment.iter()),
556                }
557            })
558            .exact_size(known_size)
559    }
560}
561
562/// Re-chunk a sequences of row ids into chunks of a given size.
563///
564/// The sequences may less than chunk sizes, because the sequences only
565/// contains the row ids that we want to keep, they come from the updates records.
566/// But the chunk sizes are based on the fragment physical rows(may contain inserted records).
567/// So if the sequences are smaller than the chunk sizes, we need to
568/// assign the incremental row ids in the further step. This behavior is controlled by the
569/// `allow_incomplete` parameter.
570///
571/// # Errors
572///
573/// If `allow_incomplete` is false, will return an error if the sum of the chunk sizes
574/// is not equal to the total number of row ids in the sequences.
575pub fn rechunk_sequences(
576    sequences: impl IntoIterator<Item = RowIdSequence>,
577    chunk_sizes: impl IntoIterator<Item = u64>,
578    allow_incomplete: bool,
579) -> Result<Vec<RowIdSequence>> {
580    // TODO: return an iterator. (with a good size hint?)
581    let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
582    let total_chunks = chunk_sizes_vec.len();
583    let mut chunked_sequences = Vec::with_capacity(total_chunks);
584    let mut segment_iter = sequences
585        .into_iter()
586        .flat_map(|sequence| sequence.0.into_iter())
587        .peekable();
588
589    let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
590        Error::invalid_input(format!(
591            "Got too few segments for chunk {}. Expected chunk size: {}, remaining needed: {}",
592            chunk_index, expected_chunk_size, remaining
593        ))
594    };
595
596    let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
597        Error::invalid_input(format!(
598            "Got too many segments for the provided chunk lengths. Processed {} chunks out of {} expected",
599            processed_chunks, total_chunk_sizes
600        ))
601    };
602
603    let mut segment_offset = 0_u64;
604
605    for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
606        let chunk_size = *chunk_size;
607        let mut sequence = RowIdSequence(Vec::new());
608        let mut remaining = chunk_size;
609
610        while remaining > 0 {
611            let remaining_in_segment = segment_iter
612                .peek()
613                .map_or(0, |segment| segment.len() as u64 - segment_offset);
614
615            // Step 1: Handle segment remaining to be empty(also empty seg) - skip and continue
616            if remaining_in_segment == 0 {
617                if segment_iter.next().is_some() {
618                    segment_offset = 0;
619                    continue;
620                } else {
621                    // No more segments available
622                    if allow_incomplete {
623                        break;
624                    } else {
625                        return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
626                    }
627                }
628            }
629
630            // Step 2: Handle still remaining segment based on size comparison
631            match remaining_in_segment.cmp(&remaining) {
632                std::cmp::Ordering::Greater => {
633                    // Segment is larger than remaining space - slice it
634                    let segment = segment_iter
635                        .peek()
636                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
637                        .slice(segment_offset as usize, remaining as usize);
638                    sequence.extend(RowIdSequence(vec![segment]));
639                    segment_offset += remaining;
640                    remaining = 0;
641                }
642                std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
643                    // UNIFIED HANDLING: Both equal and less cases subtract from remaining
644                    // Equal case: remaining -= remaining_in_segment (remaining becomes 0)
645                    // Less case: remaining -= remaining_in_segment (remaining becomes positive)
646                    let segment = segment_iter
647                        .next()
648                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
649                        .slice(segment_offset as usize, remaining_in_segment as usize);
650                    sequence.extend(RowIdSequence(vec![segment]));
651                    segment_offset = 0;
652                    remaining -= remaining_in_segment;
653                }
654            }
655        }
656
657        chunked_sequences.push(sequence);
658    }
659
660    if segment_iter.peek().is_some() {
661        return Err(too_many_segments_error(
662            chunked_sequences.len(),
663            total_chunks,
664        ));
665    }
666
667    Ok(chunked_sequences)
668}
669
670/// Selects the row ids from a sequence based on the provided offsets.
671pub fn select_row_ids<'a>(
672    sequence: &'a RowIdSequence,
673    offsets: &'a ReadBatchParams,
674) -> Result<Vec<u64>> {
675    let out_of_bounds_err = |offset: u32| {
676        Error::invalid_input(format!(
677            "Index out of bounds: {} for sequence of length {}",
678            offset,
679            sequence.len()
680        ))
681    };
682
683    match offsets {
684        // TODO: Optimize this if indices are sorted, which is a common case.
685        ReadBatchParams::Indices(indices) => indices
686            .values()
687            .iter()
688            .map(|index| {
689                sequence
690                    .get(*index as usize)
691                    .ok_or_else(|| out_of_bounds_err(*index))
692            })
693            .collect(),
694        ReadBatchParams::Range(range) => {
695            if range.end > sequence.len() as usize {
696                return Err(out_of_bounds_err(range.end as u32));
697            }
698            let sequence = sequence.slice(range.start, range.end - range.start);
699            Ok(sequence.iter().collect())
700        }
701        ReadBatchParams::Ranges(ranges) => {
702            let num_rows = ranges
703                .iter()
704                .map(|r| (r.end - r.start) as usize)
705                .sum::<usize>();
706            let mut result = Vec::with_capacity(num_rows);
707            for range in ranges.as_ref() {
708                if range.end > sequence.len() {
709                    return Err(out_of_bounds_err(range.end as u32));
710                }
711                let sequence =
712                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
713                result.extend(sequence.iter());
714            }
715            Ok(result)
716        }
717
718        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
719        ReadBatchParams::RangeTo(to) => {
720            if to.end > sequence.len() as usize {
721                return Err(out_of_bounds_err(to.end as u32));
722            }
723            let len = to.end;
724            let sequence = sequence.slice(0, len);
725            Ok(sequence.iter().collect())
726        }
727        ReadBatchParams::RangeFrom(from) => {
728            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
729            Ok(sequence.iter().collect())
730        }
731    }
732}
733
734#[cfg(test)]
735mod test {
736    use super::*;
737
738    use pretty_assertions::assert_eq;
739    use test::bitmap::Bitmap;
740
741    #[test]
742    fn test_row_id_sequence_from_range() {
743        let sequence = RowIdSequence::from(0..10);
744        assert_eq!(sequence.len(), 10);
745        assert_eq!(sequence.is_empty(), false);
746
747        let iter = sequence.iter();
748        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
749    }
750
751    #[test]
752    fn test_row_id_sequence_extend() {
753        let mut sequence = RowIdSequence::from(0..10);
754        sequence.extend(RowIdSequence::from(10..20));
755        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
756
757        let mut sequence = RowIdSequence::from(0..10);
758        sequence.extend(RowIdSequence::from(20..30));
759        assert_eq!(
760            sequence.0,
761            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
762        );
763    }
764
765    #[test]
766    fn test_row_id_sequence_delete() {
767        let mut sequence = RowIdSequence::from(0..10);
768        sequence.delete(vec![1, 3, 5, 7, 9]);
769        let mut expected_bitmap = Bitmap::new_empty(9);
770        for i in [0, 2, 4, 6, 8] {
771            expected_bitmap.set(i as usize);
772        }
773        assert_eq!(
774            sequence.0,
775            vec![U64Segment::RangeWithBitmap {
776                range: 0..9,
777                bitmap: expected_bitmap
778            },]
779        );
780
781        let mut sequence = RowIdSequence::from(0..10);
782        sequence.extend(RowIdSequence::from(12..20));
783        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
784        assert_eq!(
785            sequence.0,
786            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
787        );
788
789        let mut sequence = RowIdSequence::from(0..10);
790        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
791        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
792    }
793
794    #[test]
795    fn test_row_id_slice() {
796        // The type of sequence isn't that relevant to the implementation, so
797        // we can just have a single one with all the segment types.
798        let sequence = RowIdSequence(vec![
799            U64Segment::Range(30..35), // 5
800            U64Segment::RangeWithHoles {
801                // 8
802                range: 50..60,
803                holes: vec![53, 54].into(),
804            },
805            U64Segment::SortedArray(vec![7, 9].into()), // 2
806            U64Segment::RangeWithBitmap {
807                range: 0..5,
808                bitmap: [true, false, true, false, true].as_slice().into(),
809            },
810            U64Segment::Array(vec![35, 39].into()),
811            U64Segment::Range(40..50),
812        ]);
813
814        // All possible offsets and lengths
815        for offset in 0..sequence.len() as usize {
816            for len in 0..sequence.len() as usize {
817                if offset + len > sequence.len() as usize {
818                    continue;
819                }
820                let slice = sequence.slice(offset, len);
821
822                let actual = slice.iter().collect::<Vec<_>>();
823                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
824                assert_eq!(
825                    actual, expected,
826                    "Failed for offset {} and len {}",
827                    offset, len
828                );
829
830                let (claimed_size, claimed_max) = slice.iter().size_hint();
831                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
832                assert_eq!(claimed_size, actual.len()); // Correct size hint
833            }
834        }
835    }
836
837    #[test]
838    fn test_row_id_slice_empty() {
839        let sequence = RowIdSequence::from(0..10);
840        let slice = sequence.slice(10, 0);
841        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
842    }
843
844    #[test]
845    fn test_row_id_sequence_rechunk() {
846        fn assert_rechunked(
847            input: Vec<RowIdSequence>,
848            chunk_sizes: Vec<u64>,
849            expected: Vec<RowIdSequence>,
850        ) {
851            let chunked = rechunk_sequences(input, chunk_sizes, false).unwrap();
852            assert_eq!(chunked, expected);
853        }
854
855        // Small pieces to larger ones
856        let many_segments = vec![
857            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
858            RowIdSequence::from(10..18),
859            RowIdSequence::from(18..28),
860            RowIdSequence::from(28..30),
861        ];
862        let fewer_segments = vec![
863            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
864            RowIdSequence::from(10..30),
865        ];
866        assert_rechunked(
867            many_segments.clone(),
868            fewer_segments.iter().map(|seq| seq.len()).collect(),
869            fewer_segments.clone(),
870        );
871
872        // Large pieces to smaller ones
873        assert_rechunked(
874            fewer_segments,
875            many_segments.iter().map(|seq| seq.len()).collect(),
876            many_segments.clone(),
877        );
878
879        // Equal pieces
880        assert_rechunked(
881            many_segments.clone(),
882            many_segments.iter().map(|seq| seq.len()).collect(),
883            many_segments.clone(),
884        );
885
886        // Too few segments -> error
887        let result = rechunk_sequences(many_segments.clone(), vec![100], false);
888        assert!(result.is_err());
889
890        // Too many segments -> error
891        let result = rechunk_sequences(many_segments, vec![5], false);
892        assert!(result.is_err());
893    }
894
895    #[test]
896    fn test_select_row_ids() {
897        // All forms of offsets
898        let offsets = [
899            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
900            ReadBatchParams::Range(2..8),
901            ReadBatchParams::RangeFull,
902            ReadBatchParams::RangeTo(..5),
903            ReadBatchParams::RangeFrom(5..),
904            ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
905        ];
906
907        // Sequences with all segment types. These have at least 10 elements,
908        // so they are valid for all the above offsets.
909        let sequences = [
910            RowIdSequence(vec![
911                U64Segment::Range(0..5),
912                U64Segment::RangeWithHoles {
913                    range: 50..60,
914                    holes: vec![53, 54].into(),
915                },
916                U64Segment::SortedArray(vec![7, 9].into()),
917            ]),
918            RowIdSequence(vec![
919                U64Segment::RangeWithBitmap {
920                    range: 0..5,
921                    bitmap: [true, false, true, false, true].as_slice().into(),
922                },
923                U64Segment::Array(vec![30, 20, 10].into()),
924                U64Segment::Range(40..50),
925            ]),
926        ];
927
928        for params in offsets {
929            for sequence in &sequences {
930                let row_ids = select_row_ids(sequence, &params).unwrap();
931                let flat_sequence = sequence.iter().collect::<Vec<_>>();
932
933                // Transform params into bounded ones
934                let selection: Vec<usize> = match &params {
935                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
936                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
937                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
938                    ReadBatchParams::Range(range) => range.clone().collect(),
939                    ReadBatchParams::Ranges(ranges) => ranges
940                        .iter()
941                        .flat_map(|r| r.start as usize..r.end as usize)
942                        .collect(),
943                    ReadBatchParams::Indices(indices) => {
944                        indices.values().iter().map(|i| *i as usize).collect()
945                    }
946                };
947
948                let expected = selection
949                    .into_iter()
950                    .map(|i| flat_sequence[i])
951                    .collect::<Vec<_>>();
952                assert_eq!(
953                    row_ids, expected,
954                    "Failed for params {:?} on the sequence {:?}",
955                    &params, sequence
956                );
957            }
958        }
959    }
960
961    #[test]
962    fn test_select_row_ids_out_of_bounds() {
963        let offsets = [
964            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
965            ReadBatchParams::Range(2..1000),
966            ReadBatchParams::RangeTo(..1000),
967        ];
968
969        let sequence = RowIdSequence::from(0..10);
970
971        for params in offsets {
972            let result = select_row_ids(&sequence, &params);
973            assert!(result.is_err());
974            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
975        }
976    }
977
978    #[test]
979    fn test_row_id_sequence_to_treemap() {
980        let sequence = RowIdSequence(vec![
981            U64Segment::Range(0..5),
982            U64Segment::RangeWithHoles {
983                range: 50..60,
984                holes: vec![53, 54].into(),
985            },
986            U64Segment::SortedArray(vec![7, 9].into()),
987            U64Segment::RangeWithBitmap {
988                range: 10..15,
989                bitmap: [true, false, true, false, true].as_slice().into(),
990            },
991            U64Segment::Array(vec![35, 39].into()),
992            U64Segment::Range(40..50),
993        ]);
994
995        let tree_map = RowAddrTreeMap::from(&sequence);
996        let expected = vec![
997            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
998            51, 52, 55, 56, 57, 58, 59,
999        ]
1000        .into_iter()
1001        .collect::<RowAddrTreeMap>();
1002        assert_eq!(tree_map, expected);
1003    }
1004
1005    #[test]
1006    fn test_row_addr_mask() {
1007        // 0, 1, 2, 3, 4
1008        // 50, 51, 52, 55, 56, 57, 58, 59
1009        // 7, 9
1010        // 10, 12, 14
1011        // 35, 39
1012        let sequence = RowIdSequence(vec![
1013            U64Segment::Range(0..5),
1014            U64Segment::RangeWithHoles {
1015                range: 50..60,
1016                holes: vec![53, 54].into(),
1017            },
1018            U64Segment::SortedArray(vec![7, 9].into()),
1019            U64Segment::RangeWithBitmap {
1020                range: 10..15,
1021                bitmap: [true, false, true, false, true].as_slice().into(),
1022            },
1023            U64Segment::Array(vec![35, 39].into()),
1024        ]);
1025
1026        // Masking one in each segment
1027        let values_to_remove = [4, 55, 7, 12, 39];
1028        let positions_to_remove = sequence
1029            .iter()
1030            .enumerate()
1031            .filter_map(|(i, val)| {
1032                if values_to_remove.contains(&val) {
1033                    Some(i as u32)
1034                } else {
1035                    None
1036                }
1037            })
1038            .collect::<Vec<_>>();
1039        let mut sequence = sequence;
1040        sequence.mask(positions_to_remove).unwrap();
1041        let expected = RowIdSequence(vec![
1042            U64Segment::Range(0..4),
1043            U64Segment::RangeWithBitmap {
1044                range: 50..60,
1045                bitmap: [
1046                    true, true, true, false, false, false, true, true, true, true,
1047                ]
1048                .as_slice()
1049                .into(),
1050            },
1051            U64Segment::Range(9..10),
1052            U64Segment::RangeWithBitmap {
1053                range: 10..15,
1054                bitmap: [true, false, false, false, true].as_slice().into(),
1055            },
1056            U64Segment::Array(vec![35].into()),
1057        ]);
1058        assert_eq!(sequence, expected);
1059    }
1060
1061    #[test]
1062    fn test_row_addr_mask_everything() {
1063        let mut sequence = RowIdSequence(vec![
1064            U64Segment::Range(0..5),
1065            U64Segment::SortedArray(vec![7, 9].into()),
1066        ]);
1067        sequence.mask(0..sequence.len() as u32).unwrap();
1068        let expected = RowIdSequence(vec![]);
1069        assert_eq!(sequence, expected);
1070    }
1071
1072    #[test]
1073    fn test_selection() {
1074        let sequence = RowIdSequence(vec![
1075            U64Segment::Range(0..5),
1076            U64Segment::Range(10..15),
1077            U64Segment::Range(20..25),
1078        ]);
1079        let selection = sequence.select(vec![2, 4, 13, 14, 57].into_iter());
1080        assert_eq!(selection.collect::<Vec<_>>(), vec![2, 4, 23, 24]);
1081    }
1082
1083    #[test]
1084    #[should_panic]
1085    fn test_selection_unsorted() {
1086        let sequence = RowIdSequence(vec![
1087            U64Segment::Range(0..5),
1088            U64Segment::Range(10..15),
1089            U64Segment::Range(20..25),
1090        ]);
1091        let _ = sequence
1092            .select(vec![2, 4, 3].into_iter())
1093            .collect::<Vec<_>>();
1094    }
1095
1096    #[test]
1097    fn test_mask_to_offset_ranges() {
1098        // Tests with a simple range segment
1099        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1100        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1101        let ranges = sequence.mask_to_offset_ranges(&mask);
1102        assert_eq!(ranges, vec![0..1, 2..3, 4..5, 6..7, 8..9]);
1103
1104        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1105        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[54]));
1106        let ranges = sequence.mask_to_offset_ranges(&mask);
1107        assert_eq!(ranges, vec![14..15]);
1108
1109        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1110        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[54]));
1111        let ranges = sequence.mask_to_offset_ranges(&mask);
1112        assert_eq!(ranges, vec![0..14, 15..20]);
1113
1114        // Test with a range segment with holes
1115        // 0, 1, 3, 4, 5, 7, 8, 9
1116        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1117            range: 0..10,
1118            holes: vec![2, 6].into(),
1119        }]);
1120        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1121        let ranges = sequence.mask_to_offset_ranges(&mask);
1122        assert_eq!(ranges, vec![0..1, 3..4, 6..7]);
1123
1124        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1125            range: 40..60,
1126            holes: vec![47, 43].into(),
1127        }]);
1128        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[44]));
1129        let ranges = sequence.mask_to_offset_ranges(&mask);
1130        assert_eq!(ranges, vec![3..4]);
1131
1132        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1133            range: 40..60,
1134            holes: vec![47, 43].into(),
1135        }]);
1136        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[44]));
1137        let ranges = sequence.mask_to_offset_ranges(&mask);
1138        assert_eq!(ranges, vec![0..3, 4..18]);
1139
1140        // Test with a range segment with bitmap
1141        // 0, 1, 4, 5, 6, 7
1142        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1143            range: 0..10,
1144            bitmap: [
1145                true, true, false, false, true, true, true, true, false, false,
1146            ]
1147            .as_slice()
1148            .into(),
1149        }]);
1150        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1151        let ranges = sequence.mask_to_offset_ranges(&mask);
1152        assert_eq!(ranges, vec![0..1, 2..3, 4..5]);
1153
1154        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1155            range: 40..45,
1156            bitmap: [true, true, false, false, true].as_slice().into(),
1157        }]);
1158        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[44]));
1159        let ranges = sequence.mask_to_offset_ranges(&mask);
1160        assert_eq!(ranges, vec![2..3]);
1161
1162        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1163            range: 40..45,
1164            bitmap: [true, true, false, false, true].as_slice().into(),
1165        }]);
1166        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[44]));
1167        let ranges = sequence.mask_to_offset_ranges(&mask);
1168        assert_eq!(ranges, vec![0..2]);
1169
1170        // Test with a sorted array segment
1171        let sequence = RowIdSequence(vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())]);
1172        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 6, 8]));
1173        let ranges = sequence.mask_to_offset_ranges(&mask);
1174        assert_eq!(ranges, vec![0..1, 3..5]);
1175
1176        let sequence = RowIdSequence(vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())]);
1177        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 6, 8]));
1178        let ranges = sequence.mask_to_offset_ranges(&mask);
1179        assert_eq!(ranges, vec![0..1, 2..4]);
1180
1181        // Test with multiple segments
1182        // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
1183        // *, -, *, -, -, ***, ---, ---, ***, --, **, --
1184        // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
1185        let sequence = RowIdSequence(vec![
1186            U64Segment::Range(0..5),
1187            U64Segment::RangeWithHoles {
1188                range: 100..105,
1189                holes: vec![103].into(),
1190            },
1191            U64Segment::SortedArray(vec![44, 46, 78].into()),
1192        ]);
1193        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 46, 100, 104]));
1194        let ranges = sequence.mask_to_offset_ranges(&mask);
1195        assert_eq!(ranges, vec![0..1, 2..3, 5..6, 8..9, 10..11]);
1196
1197        // Test with empty mask (should select everything)
1198        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1199        let mask = RowAddrMask::default();
1200        let ranges = sequence.mask_to_offset_ranges(&mask);
1201        assert_eq!(ranges, vec![0..10]);
1202
1203        // Test with allow nothing mask
1204        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1205        let mask = RowAddrMask::allow_nothing();
1206        let ranges = sequence.mask_to_offset_ranges(&mask);
1207        assert_eq!(ranges, vec![]);
1208    }
1209
1210    #[test]
1211    fn test_row_id_sequence_rechunk_with_empty_segments() {
1212        // equal case (segment exactly fills remaining space)
1213        let input_sequences = vec![
1214            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1215            RowIdSequence::from(20..23), // [20, 21, 22] - 3 elements
1216        ];
1217        let chunk_sizes = vec![2, 3]; // First chunk wants 2, second wants 3
1218
1219        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1220        assert_eq!(result.len(), 2);
1221        assert_eq!(result[0].len(), 2);
1222        assert_eq!(result[1].len(), 3);
1223
1224        let first_chunk: Vec<u64> = result[0].iter().collect();
1225        let second_chunk: Vec<u64> = result[1].iter().collect();
1226        assert_eq!(first_chunk, vec![0, 1]);
1227        assert_eq!(second_chunk, vec![20, 21, 22]);
1228
1229        // less case (segment smaller than remaining space)
1230        let input_sequences = vec![
1231            RowIdSequence::from(0..2),   // [0, 1] - 2 elements (less than remaining)
1232            RowIdSequence::from(20..21), // [20] - 1 element (less than remaining)
1233            RowIdSequence::from(30..32), // [30, 31] - 2 elements (exactly fills remaining)
1234        ];
1235        let chunk_sizes = vec![5]; // Request 5 elements, have exactly 5
1236
1237        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1238        assert_eq!(result.len(), 1);
1239        assert_eq!(result[0].len(), 5);
1240
1241        let elements: Vec<u64> = result[0].iter().collect();
1242        assert_eq!(elements, vec![0, 1, 20, 30, 31]);
1243
1244        // empty segment in the middle
1245        let input_sequences = vec![
1246            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1247            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1248            RowIdSequence::from(20..22), // [20, 21] - 2 elements
1249        ];
1250        let chunk_sizes = vec![3, 1];
1251        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1252
1253        assert_eq!(result.len(), 2);
1254        assert_eq!(result[0].len(), 3);
1255        assert_eq!(result[1].len(), 1);
1256
1257        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1258        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1259        assert_eq!(first_chunk_elements, vec![0, 1, 20]);
1260        assert_eq!(second_chunk_elements, vec![21]);
1261
1262        // multiple empty segments
1263        let input_sequences = vec![
1264            RowIdSequence::from(0..1),   // [0] - 1 element
1265            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1266            RowIdSequence::from(20..20), // [] - 0 elements (empty)
1267            RowIdSequence::from(30..32), // [30, 31] - 2 elements
1268        ];
1269        let chunk_sizes = vec![3];
1270        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1271
1272        assert_eq!(result.len(), 1);
1273        assert_eq!(result[0].len(), 3);
1274
1275        let elements: Vec<u64> = result[0].iter().collect();
1276        assert_eq!(elements, vec![0, 30, 31]);
1277
1278        // empty segment at chunk boundary
1279        let input_sequences = vec![
1280            RowIdSequence::from(0..3), // [0, 1, 2] - 3 elements (exactly fills first chunk)
1281            RowIdSequence::from(10..10), // [] - 0 elements (empty, at boundary)
1282            RowIdSequence::from(20..22), // [20, 21] - 2 elements (for second chunk)
1283        ];
1284        let chunk_sizes = vec![3, 2];
1285        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1286
1287        assert_eq!(result.len(), 2);
1288        assert_eq!(result[0].len(), 3);
1289        assert_eq!(result[1].len(), 2);
1290
1291        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1292        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1293        assert_eq!(first_chunk_elements, vec![0, 1, 2]);
1294        assert_eq!(second_chunk_elements, vec![20, 21]);
1295
1296        // empty segments with allow_incomplete = true
1297        let input_sequences = vec![
1298            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1299            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1300        ];
1301        let chunk_sizes = vec![5]; // Request more than available
1302        let result = rechunk_sequences(input_sequences, chunk_sizes, true).unwrap();
1303
1304        assert_eq!(result.len(), 1);
1305        assert_eq!(result[0].len(), 2);
1306
1307        let elements: Vec<u64> = result[0].iter().collect();
1308        assert_eq!(elements, vec![0, 1]);
1309    }
1310}