Skip to main content

lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::{Range, RangeInclusive};
15// TODO: replace this with Arrow BooleanBuffer.
16
17// These are all internal data structures, and are private.
18mod bitmap;
19mod encoded_array;
20mod index;
21pub mod segment;
22mod serde;
23pub mod version;
24
25use lance_core::deepsize::DeepSizeOf;
26// These are the public API.
27pub use index::FragmentRowIdIndex;
28pub use index::RowIdIndex;
29use lance_core::{Error, Result};
30use lance_io::ReadBatchParams;
31use lance_select::{RowAddrMask, RowAddrTreeMap, RowSetOps};
32pub use serde::{read_row_ids, write_row_ids};
33
34use crate::utils::LanceIteratorExtension;
35use segment::U64Segment;
36use tracing::instrument;
37
38/// A sequence of row ids.
39///
40/// Row ids are u64s that:
41///
42/// 1. Are **unique** within a table (except for tombstones)
43/// 2. Are *often* but not always sorted and/or contiguous.
44///
45/// This sequence of row ids is optimized to be compact when the row ids are
46/// contiguous and sorted. However, it does not require that the row ids are
47/// contiguous or sorted.
48///
49/// We can make optimizations that assume uniqueness.
50#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq, Default)]
51pub struct RowIdSequence(Vec<U64Segment>);
52
53impl std::fmt::Display for RowIdSequence {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        let mut iter = self.iter();
56        let mut first_10 = Vec::new();
57        let mut last_10 = Vec::new();
58        for row_id in iter.by_ref() {
59            first_10.push(row_id);
60            if first_10.len() > 10 {
61                break;
62            }
63        }
64
65        while let Some(row_id) = iter.next_back() {
66            last_10.push(row_id);
67            if last_10.len() > 10 {
68                break;
69            }
70        }
71        last_10.reverse();
72
73        let theres_more = iter.next().is_some();
74
75        write!(f, "[")?;
76        for row_id in first_10 {
77            write!(f, "{}", row_id)?;
78        }
79        if theres_more {
80            write!(f, ", ...")?;
81        }
82        for row_id in last_10 {
83            write!(f, ", {}", row_id)?;
84        }
85        write!(f, "]")
86    }
87}
88
89impl From<Range<u64>> for RowIdSequence {
90    fn from(range: Range<u64>) -> Self {
91        Self(vec![U64Segment::Range(range)])
92    }
93}
94
95impl From<&[u64]> for RowIdSequence {
96    fn from(row_ids: &[u64]) -> Self {
97        Self(vec![U64Segment::from_slice(row_ids)])
98    }
99}
100
101impl RowIdSequence {
102    pub fn new() -> Self {
103        Self::default()
104    }
105
106    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
107        self.0.iter().flat_map(|segment| segment.iter())
108    }
109
110    pub fn len(&self) -> u64 {
111        self.0.iter().map(|segment| segment.len() as u64).sum()
112    }
113
114    pub fn is_empty(&self) -> bool {
115        self.0.is_empty()
116    }
117
118    /// Returns the bounding range `[min, max]` across all row IDs in this sequence,
119    /// or `None` if the sequence contains no values.
120    ///
121    /// This is a conservative bounding box: a value falling within the returned range
122    /// is not guaranteed to exist in the sequence (segments may be sparse), but any
123    /// value that *does* exist is guaranteed to fall within the range.  This makes
124    /// the result suitable as a cheap pre-filter before a full scan.
125    pub fn row_id_range(&self) -> Option<RangeInclusive<u64>> {
126        let min = self
127            .0
128            .iter()
129            .filter_map(|s| s.range())
130            .map(|r| *r.start())
131            .min()?;
132        let max = self
133            .0
134            .iter()
135            .filter_map(|s| s.range())
136            .map(|r| *r.end())
137            .max()?;
138        Some(min..=max)
139    }
140
141    /// Combines this row id sequence with another row id sequence.
142    pub fn extend(&mut self, other: Self) {
143        // If the last element of this sequence and the first element of next
144        // sequence are ranges, we might be able to combine them into a single
145        // range.
146        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
147            (self.0.last(), other.0.first())
148            && range1.end == range2.start
149        {
150            let new_range = U64Segment::Range(range1.start..range2.end);
151            self.0.pop();
152            self.0.push(new_range);
153            self.0.extend(other.0.into_iter().skip(1));
154            return;
155        }
156        // TODO: add other optimizations, such as combining two RangeWithHoles.
157        self.0.extend(other.0);
158    }
159
160    /// Remove a set of row ids from the sequence.
161    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
162        // Order the row ids by position in which they appear in the sequence.
163        let (row_ids, offsets) = self.find_ids(row_ids);
164
165        let capacity = self.0.capacity();
166        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
167        let mut remaining_segments = old_segments.as_slice();
168
169        for (segment_idx, range) in offsets {
170            let segments_handled = old_segments.len() - remaining_segments.len();
171            let segments_to_add = segment_idx - segments_handled;
172            self.0
173                .extend_from_slice(&remaining_segments[..segments_to_add]);
174            remaining_segments = &remaining_segments[segments_to_add..];
175
176            let segment;
177            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
178
179            let segment_ids = &row_ids[range];
180            self.0.push(segment.delete(segment_ids));
181        }
182
183        // Add the remaining segments.
184        self.0.extend_from_slice(remaining_segments);
185    }
186
187    /// Delete row ids by position.
188    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
189        let mut local_positions = Vec::new();
190        let mut positions_iter = positions.into_iter();
191        let mut curr_position = positions_iter.next();
192        let mut offset = 0;
193        let mut cutoff = 0;
194
195        for segment in &mut self.0 {
196            // Make vector of local positions
197            cutoff += segment.len() as u32;
198            while let Some(position) = curr_position {
199                if position >= cutoff {
200                    break;
201                }
202                local_positions.push(position - offset);
203                curr_position = positions_iter.next();
204            }
205
206            if !local_positions.is_empty() {
207                segment.mask(&local_positions);
208                local_positions.clear();
209            }
210            offset = cutoff;
211        }
212
213        self.0.retain(|segment| !segment.is_empty());
214
215        Ok(())
216    }
217
218    /// Find the row ids in the sequence.
219    ///
220    /// Returns the row ids sorted by their appearance in the sequence.
221    /// Also returns the segment index and the range where that segment's
222    /// row id matches are found in the returned row id vector.
223    fn find_ids(
224        &self,
225        row_ids: impl IntoIterator<Item = u64>,
226    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
227        // Often, the row ids will already be provided in the order they appear.
228        // So the optimal way to search will be to cycle through rather than
229        // restarting the search from the beginning each time.
230        let mut segment_iter = self.0.iter().enumerate().cycle();
231
232        let mut segment_matches = vec![Vec::new(); self.0.len()];
233
234        row_ids.into_iter().for_each(|row_id| {
235            let mut i = 0;
236            // If we've cycled through all segments, we know the row id is not in the sequence.
237            while i < self.0.len() {
238                let (segment_idx, segment) = segment_iter.next().unwrap();
239                if segment.range().is_some_and(|range| range.contains(&row_id))
240                    && let Some(offset) = segment.position(row_id)
241                {
242                    segment_matches.get_mut(segment_idx).unwrap().push(offset);
243                    // The row id was not found it the segment. It might be in a later segment.
244                }
245                i += 1;
246            }
247        });
248        for matches in &mut segment_matches {
249            matches.sort_unstable();
250        }
251
252        let mut offset = 0;
253        let segment_ranges = segment_matches
254            .iter()
255            .enumerate()
256            .filter(|(_, matches)| !matches.is_empty())
257            .map(|(segment_idx, matches)| {
258                let range = offset..offset + matches.len();
259                offset += matches.len();
260                (segment_idx, range)
261            })
262            .collect();
263        let row_ids = segment_matches
264            .into_iter()
265            .enumerate()
266            .flat_map(|(segment_idx, offset)| {
267                offset
268                    .into_iter()
269                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
270            })
271            .collect();
272
273        (row_ids, segment_ranges)
274    }
275
276    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
277        if len == 0 {
278            return RowIdSeqSlice {
279                segments: &[],
280                offset_start: 0,
281                offset_last: 0,
282            };
283        }
284
285        // Find the starting position
286        let mut offset_start = offset;
287        let mut segment_offset = 0;
288        for segment in &self.0 {
289            let segment_len = segment.len();
290            if offset_start < segment_len {
291                break;
292            }
293            offset_start -= segment_len;
294            segment_offset += 1;
295        }
296
297        // Find the ending position
298        let mut offset_last = offset_start + len;
299        let mut segment_offset_last = segment_offset;
300        for segment in &self.0[segment_offset..] {
301            let segment_len = segment.len();
302            if offset_last <= segment_len {
303                break;
304            }
305            offset_last -= segment_len;
306            segment_offset_last += 1;
307        }
308
309        RowIdSeqSlice {
310            segments: &self.0[segment_offset..=segment_offset_last],
311            offset_start,
312            offset_last,
313        }
314    }
315
316    /// Get the row id at the given index.
317    ///
318    /// If the index is out of bounds, this will return None.
319    pub fn get(&self, index: usize) -> Option<u64> {
320        let mut offset = 0;
321        for segment in &self.0 {
322            let segment_len = segment.len();
323            if index < offset + segment_len {
324                return segment.get(index - offset);
325            }
326            offset += segment_len;
327        }
328        None
329    }
330
331    /// Get row ids from the sequence based on the provided _sorted_ offsets
332    ///
333    /// Any out of bounds offsets will be ignored
334    ///
335    /// # Panics
336    ///
337    /// If the input selection is not sorted, this function will panic
338    pub fn select<'a>(
339        &'a self,
340        selection: impl Iterator<Item = usize> + 'a,
341    ) -> impl Iterator<Item = u64> + 'a {
342        let mut seg_iter = self.0.iter();
343        let mut cur_seg = seg_iter.next();
344        let mut rows_passed = 0;
345        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
346        let mut last_index = 0;
347        selection.filter_map(move |index| {
348            if index < last_index {
349                panic!("Selection is not sorted");
350            }
351            last_index = index;
352
353            cur_seg?;
354
355            while (index - rows_passed) >= cur_seg_len {
356                rows_passed += cur_seg_len;
357                cur_seg = seg_iter.next();
358                if let Some(cur_seg) = cur_seg {
359                    cur_seg_len = cur_seg.len();
360                } else {
361                    return None;
362                }
363            }
364
365            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
366        })
367    }
368
369    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
370    /// in the sequence.
371    ///
372    /// For example, given a mask that selects all even ids and a sequence that is
373    /// [80..85, 86..90, 14]
374    ///
375    /// this will return [0, 2, 4, 5, 7, 9]
376    /// because the range expands to
377    ///
378    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
379    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
380    ///
381    /// This function is useful when determining which row offsets to read from a fragment given
382    /// a mask.
383    #[instrument(level = "debug", skip_all)]
384    pub fn mask_to_offset_ranges(&self, mask: &RowAddrMask) -> Vec<Range<u64>> {
385        let mut offset = 0;
386        let mut ranges = Vec::new();
387        for segment in &self.0 {
388            match segment {
389                U64Segment::Range(range) => {
390                    let mut ids = RowAddrTreeMap::from(range.clone());
391                    ids.mask(mask);
392                    // Range-aware path: walk the bitmap's runs directly via
393                    // iter_runs so the per-row cost collapses to per-run cost.
394                    // SAFETY: built from a u64 range; no Full entries possible.
395                    let mut cur: Option<Range<u64>> = None;
396                    for (fragment, run) in unsafe { ids.iter_runs() } {
397                        let frag = u64::from(fragment);
398                        let run_start = (frag << 32) | u64::from(*run.start());
399                        let run_end_excl = (frag << 32) | (u64::from(*run.end()) + 1);
400                        let start = run_start - range.start + offset;
401                        let end = run_end_excl - range.start + offset;
402                        match cur.as_mut() {
403                            Some(c) if c.end == start => c.end = end,
404                            Some(c) => {
405                                ranges.push(std::mem::replace(c, start..end));
406                            }
407                            None => cur = Some(start..end),
408                        }
409                    }
410                    if let Some(c) = cur {
411                        ranges.push(c);
412                    }
413                    offset += range.end - range.start;
414                }
415                U64Segment::RangeWithHoles { range, holes } => {
416                    let offset_start = offset;
417                    let mut ids = RowAddrTreeMap::from(range.clone());
418                    offset += range.end - range.start;
419                    for hole in holes.iter() {
420                        if ids.remove(hole) {
421                            offset -= 1;
422                        }
423                    }
424                    ids.mask(mask);
425
426                    // Sadly we can't just subtract the offset because of the holes
427                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
428                    sorted_holes.sort_unstable();
429                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
430                    let mut holes_passed = 0;
431                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
432                        |addr| {
433                            while let Some(next_hole) = next_holes_iter.peek() {
434                                if *next_hole < addr {
435                                    next_holes_iter.next();
436                                    holes_passed += 1;
437                                } else {
438                                    break;
439                                }
440                            }
441                            addr - range.start + offset_start - holes_passed
442                        },
443                    )));
444                }
445                U64Segment::RangeWithBitmap { range, bitmap } => {
446                    let mut ids = RowAddrTreeMap::from(range.clone());
447                    let offset_start = offset;
448                    offset += range.end - range.start;
449                    for (i, val) in range.clone().enumerate() {
450                        if !bitmap.get(i) && ids.remove(val) {
451                            offset -= 1;
452                        }
453                    }
454                    ids.mask(mask);
455                    let mut bitmap_iter = bitmap.iter();
456                    let mut bitmap_iter_pos = 0;
457                    let mut holes_passed = 0;
458                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
459                        |addr| {
460                            let position_in_range = addr - range.start;
461                            while bitmap_iter_pos < position_in_range {
462                                if !bitmap_iter.next().unwrap() {
463                                    holes_passed += 1;
464                                }
465                                bitmap_iter_pos += 1;
466                            }
467                            offset_start + position_in_range - holes_passed
468                        },
469                    )));
470                }
471                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
472                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
473                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
474                        |(off, id)| {
475                            if mask.selected(id) {
476                                Some(off as u64 + offset)
477                            } else {
478                                None
479                            }
480                        },
481                    )));
482                    offset += array.len() as u64;
483                }
484            }
485        }
486        ranges
487    }
488}
489
490/// An iterator that groups row ids into ranges
491///
492/// For example, given an input iterator of [1, 2, 3, 5, 6, 7, 10, 11, 12]
493/// this will return an iterator of [(1..4), (5..8), (10..13)]
494struct GroupingIterator<I: Iterator<Item = u64>> {
495    iter: I,
496    cur_range: Option<Range<u64>>,
497}
498
499impl<I: Iterator<Item = u64>> GroupingIterator<I> {
500    fn new(iter: I) -> Self {
501        Self {
502            iter,
503            cur_range: None,
504        }
505    }
506}
507
508impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
509    type Item = Range<u64>;
510
511    fn next(&mut self) -> Option<Self::Item> {
512        for id in self.iter.by_ref() {
513            if let Some(range) = self.cur_range.as_mut() {
514                if range.end == id {
515                    range.end = id + 1;
516                } else {
517                    let ret = Some(range.clone());
518                    self.cur_range = Some(id..id + 1);
519                    return ret;
520                }
521            } else {
522                self.cur_range = Some(id..id + 1);
523            }
524        }
525        self.cur_range.take()
526    }
527}
528
529impl From<&RowIdSequence> for RowAddrTreeMap {
530    fn from(row_ids: &RowIdSequence) -> Self {
531        let mut tree_map = Self::new();
532        for segment in &row_ids.0 {
533            match segment {
534                U64Segment::Range(range) => {
535                    tree_map.insert_range(range.clone());
536                }
537                U64Segment::RangeWithBitmap { range, bitmap } => {
538                    tree_map.insert_range(range.clone());
539                    for (i, val) in range.clone().enumerate() {
540                        if !bitmap.get(i) {
541                            tree_map.remove(val);
542                        }
543                    }
544                }
545                U64Segment::RangeWithHoles { range, holes } => {
546                    tree_map.insert_range(range.clone());
547                    for hole in holes.iter() {
548                        tree_map.remove(hole);
549                    }
550                }
551                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
552                    for val in array.iter() {
553                        tree_map.insert(val);
554                    }
555                }
556            }
557        }
558        tree_map
559    }
560}
561
562#[derive(Debug)]
563pub struct RowIdSeqSlice<'a> {
564    /// Current slice of the segments we cover
565    segments: &'a [U64Segment],
566    /// Offset into the first segment to start iterating from
567    offset_start: usize,
568    /// Offset into the last segment to stop iterating at
569    offset_last: usize,
570}
571
572impl RowIdSeqSlice<'_> {
573    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
574        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
575        known_size -= self.offset_start;
576        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
577
578        let end = self.segments.len().saturating_sub(1);
579        self.segments
580            .iter()
581            .enumerate()
582            .flat_map(move |(i, segment)| {
583                match i {
584                    0 if self.segments.len() == 1 => {
585                        let len = self.offset_last - self.offset_start;
586                        // TODO: Optimize this so we don't have to use skip
587                        // (take is probably fine though.)
588                        Box::new(segment.iter().skip(self.offset_start).take(len))
589                            as Box<dyn Iterator<Item = u64>>
590                    }
591                    0 => Box::new(segment.iter().skip(self.offset_start)),
592                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
593                    _ => Box::new(segment.iter()),
594                }
595            })
596            .exact_size(known_size)
597    }
598}
599
600/// Re-chunk a sequences of row ids into chunks of a given size.
601///
602/// The sequences may less than chunk sizes, because the sequences only
603/// contains the row ids that we want to keep, they come from the updates records.
604/// But the chunk sizes are based on the fragment physical rows(may contain inserted records).
605/// So if the sequences are smaller than the chunk sizes, we need to
606/// assign the incremental row ids in the further step. This behavior is controlled by the
607/// `allow_incomplete` parameter.
608///
609/// # Errors
610///
611/// If `allow_incomplete` is false, will return an error if the sum of the chunk sizes
612/// is not equal to the total number of row ids in the sequences.
613pub fn rechunk_sequences(
614    sequences: impl IntoIterator<Item = RowIdSequence>,
615    chunk_sizes: impl IntoIterator<Item = u64>,
616    allow_incomplete: bool,
617) -> Result<Vec<RowIdSequence>> {
618    // TODO: return an iterator. (with a good size hint?)
619    let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
620    let total_chunks = chunk_sizes_vec.len();
621    let mut chunked_sequences = Vec::with_capacity(total_chunks);
622    let mut segment_iter = sequences
623        .into_iter()
624        .flat_map(|sequence| sequence.0.into_iter())
625        .peekable();
626
627    let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
628        Error::invalid_input(format!(
629            "Got too few segments for chunk {}. Expected chunk size: {}, remaining needed: {}",
630            chunk_index, expected_chunk_size, remaining
631        ))
632    };
633
634    let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
635        Error::invalid_input(format!(
636            "Got too many segments for the provided chunk lengths. Processed {} chunks out of {} expected",
637            processed_chunks, total_chunk_sizes
638        ))
639    };
640
641    let mut segment_offset = 0_u64;
642
643    for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
644        let chunk_size = *chunk_size;
645        let mut sequence = RowIdSequence(Vec::new());
646        let mut remaining = chunk_size;
647
648        while remaining > 0 {
649            let remaining_in_segment = segment_iter
650                .peek()
651                .map_or(0, |segment| segment.len() as u64 - segment_offset);
652
653            // Step 1: Handle segment remaining to be empty(also empty seg) - skip and continue
654            if remaining_in_segment == 0 {
655                if segment_iter.next().is_some() {
656                    segment_offset = 0;
657                    continue;
658                } else {
659                    // No more segments available
660                    if allow_incomplete {
661                        break;
662                    } else {
663                        return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
664                    }
665                }
666            }
667
668            // Step 2: Handle still remaining segment based on size comparison
669            match remaining_in_segment.cmp(&remaining) {
670                std::cmp::Ordering::Greater => {
671                    // Segment is larger than remaining space - slice it
672                    let segment = segment_iter
673                        .peek()
674                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
675                        .slice(segment_offset as usize, remaining as usize);
676                    sequence.extend(RowIdSequence(vec![segment]));
677                    segment_offset += remaining;
678                    remaining = 0;
679                }
680                std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
681                    // UNIFIED HANDLING: Both equal and less cases subtract from remaining
682                    // Equal case: remaining -= remaining_in_segment (remaining becomes 0)
683                    // Less case: remaining -= remaining_in_segment (remaining becomes positive)
684                    let segment = segment_iter
685                        .next()
686                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
687                        .slice(segment_offset as usize, remaining_in_segment as usize);
688                    sequence.extend(RowIdSequence(vec![segment]));
689                    segment_offset = 0;
690                    remaining -= remaining_in_segment;
691                }
692            }
693        }
694
695        chunked_sequences.push(sequence);
696    }
697
698    if segment_iter.peek().is_some() {
699        return Err(too_many_segments_error(
700            chunked_sequences.len(),
701            total_chunks,
702        ));
703    }
704
705    Ok(chunked_sequences)
706}
707
708/// Selects the row ids from a sequence based on the provided offsets.
709pub fn select_row_ids<'a>(
710    sequence: &'a RowIdSequence,
711    offsets: &'a ReadBatchParams,
712) -> Result<Vec<u64>> {
713    let out_of_bounds_err = |offset: u32| {
714        Error::invalid_input(format!(
715            "Index out of bounds: {} for sequence of length {}",
716            offset,
717            sequence.len()
718        ))
719    };
720
721    match offsets {
722        // TODO: Optimize this if indices are sorted, which is a common case.
723        ReadBatchParams::Indices(indices) => indices
724            .values()
725            .iter()
726            .map(|index| {
727                sequence
728                    .get(*index as usize)
729                    .ok_or_else(|| out_of_bounds_err(*index))
730            })
731            .collect(),
732        ReadBatchParams::Range(range) => {
733            if range.end > sequence.len() as usize {
734                return Err(out_of_bounds_err(range.end as u32));
735            }
736            let sequence = sequence.slice(range.start, range.end - range.start);
737            Ok(sequence.iter().collect())
738        }
739        ReadBatchParams::Ranges(ranges) => {
740            let num_rows = ranges
741                .iter()
742                .map(|r| (r.end - r.start) as usize)
743                .sum::<usize>();
744            let mut result = Vec::with_capacity(num_rows);
745            for range in ranges.as_ref() {
746                if range.end > sequence.len() {
747                    return Err(out_of_bounds_err(range.end as u32));
748                }
749                let sequence =
750                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
751                result.extend(sequence.iter());
752            }
753            Ok(result)
754        }
755
756        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
757        ReadBatchParams::RangeTo(to) => {
758            if to.end > sequence.len() as usize {
759                return Err(out_of_bounds_err(to.end as u32));
760            }
761            let len = to.end;
762            let sequence = sequence.slice(0, len);
763            Ok(sequence.iter().collect())
764        }
765        ReadBatchParams::RangeFrom(from) => {
766            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
767            Ok(sequence.iter().collect())
768        }
769    }
770}
771
772#[cfg(test)]
773mod test {
774    use super::*;
775
776    use pretty_assertions::assert_eq;
777    use test::bitmap::Bitmap;
778
779    #[test]
780    fn test_row_id_sequence_from_range() {
781        let sequence = RowIdSequence::from(0..10);
782        assert_eq!(sequence.len(), 10);
783        assert_eq!(sequence.is_empty(), false);
784
785        let iter = sequence.iter();
786        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
787    }
788
789    #[test]
790    fn test_row_id_sequence_extend() {
791        let mut sequence = RowIdSequence::from(0..10);
792        sequence.extend(RowIdSequence::from(10..20));
793        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
794
795        let mut sequence = RowIdSequence::from(0..10);
796        sequence.extend(RowIdSequence::from(20..30));
797        assert_eq!(
798            sequence.0,
799            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
800        );
801    }
802
803    #[test]
804    fn test_row_id_sequence_delete() {
805        let mut sequence = RowIdSequence::from(0..10);
806        sequence.delete(vec![1, 3, 5, 7, 9]);
807        let mut expected_bitmap = Bitmap::new_empty(9);
808        for i in [0, 2, 4, 6, 8] {
809            expected_bitmap.set(i as usize);
810        }
811        assert_eq!(
812            sequence.0,
813            vec![U64Segment::RangeWithBitmap {
814                range: 0..9,
815                bitmap: expected_bitmap
816            },]
817        );
818
819        let mut sequence = RowIdSequence::from(0..10);
820        sequence.extend(RowIdSequence::from(12..20));
821        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
822        assert_eq!(
823            sequence.0,
824            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
825        );
826
827        let mut sequence = RowIdSequence::from(0..10);
828        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
829        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
830    }
831
832    #[test]
833    fn test_row_id_slice() {
834        // The type of sequence isn't that relevant to the implementation, so
835        // we can just have a single one with all the segment types.
836        let sequence = RowIdSequence(vec![
837            U64Segment::Range(30..35), // 5
838            U64Segment::RangeWithHoles {
839                // 8
840                range: 50..60,
841                holes: vec![53, 54].into(),
842            },
843            U64Segment::SortedArray(vec![7, 9].into()), // 2
844            U64Segment::RangeWithBitmap {
845                range: 0..5,
846                bitmap: [true, false, true, false, true].as_slice().into(),
847            },
848            U64Segment::Array(vec![35, 39].into()),
849            U64Segment::Range(40..50),
850        ]);
851
852        // All possible offsets and lengths
853        for offset in 0..sequence.len() as usize {
854            for len in 0..sequence.len() as usize {
855                if offset + len > sequence.len() as usize {
856                    continue;
857                }
858                let slice = sequence.slice(offset, len);
859
860                let actual = slice.iter().collect::<Vec<_>>();
861                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
862                assert_eq!(
863                    actual, expected,
864                    "Failed for offset {} and len {}",
865                    offset, len
866                );
867
868                let (claimed_size, claimed_max) = slice.iter().size_hint();
869                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
870                assert_eq!(claimed_size, actual.len()); // Correct size hint
871            }
872        }
873    }
874
875    #[test]
876    fn test_row_id_slice_empty() {
877        let sequence = RowIdSequence::from(0..10);
878        let slice = sequence.slice(10, 0);
879        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
880    }
881
882    #[test]
883    fn test_row_id_sequence_rechunk() {
884        fn assert_rechunked(
885            input: Vec<RowIdSequence>,
886            chunk_sizes: Vec<u64>,
887            expected: Vec<RowIdSequence>,
888        ) {
889            let chunked = rechunk_sequences(input, chunk_sizes, false).unwrap();
890            assert_eq!(chunked, expected);
891        }
892
893        // Small pieces to larger ones
894        let many_segments = vec![
895            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
896            RowIdSequence::from(10..18),
897            RowIdSequence::from(18..28),
898            RowIdSequence::from(28..30),
899        ];
900        let fewer_segments = vec![
901            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
902            RowIdSequence::from(10..30),
903        ];
904        assert_rechunked(
905            many_segments.clone(),
906            fewer_segments.iter().map(|seq| seq.len()).collect(),
907            fewer_segments.clone(),
908        );
909
910        // Large pieces to smaller ones
911        assert_rechunked(
912            fewer_segments,
913            many_segments.iter().map(|seq| seq.len()).collect(),
914            many_segments.clone(),
915        );
916
917        // Equal pieces
918        assert_rechunked(
919            many_segments.clone(),
920            many_segments.iter().map(|seq| seq.len()).collect(),
921            many_segments.clone(),
922        );
923
924        // Too few segments -> error
925        let result = rechunk_sequences(many_segments.clone(), vec![100], false);
926        assert!(result.is_err());
927
928        // Too many segments -> error
929        let result = rechunk_sequences(many_segments, vec![5], false);
930        assert!(result.is_err());
931    }
932
933    #[test]
934    fn test_select_row_ids() {
935        // All forms of offsets
936        let offsets = [
937            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
938            ReadBatchParams::Range(2..8),
939            ReadBatchParams::RangeFull,
940            ReadBatchParams::RangeTo(..5),
941            ReadBatchParams::RangeFrom(5..),
942            ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
943        ];
944
945        // Sequences with all segment types. These have at least 10 elements,
946        // so they are valid for all the above offsets.
947        let sequences = [
948            RowIdSequence(vec![
949                U64Segment::Range(0..5),
950                U64Segment::RangeWithHoles {
951                    range: 50..60,
952                    holes: vec![53, 54].into(),
953                },
954                U64Segment::SortedArray(vec![7, 9].into()),
955            ]),
956            RowIdSequence(vec![
957                U64Segment::RangeWithBitmap {
958                    range: 0..5,
959                    bitmap: [true, false, true, false, true].as_slice().into(),
960                },
961                U64Segment::Array(vec![30, 20, 10].into()),
962                U64Segment::Range(40..50),
963            ]),
964        ];
965
966        for params in offsets {
967            for sequence in &sequences {
968                let row_ids = select_row_ids(sequence, &params).unwrap();
969                let flat_sequence = sequence.iter().collect::<Vec<_>>();
970
971                // Transform params into bounded ones
972                let selection: Vec<usize> = match &params {
973                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
974                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
975                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
976                    ReadBatchParams::Range(range) => range.clone().collect(),
977                    ReadBatchParams::Ranges(ranges) => ranges
978                        .iter()
979                        .flat_map(|r| r.start as usize..r.end as usize)
980                        .collect(),
981                    ReadBatchParams::Indices(indices) => {
982                        indices.values().iter().map(|i| *i as usize).collect()
983                    }
984                };
985
986                let expected = selection
987                    .into_iter()
988                    .map(|i| flat_sequence[i])
989                    .collect::<Vec<_>>();
990                assert_eq!(
991                    row_ids, expected,
992                    "Failed for params {:?} on the sequence {:?}",
993                    &params, sequence
994                );
995            }
996        }
997    }
998
999    #[test]
1000    fn test_select_row_ids_out_of_bounds() {
1001        let offsets = [
1002            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
1003            ReadBatchParams::Range(2..1000),
1004            ReadBatchParams::RangeTo(..1000),
1005        ];
1006
1007        let sequence = RowIdSequence::from(0..10);
1008
1009        for params in offsets {
1010            let result = select_row_ids(&sequence, &params);
1011            assert!(result.is_err());
1012            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
1013        }
1014    }
1015
1016    #[test]
1017    fn test_row_id_sequence_to_treemap() {
1018        let sequence = RowIdSequence(vec![
1019            U64Segment::Range(0..5),
1020            U64Segment::RangeWithHoles {
1021                range: 50..60,
1022                holes: vec![53, 54].into(),
1023            },
1024            U64Segment::SortedArray(vec![7, 9].into()),
1025            U64Segment::RangeWithBitmap {
1026                range: 10..15,
1027                bitmap: [true, false, true, false, true].as_slice().into(),
1028            },
1029            U64Segment::Array(vec![35, 39].into()),
1030            U64Segment::Range(40..50),
1031        ]);
1032
1033        let tree_map = RowAddrTreeMap::from(&sequence);
1034        let expected = vec![
1035            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
1036            51, 52, 55, 56, 57, 58, 59,
1037        ]
1038        .into_iter()
1039        .collect::<RowAddrTreeMap>();
1040        assert_eq!(tree_map, expected);
1041    }
1042
1043    #[test]
1044    fn test_row_addr_mask() {
1045        // 0, 1, 2, 3, 4
1046        // 50, 51, 52, 55, 56, 57, 58, 59
1047        // 7, 9
1048        // 10, 12, 14
1049        // 35, 39
1050        let sequence = RowIdSequence(vec![
1051            U64Segment::Range(0..5),
1052            U64Segment::RangeWithHoles {
1053                range: 50..60,
1054                holes: vec![53, 54].into(),
1055            },
1056            U64Segment::SortedArray(vec![7, 9].into()),
1057            U64Segment::RangeWithBitmap {
1058                range: 10..15,
1059                bitmap: [true, false, true, false, true].as_slice().into(),
1060            },
1061            U64Segment::Array(vec![35, 39].into()),
1062        ]);
1063
1064        // Masking one in each segment
1065        let values_to_remove = [4, 55, 7, 12, 39];
1066        let positions_to_remove = sequence
1067            .iter()
1068            .enumerate()
1069            .filter_map(|(i, val)| {
1070                if values_to_remove.contains(&val) {
1071                    Some(i as u32)
1072                } else {
1073                    None
1074                }
1075            })
1076            .collect::<Vec<_>>();
1077        let mut sequence = sequence;
1078        sequence.mask(positions_to_remove).unwrap();
1079        let expected = RowIdSequence(vec![
1080            U64Segment::Range(0..4),
1081            U64Segment::RangeWithBitmap {
1082                range: 50..60,
1083                bitmap: [
1084                    true, true, true, false, false, false, true, true, true, true,
1085                ]
1086                .as_slice()
1087                .into(),
1088            },
1089            U64Segment::Range(9..10),
1090            U64Segment::RangeWithBitmap {
1091                range: 10..15,
1092                bitmap: [true, false, false, false, true].as_slice().into(),
1093            },
1094            U64Segment::Array(vec![35].into()),
1095        ]);
1096        assert_eq!(sequence, expected);
1097    }
1098
1099    #[test]
1100    fn test_row_addr_mask_everything() {
1101        let mut sequence = RowIdSequence(vec![
1102            U64Segment::Range(0..5),
1103            U64Segment::SortedArray(vec![7, 9].into()),
1104        ]);
1105        sequence.mask(0..sequence.len() as u32).unwrap();
1106        let expected = RowIdSequence(vec![]);
1107        assert_eq!(sequence, expected);
1108    }
1109
1110    #[test]
1111    fn test_selection() {
1112        let sequence = RowIdSequence(vec![
1113            U64Segment::Range(0..5),
1114            U64Segment::Range(10..15),
1115            U64Segment::Range(20..25),
1116        ]);
1117        let selection = sequence.select(vec![2, 4, 13, 14, 57].into_iter());
1118        assert_eq!(selection.collect::<Vec<_>>(), vec![2, 4, 23, 24]);
1119    }
1120
1121    #[test]
1122    #[should_panic]
1123    fn test_selection_unsorted() {
1124        let sequence = RowIdSequence(vec![
1125            U64Segment::Range(0..5),
1126            U64Segment::Range(10..15),
1127            U64Segment::Range(20..25),
1128        ]);
1129        let _ = sequence
1130            .select(vec![2, 4, 3].into_iter())
1131            .collect::<Vec<_>>();
1132    }
1133
1134    #[test]
1135    fn test_mask_to_offset_ranges() {
1136        // Tests with a simple range segment
1137        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1138        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1139        let ranges = sequence.mask_to_offset_ranges(&mask);
1140        assert_eq!(ranges, vec![0..1, 2..3, 4..5, 6..7, 8..9]);
1141
1142        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1143        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[54]));
1144        let ranges = sequence.mask_to_offset_ranges(&mask);
1145        assert_eq!(ranges, vec![14..15]);
1146
1147        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1148        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[54]));
1149        let ranges = sequence.mask_to_offset_ranges(&mask);
1150        assert_eq!(ranges, vec![0..14, 15..20]);
1151
1152        // Test with a range segment with holes
1153        // 0, 1, 3, 4, 5, 7, 8, 9
1154        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1155            range: 0..10,
1156            holes: vec![2, 6].into(),
1157        }]);
1158        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1159        let ranges = sequence.mask_to_offset_ranges(&mask);
1160        assert_eq!(ranges, vec![0..1, 3..4, 6..7]);
1161
1162        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1163            range: 40..60,
1164            holes: vec![47, 43].into(),
1165        }]);
1166        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[44]));
1167        let ranges = sequence.mask_to_offset_ranges(&mask);
1168        assert_eq!(ranges, vec![3..4]);
1169
1170        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1171            range: 40..60,
1172            holes: vec![47, 43].into(),
1173        }]);
1174        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[44]));
1175        let ranges = sequence.mask_to_offset_ranges(&mask);
1176        assert_eq!(ranges, vec![0..3, 4..18]);
1177
1178        // Test with a range segment with bitmap
1179        // 0, 1, 4, 5, 6, 7
1180        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1181            range: 0..10,
1182            bitmap: [
1183                true, true, false, false, true, true, true, true, false, false,
1184            ]
1185            .as_slice()
1186            .into(),
1187        }]);
1188        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1189        let ranges = sequence.mask_to_offset_ranges(&mask);
1190        assert_eq!(ranges, vec![0..1, 2..3, 4..5]);
1191
1192        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1193            range: 40..45,
1194            bitmap: [true, true, false, false, true].as_slice().into(),
1195        }]);
1196        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[44]));
1197        let ranges = sequence.mask_to_offset_ranges(&mask);
1198        assert_eq!(ranges, vec![2..3]);
1199
1200        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1201            range: 40..45,
1202            bitmap: [true, true, false, false, true].as_slice().into(),
1203        }]);
1204        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[44]));
1205        let ranges = sequence.mask_to_offset_ranges(&mask);
1206        assert_eq!(ranges, vec![0..2]);
1207
1208        // Test with a sorted array segment
1209        let sequence = RowIdSequence(vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())]);
1210        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 6, 8]));
1211        let ranges = sequence.mask_to_offset_ranges(&mask);
1212        assert_eq!(ranges, vec![0..1, 3..5]);
1213
1214        let sequence = RowIdSequence(vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())]);
1215        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 6, 8]));
1216        let ranges = sequence.mask_to_offset_ranges(&mask);
1217        assert_eq!(ranges, vec![0..1, 2..4]);
1218
1219        // Test with multiple segments
1220        // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
1221        // *, -, *, -, -, ***, ---, ---, ***, --, **, --
1222        // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
1223        let sequence = RowIdSequence(vec![
1224            U64Segment::Range(0..5),
1225            U64Segment::RangeWithHoles {
1226                range: 100..105,
1227                holes: vec![103].into(),
1228            },
1229            U64Segment::SortedArray(vec![44, 46, 78].into()),
1230        ]);
1231        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 46, 100, 104]));
1232        let ranges = sequence.mask_to_offset_ranges(&mask);
1233        assert_eq!(ranges, vec![0..1, 2..3, 5..6, 8..9, 10..11]);
1234
1235        // Test with empty mask (should select everything)
1236        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1237        let mask = RowAddrMask::default();
1238        let ranges = sequence.mask_to_offset_ranges(&mask);
1239        assert_eq!(ranges, vec![0..10]);
1240
1241        // Test with allow nothing mask
1242        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1243        let mask = RowAddrMask::allow_nothing();
1244        let ranges = sequence.mask_to_offset_ranges(&mask);
1245        assert_eq!(ranges, vec![]);
1246    }
1247
1248    #[test]
1249    fn test_row_id_sequence_rechunk_with_empty_segments() {
1250        // equal case (segment exactly fills remaining space)
1251        let input_sequences = vec![
1252            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1253            RowIdSequence::from(20..23), // [20, 21, 22] - 3 elements
1254        ];
1255        let chunk_sizes = vec![2, 3]; // First chunk wants 2, second wants 3
1256
1257        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1258        assert_eq!(result.len(), 2);
1259        assert_eq!(result[0].len(), 2);
1260        assert_eq!(result[1].len(), 3);
1261
1262        let first_chunk: Vec<u64> = result[0].iter().collect();
1263        let second_chunk: Vec<u64> = result[1].iter().collect();
1264        assert_eq!(first_chunk, vec![0, 1]);
1265        assert_eq!(second_chunk, vec![20, 21, 22]);
1266
1267        // less case (segment smaller than remaining space)
1268        let input_sequences = vec![
1269            RowIdSequence::from(0..2),   // [0, 1] - 2 elements (less than remaining)
1270            RowIdSequence::from(20..21), // [20] - 1 element (less than remaining)
1271            RowIdSequence::from(30..32), // [30, 31] - 2 elements (exactly fills remaining)
1272        ];
1273        let chunk_sizes = vec![5]; // Request 5 elements, have exactly 5
1274
1275        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1276        assert_eq!(result.len(), 1);
1277        assert_eq!(result[0].len(), 5);
1278
1279        let elements: Vec<u64> = result[0].iter().collect();
1280        assert_eq!(elements, vec![0, 1, 20, 30, 31]);
1281
1282        // empty segment in the middle
1283        let input_sequences = vec![
1284            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1285            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1286            RowIdSequence::from(20..22), // [20, 21] - 2 elements
1287        ];
1288        let chunk_sizes = vec![3, 1];
1289        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1290
1291        assert_eq!(result.len(), 2);
1292        assert_eq!(result[0].len(), 3);
1293        assert_eq!(result[1].len(), 1);
1294
1295        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1296        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1297        assert_eq!(first_chunk_elements, vec![0, 1, 20]);
1298        assert_eq!(second_chunk_elements, vec![21]);
1299
1300        // multiple empty segments
1301        let input_sequences = vec![
1302            RowIdSequence::from(0..1),   // [0] - 1 element
1303            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1304            RowIdSequence::from(20..20), // [] - 0 elements (empty)
1305            RowIdSequence::from(30..32), // [30, 31] - 2 elements
1306        ];
1307        let chunk_sizes = vec![3];
1308        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1309
1310        assert_eq!(result.len(), 1);
1311        assert_eq!(result[0].len(), 3);
1312
1313        let elements: Vec<u64> = result[0].iter().collect();
1314        assert_eq!(elements, vec![0, 30, 31]);
1315
1316        // empty segment at chunk boundary
1317        let input_sequences = vec![
1318            RowIdSequence::from(0..3), // [0, 1, 2] - 3 elements (exactly fills first chunk)
1319            RowIdSequence::from(10..10), // [] - 0 elements (empty, at boundary)
1320            RowIdSequence::from(20..22), // [20, 21] - 2 elements (for second chunk)
1321        ];
1322        let chunk_sizes = vec![3, 2];
1323        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1324
1325        assert_eq!(result.len(), 2);
1326        assert_eq!(result[0].len(), 3);
1327        assert_eq!(result[1].len(), 2);
1328
1329        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1330        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1331        assert_eq!(first_chunk_elements, vec![0, 1, 2]);
1332        assert_eq!(second_chunk_elements, vec![20, 21]);
1333
1334        // empty segments with allow_incomplete = true
1335        let input_sequences = vec![
1336            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1337            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1338        ];
1339        let chunk_sizes = vec![5]; // Request more than available
1340        let result = rechunk_sequences(input_sequences, chunk_sizes, true).unwrap();
1341
1342        assert_eq!(result.len(), 1);
1343        assert_eq!(result[0].len(), 2);
1344
1345        let elements: Vec<u64> = result[0].iter().collect();
1346        assert_eq!(elements, vec![0, 1]);
1347    }
1348
1349    #[test]
1350    fn test_row_id_range_empty() {
1351        let seq = RowIdSequence::from(0u64..0);
1352        assert_eq!(seq.row_id_range(), None);
1353    }
1354
1355    #[test]
1356    fn test_row_id_range_single_contiguous() {
1357        let seq = RowIdSequence::from(10u64..20);
1358        assert_eq!(seq.row_id_range(), Some(10..=19));
1359    }
1360
1361    #[test]
1362    fn test_row_id_range_unsorted_array() {
1363        // Array variant: range() returns min..=max as bounding box
1364        let seq = RowIdSequence::from([50u64, 10, 30].as_slice());
1365        let r = seq.row_id_range().unwrap();
1366        assert!(*r.start() <= 10);
1367        assert!(*r.end() >= 50);
1368    }
1369
1370    #[test]
1371    fn test_row_id_range_multi_segment() {
1372        // Two disjoint ranges; bounding box should span both
1373        let mut seq = RowIdSequence::from(0u64..5);
1374        seq.extend(RowIdSequence::from(100u64..105));
1375        let r = seq.row_id_range().unwrap();
1376        assert_eq!(*r.start(), 0);
1377        assert_eq!(*r.end(), 104);
1378    }
1379}