Skip to main content

lance_table/
rowids.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3//! Indices for mapping row ids to their corresponding addresses.
4//!
5//! Each fragment in a table has a [RowIdSequence] that contains the row ids
6//! in the order they appear in the fragment. The [RowIdIndex] aggregates these
7//! sequences and maps row ids to their corresponding addresses across the
8//! whole dataset.
9//!
10//! [RowIdSequence]s are serialized individually and stored in the fragment
11//! metadata. Use [read_row_ids] and [write_row_ids] to read and write these
12//! sequences. The on-disk format is designed to align well with the in-memory
13//! representation, to avoid unnecessary deserialization.
14use std::ops::{Range, RangeInclusive};
15// TODO: replace this with Arrow BooleanBuffer.
16
// Internal data structures. Most are private; `segment` and `version` are public modules.
18mod bitmap;
19mod encoded_array;
20mod index;
21pub mod segment;
22mod serde;
23pub mod version;
24
25use deepsize::DeepSizeOf;
26// These are the public API.
27pub use index::FragmentRowIdIndex;
28pub use index::RowIdIndex;
29use lance_core::{
30    Error, Result,
31    utils::mask::{RowAddrMask, RowAddrTreeMap},
32};
33use lance_io::ReadBatchParams;
34pub use serde::{read_row_ids, write_row_ids};
35
36use crate::utils::LanceIteratorExtension;
37use lance_core::utils::mask::RowSetOps;
38use segment::U64Segment;
39use tracing::instrument;
40
41/// A sequence of row ids.
42///
43/// Row ids are u64s that:
44///
45/// 1. Are **unique** within a table (except for tombstones)
46/// 2. Are *often* but not always sorted and/or contiguous.
47///
48/// This sequence of row ids is optimized to be compact when the row ids are
49/// contiguous and sorted. However, it does not require that the row ids are
50/// contiguous or sorted.
51///
52/// We can make optimizations that assume uniqueness.
53#[derive(Debug, Clone, DeepSizeOf, PartialEq, Eq, Default)]
54pub struct RowIdSequence(Vec<U64Segment>);
55
56impl std::fmt::Display for RowIdSequence {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        let mut iter = self.iter();
59        let mut first_10 = Vec::new();
60        let mut last_10 = Vec::new();
61        for row_id in iter.by_ref() {
62            first_10.push(row_id);
63            if first_10.len() > 10 {
64                break;
65            }
66        }
67
68        while let Some(row_id) = iter.next_back() {
69            last_10.push(row_id);
70            if last_10.len() > 10 {
71                break;
72            }
73        }
74        last_10.reverse();
75
76        let theres_more = iter.next().is_some();
77
78        write!(f, "[")?;
79        for row_id in first_10 {
80            write!(f, "{}", row_id)?;
81        }
82        if theres_more {
83            write!(f, ", ...")?;
84        }
85        for row_id in last_10 {
86            write!(f, ", {}", row_id)?;
87        }
88        write!(f, "]")
89    }
90}
91
92impl From<Range<u64>> for RowIdSequence {
93    fn from(range: Range<u64>) -> Self {
94        Self(vec![U64Segment::Range(range)])
95    }
96}
97
98impl From<&[u64]> for RowIdSequence {
99    fn from(row_ids: &[u64]) -> Self {
100        Self(vec![U64Segment::from_slice(row_ids)])
101    }
102}
103
104impl RowIdSequence {
105    pub fn new() -> Self {
106        Self::default()
107    }
108
109    pub fn iter(&self) -> impl DoubleEndedIterator<Item = u64> + '_ {
110        self.0.iter().flat_map(|segment| segment.iter())
111    }
112
113    pub fn len(&self) -> u64 {
114        self.0.iter().map(|segment| segment.len() as u64).sum()
115    }
116
117    pub fn is_empty(&self) -> bool {
118        self.0.is_empty()
119    }
120
121    /// Returns the bounding range `[min, max]` across all row IDs in this sequence,
122    /// or `None` if the sequence contains no values.
123    ///
124    /// This is a conservative bounding box: a value falling within the returned range
125    /// is not guaranteed to exist in the sequence (segments may be sparse), but any
126    /// value that *does* exist is guaranteed to fall within the range.  This makes
127    /// the result suitable as a cheap pre-filter before a full scan.
128    pub fn row_id_range(&self) -> Option<RangeInclusive<u64>> {
129        let min = self
130            .0
131            .iter()
132            .filter_map(|s| s.range())
133            .map(|r| *r.start())
134            .min()?;
135        let max = self
136            .0
137            .iter()
138            .filter_map(|s| s.range())
139            .map(|r| *r.end())
140            .max()?;
141        Some(min..=max)
142    }
143
144    /// Combines this row id sequence with another row id sequence.
145    pub fn extend(&mut self, other: Self) {
146        // If the last element of this sequence and the first element of next
147        // sequence are ranges, we might be able to combine them into a single
148        // range.
149        if let (Some(U64Segment::Range(range1)), Some(U64Segment::Range(range2))) =
150            (self.0.last(), other.0.first())
151            && range1.end == range2.start
152        {
153            let new_range = U64Segment::Range(range1.start..range2.end);
154            self.0.pop();
155            self.0.push(new_range);
156            self.0.extend(other.0.into_iter().skip(1));
157            return;
158        }
159        // TODO: add other optimizations, such as combining two RangeWithHoles.
160        self.0.extend(other.0);
161    }
162
163    /// Remove a set of row ids from the sequence.
164    pub fn delete(&mut self, row_ids: impl IntoIterator<Item = u64>) {
165        // Order the row ids by position in which they appear in the sequence.
166        let (row_ids, offsets) = self.find_ids(row_ids);
167
168        let capacity = self.0.capacity();
169        let old_segments = std::mem::replace(&mut self.0, Vec::with_capacity(capacity));
170        let mut remaining_segments = old_segments.as_slice();
171
172        for (segment_idx, range) in offsets {
173            let segments_handled = old_segments.len() - remaining_segments.len();
174            let segments_to_add = segment_idx - segments_handled;
175            self.0
176                .extend_from_slice(&remaining_segments[..segments_to_add]);
177            remaining_segments = &remaining_segments[segments_to_add..];
178
179            let segment;
180            (segment, remaining_segments) = remaining_segments.split_first().unwrap();
181
182            let segment_ids = &row_ids[range];
183            self.0.push(segment.delete(segment_ids));
184        }
185
186        // Add the remaining segments.
187        self.0.extend_from_slice(remaining_segments);
188    }
189
190    /// Delete row ids by position.
191    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
192        let mut local_positions = Vec::new();
193        let mut positions_iter = positions.into_iter();
194        let mut curr_position = positions_iter.next();
195        let mut offset = 0;
196        let mut cutoff = 0;
197
198        for segment in &mut self.0 {
199            // Make vector of local positions
200            cutoff += segment.len() as u32;
201            while let Some(position) = curr_position {
202                if position >= cutoff {
203                    break;
204                }
205                local_positions.push(position - offset);
206                curr_position = positions_iter.next();
207            }
208
209            if !local_positions.is_empty() {
210                segment.mask(&local_positions);
211                local_positions.clear();
212            }
213            offset = cutoff;
214        }
215
216        self.0.retain(|segment| !segment.is_empty());
217
218        Ok(())
219    }
220
221    /// Find the row ids in the sequence.
222    ///
223    /// Returns the row ids sorted by their appearance in the sequence.
224    /// Also returns the segment index and the range where that segment's
225    /// row id matches are found in the returned row id vector.
226    fn find_ids(
227        &self,
228        row_ids: impl IntoIterator<Item = u64>,
229    ) -> (Vec<u64>, Vec<(usize, Range<usize>)>) {
230        // Often, the row ids will already be provided in the order they appear.
231        // So the optimal way to search will be to cycle through rather than
232        // restarting the search from the beginning each time.
233        let mut segment_iter = self.0.iter().enumerate().cycle();
234
235        let mut segment_matches = vec![Vec::new(); self.0.len()];
236
237        row_ids.into_iter().for_each(|row_id| {
238            let mut i = 0;
239            // If we've cycled through all segments, we know the row id is not in the sequence.
240            while i < self.0.len() {
241                let (segment_idx, segment) = segment_iter.next().unwrap();
242                if segment.range().is_some_and(|range| range.contains(&row_id))
243                    && let Some(offset) = segment.position(row_id)
244                {
245                    segment_matches.get_mut(segment_idx).unwrap().push(offset);
246                    // The row id was not found it the segment. It might be in a later segment.
247                }
248                i += 1;
249            }
250        });
251        for matches in &mut segment_matches {
252            matches.sort_unstable();
253        }
254
255        let mut offset = 0;
256        let segment_ranges = segment_matches
257            .iter()
258            .enumerate()
259            .filter(|(_, matches)| !matches.is_empty())
260            .map(|(segment_idx, matches)| {
261                let range = offset..offset + matches.len();
262                offset += matches.len();
263                (segment_idx, range)
264            })
265            .collect();
266        let row_ids = segment_matches
267            .into_iter()
268            .enumerate()
269            .flat_map(|(segment_idx, offset)| {
270                offset
271                    .into_iter()
272                    .map(move |offset| self.0[segment_idx].get(offset).unwrap())
273            })
274            .collect();
275
276        (row_ids, segment_ranges)
277    }
278
279    pub fn slice(&self, offset: usize, len: usize) -> RowIdSeqSlice<'_> {
280        if len == 0 {
281            return RowIdSeqSlice {
282                segments: &[],
283                offset_start: 0,
284                offset_last: 0,
285            };
286        }
287
288        // Find the starting position
289        let mut offset_start = offset;
290        let mut segment_offset = 0;
291        for segment in &self.0 {
292            let segment_len = segment.len();
293            if offset_start < segment_len {
294                break;
295            }
296            offset_start -= segment_len;
297            segment_offset += 1;
298        }
299
300        // Find the ending position
301        let mut offset_last = offset_start + len;
302        let mut segment_offset_last = segment_offset;
303        for segment in &self.0[segment_offset..] {
304            let segment_len = segment.len();
305            if offset_last <= segment_len {
306                break;
307            }
308            offset_last -= segment_len;
309            segment_offset_last += 1;
310        }
311
312        RowIdSeqSlice {
313            segments: &self.0[segment_offset..=segment_offset_last],
314            offset_start,
315            offset_last,
316        }
317    }
318
319    /// Get the row id at the given index.
320    ///
321    /// If the index is out of bounds, this will return None.
322    pub fn get(&self, index: usize) -> Option<u64> {
323        let mut offset = 0;
324        for segment in &self.0 {
325            let segment_len = segment.len();
326            if index < offset + segment_len {
327                return segment.get(index - offset);
328            }
329            offset += segment_len;
330        }
331        None
332    }
333
334    /// Get row ids from the sequence based on the provided _sorted_ offsets
335    ///
336    /// Any out of bounds offsets will be ignored
337    ///
338    /// # Panics
339    ///
340    /// If the input selection is not sorted, this function will panic
341    pub fn select<'a>(
342        &'a self,
343        selection: impl Iterator<Item = usize> + 'a,
344    ) -> impl Iterator<Item = u64> + 'a {
345        let mut seg_iter = self.0.iter();
346        let mut cur_seg = seg_iter.next();
347        let mut rows_passed = 0;
348        let mut cur_seg_len = cur_seg.map(|seg| seg.len()).unwrap_or(0);
349        let mut last_index = 0;
350        selection.filter_map(move |index| {
351            if index < last_index {
352                panic!("Selection is not sorted");
353            }
354            last_index = index;
355
356            cur_seg?;
357
358            while (index - rows_passed) >= cur_seg_len {
359                rows_passed += cur_seg_len;
360                cur_seg = seg_iter.next();
361                if let Some(cur_seg) = cur_seg {
362                    cur_seg_len = cur_seg.len();
363                } else {
364                    return None;
365                }
366            }
367
368            Some(cur_seg.unwrap().get(index - rows_passed).unwrap())
369        })
370    }
371
    /// Given a mask of row ids, calculate the offset ranges of the row ids that are present
    /// in the sequence.
    ///
    /// For example, given a mask that selects all even ids and a sequence that is
    /// [80..85, 86..90, 14]
    ///
    /// the selected offsets are [0, 2, 4, 5, 7, 9]
    /// because the sequence expands to
    ///
    /// [80, 81, 82, 83, 84, 86, 87, 88, 89, 14] with offsets
    /// [ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9]
    ///
    /// The offsets are returned as half-open ranges of consecutive offsets. Grouping is
    /// done separately for each segment, so runs are not merged across segment boundaries.
    ///
    /// This function is useful when determining which row offsets to read from a fragment given
    /// a mask.
    ///
    /// NOTE(review): `into_addr_iter` is called inside `unsafe` blocks below; its safety
    /// contract is defined outside this file — confirm it is upheld here.
    #[instrument(level = "debug", skip_all)]
    pub fn mask_to_offset_ranges(&self, mask: &RowAddrMask) -> Vec<Range<u64>> {
        // Offset (position in the whole sequence) of the first row id of the
        // segment currently being processed.
        let mut offset = 0;
        let mut ranges = Vec::new();
        for segment in &self.0 {
            match segment {
                U64Segment::Range(range) => {
                    // Dense range: an id's offset is its distance from the
                    // range start plus the running offset.
                    let mut ids = RowAddrTreeMap::from(range.clone());
                    ids.mask(mask);
                    ranges.extend(GroupingIterator::new(
                        unsafe { ids.into_addr_iter() }.map(|addr| addr - range.start + offset),
                    ));
                    offset += range.end - range.start;
                }
                U64Segment::RangeWithHoles { range, holes } => {
                    let offset_start = offset;
                    let mut ids = RowAddrTreeMap::from(range.clone());
                    // Advance by the full range width, then take back one
                    // position for every hole that was actually removed.
                    offset += range.end - range.start;
                    for hole in holes.iter() {
                        if ids.remove(hole) {
                            offset -= 1;
                        }
                    }
                    ids.mask(mask);

                    // Sadly we can't just subtract the offset because of the holes
                    let mut sorted_holes = holes.clone().into_iter().collect::<Vec<_>>();
                    sorted_holes.sort_unstable();
                    let mut next_holes_iter = sorted_holes.into_iter().peekable();
                    let mut holes_passed = 0;
                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
                        |addr| {
                            // Count the holes below `addr`; each one shifts
                            // the offset of `addr` down by one.
                            while let Some(next_hole) = next_holes_iter.peek() {
                                if *next_hole < addr {
                                    next_holes_iter.next();
                                    holes_passed += 1;
                                } else {
                                    break;
                                }
                            }
                            addr - range.start + offset_start - holes_passed
                        },
                    )));
                }
                U64Segment::RangeWithBitmap { range, bitmap } => {
                    let mut ids = RowAddrTreeMap::from(range.clone());
                    let offset_start = offset;
                    // Same bookkeeping as for holes: a cleared bit means the
                    // value at that position of the range is absent.
                    offset += range.end - range.start;
                    for (i, val) in range.clone().enumerate() {
                        if !bitmap.get(i) && ids.remove(val) {
                            offset -= 1;
                        }
                    }
                    ids.mask(mask);
                    let mut bitmap_iter = bitmap.iter();
                    let mut bitmap_iter_pos = 0;
                    let mut holes_passed = 0;
                    ranges.extend(GroupingIterator::new(unsafe { ids.into_addr_iter() }.map(
                        |addr| {
                            let position_in_range = addr - range.start;
                            // Advance the bitmap cursor up to this position,
                            // counting the absent values passed on the way.
                            while bitmap_iter_pos < position_in_range {
                                if !bitmap_iter.next().unwrap() {
                                    holes_passed += 1;
                                }
                                bitmap_iter_pos += 1;
                            }
                            offset_start + position_in_range - holes_passed
                        },
                    )));
                }
                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
                    // TODO: Could probably optimize the sorted array case to be O(N) instead of O(N log N)
                    // Explicit arrays: test every id against the mask; the
                    // offset is the array index plus the running offset.
                    ranges.extend(GroupingIterator::new(array.iter().enumerate().filter_map(
                        |(off, id)| {
                            if mask.selected(id) {
                                Some(off as u64 + offset)
                            } else {
                                None
                            }
                        },
                    )));
                    offset += array.len() as u64;
                }
            }
        }
        ranges
    }
473}
474
/// Adapter that collapses runs of consecutive ids into half-open ranges.
///
/// For example, the input `[1, 2, 3, 5, 6, 7, 10, 11, 12]` is turned into
/// `[(1..4), (5..8), (10..13)]`. A new range is started whenever an id is not
/// exactly one past the previous id.
struct GroupingIterator<I: Iterator<Item = u64>> {
    /// Source of ids.
    iter: I,
    /// Range currently being built, if any id has been seen for it yet.
    cur_range: Option<Range<u64>>,
}

impl<I: Iterator<Item = u64>> GroupingIterator<I> {
    /// Wraps `iter`; no ids are consumed until the first call to `next`.
    fn new(iter: I) -> Self {
        let cur_range = None;
        Self { iter, cur_range }
    }
}

impl<I: Iterator<Item = u64>> Iterator for GroupingIterator<I> {
    type Item = Range<u64>;

    /// Yields the next completed range, or `None` once the source is drained
    /// and the range in progress has been handed out.
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            let Some(id) = self.iter.next() else {
                // Source exhausted: flush whatever range is still open.
                return self.cur_range.take();
            };

            match self.cur_range.as_mut() {
                // `id` continues the open range.
                Some(open) if open.end == id => open.end = id + 1,
                // `id` breaks the run: emit the open range, start a new one.
                Some(open) => {
                    let finished = std::mem::replace(open, id..id + 1);
                    return Some(finished);
                }
                // Very first id seen.
                None => self.cur_range = Some(id..id + 1),
            }
        }
    }
}
513
514impl From<&RowIdSequence> for RowAddrTreeMap {
515    fn from(row_ids: &RowIdSequence) -> Self {
516        let mut tree_map = Self::new();
517        for segment in &row_ids.0 {
518            match segment {
519                U64Segment::Range(range) => {
520                    tree_map.insert_range(range.clone());
521                }
522                U64Segment::RangeWithBitmap { range, bitmap } => {
523                    tree_map.insert_range(range.clone());
524                    for (i, val) in range.clone().enumerate() {
525                        if !bitmap.get(i) {
526                            tree_map.remove(val);
527                        }
528                    }
529                }
530                U64Segment::RangeWithHoles { range, holes } => {
531                    tree_map.insert_range(range.clone());
532                    for hole in holes.iter() {
533                        tree_map.remove(hole);
534                    }
535                }
536                U64Segment::SortedArray(array) | U64Segment::Array(array) => {
537                    for val in array.iter() {
538                        tree_map.insert(val);
539                    }
540                }
541            }
542        }
543        tree_map
544    }
545}
546
/// A borrowed view over a contiguous run of row ids in a [RowIdSequence].
///
/// The view references the segments it touches and records where iteration
/// starts inside the first segment and stops inside the last one.
#[derive(Debug)]
pub struct RowIdSeqSlice<'a> {
    /// Current slice of the segments we cover
    segments: &'a [U64Segment],
    /// Offset into the first segment to start iterating from
    offset_start: usize,
    /// Offset into the last segment to stop iterating at
    offset_last: usize,
}
556
557impl RowIdSeqSlice<'_> {
558    pub fn iter(&self) -> impl Iterator<Item = u64> + '_ {
559        let mut known_size = self.segments.iter().map(|segment| segment.len()).sum();
560        known_size -= self.offset_start;
561        known_size -= self.segments.last().map(|s| s.len()).unwrap_or_default() - self.offset_last;
562
563        let end = self.segments.len().saturating_sub(1);
564        self.segments
565            .iter()
566            .enumerate()
567            .flat_map(move |(i, segment)| {
568                match i {
569                    0 if self.segments.len() == 1 => {
570                        let len = self.offset_last - self.offset_start;
571                        // TODO: Optimize this so we don't have to use skip
572                        // (take is probably fine though.)
573                        Box::new(segment.iter().skip(self.offset_start).take(len))
574                            as Box<dyn Iterator<Item = u64>>
575                    }
576                    0 => Box::new(segment.iter().skip(self.offset_start)),
577                    i if i == end => Box::new(segment.iter().take(self.offset_last)),
578                    _ => Box::new(segment.iter()),
579                }
580            })
581            .exact_size(known_size)
582    }
583}
584
/// Re-chunk sequences of row ids into chunks of the given sizes.
///
/// The input segments are treated as one continuous stream and are cut (or
/// joined) so that the i-th output sequence holds `chunk_sizes[i]` row ids.
///
/// The sequences may hold fewer row ids than the chunk sizes add up to: the
/// sequences only contain the row ids we want to keep (they come from the
/// updated records), while the chunk sizes are based on the physical rows of
/// the fragments (which may include inserted records). In that case the
/// missing row ids are assigned in a later step. This behavior is controlled
/// by the `allow_incomplete` parameter.
///
/// # Errors
///
/// If `allow_incomplete` is false, returns an error when the segments run out
/// before a chunk has been filled. Regardless of `allow_incomplete`, returns
/// an error when segments are left over after all chunks have been processed.
pub fn rechunk_sequences(
    sequences: impl IntoIterator<Item = RowIdSequence>,
    chunk_sizes: impl IntoIterator<Item = u64>,
    allow_incomplete: bool,
) -> Result<Vec<RowIdSequence>> {
    // TODO: return an iterator. (with a good size hint?)
    let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
    let total_chunks = chunk_sizes_vec.len();
    let mut chunked_sequences = Vec::with_capacity(total_chunks);
    // Flatten all input sequences into one stream of segments. `peekable`
    // lets a segment be consumed partially across several chunks.
    let mut segment_iter = sequences
        .into_iter()
        .flat_map(|sequence| sequence.0.into_iter())
        .peekable();

    let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
        Error::invalid_input(format!(
            "Got too few segments for chunk {}. Expected chunk size: {}, remaining needed: {}",
            chunk_index, expected_chunk_size, remaining
        ))
    };

    let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
        Error::invalid_input(format!(
            "Got too many segments for the provided chunk lengths. Processed {} chunks out of {} expected",
            processed_chunks, total_chunk_sizes
        ))
    };

    // Number of row ids already taken from the segment at the head of
    // `segment_iter`.
    let mut segment_offset = 0_u64;

    for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
        let chunk_size = *chunk_size;
        let mut sequence = RowIdSequence(Vec::new());
        // Row ids still needed to fill the current chunk.
        let mut remaining = chunk_size;

        while remaining > 0 {
            // Row ids not yet consumed from the head segment (0 if there is none).
            let remaining_in_segment = segment_iter
                .peek()
                .map_or(0, |segment| segment.len() as u64 - segment_offset);

            // Step 1: The head segment is used up (or was empty to begin with):
            // drop it and look at the next one.
            if remaining_in_segment == 0 {
                if segment_iter.next().is_some() {
                    segment_offset = 0;
                    continue;
                } else {
                    // No more segments available
                    if allow_incomplete {
                        break;
                    } else {
                        return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
                    }
                }
            }

            // Step 2: Take row ids from the head segment, depending on how its
            // remainder compares with what the chunk still needs.
            match remaining_in_segment.cmp(&remaining) {
                std::cmp::Ordering::Greater => {
                    // The segment holds more than the chunk needs: take only a
                    // slice and leave the segment at the head of the iterator.
                    let segment = segment_iter
                        .peek()
                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
                        .slice(segment_offset as usize, remaining as usize);
                    sequence.extend(RowIdSequence(vec![segment]));
                    segment_offset += remaining;
                    remaining = 0;
                }
                std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
                    // The rest of the segment fits into the chunk: consume the
                    // segment entirely and reduce what the chunk still needs.
                    // Equal case: `remaining` becomes 0.
                    // Less case: `remaining` stays positive and the loop continues.
                    let segment = segment_iter
                        .next()
                        .ok_or_else(|| too_few_segments_error(chunk_index, chunk_size, remaining))?
                        .slice(segment_offset as usize, remaining_in_segment as usize);
                    sequence.extend(RowIdSequence(vec![segment]));
                    segment_offset = 0;
                    remaining -= remaining_in_segment;
                }
            }
        }

        chunked_sequences.push(sequence);
    }

    // Any segment still queued means the input did not fit into the chunks.
    if segment_iter.peek().is_some() {
        return Err(too_many_segments_error(
            chunked_sequences.len(),
            total_chunks,
        ));
    }

    Ok(chunked_sequences)
}
692
693/// Selects the row ids from a sequence based on the provided offsets.
694pub fn select_row_ids<'a>(
695    sequence: &'a RowIdSequence,
696    offsets: &'a ReadBatchParams,
697) -> Result<Vec<u64>> {
698    let out_of_bounds_err = |offset: u32| {
699        Error::invalid_input(format!(
700            "Index out of bounds: {} for sequence of length {}",
701            offset,
702            sequence.len()
703        ))
704    };
705
706    match offsets {
707        // TODO: Optimize this if indices are sorted, which is a common case.
708        ReadBatchParams::Indices(indices) => indices
709            .values()
710            .iter()
711            .map(|index| {
712                sequence
713                    .get(*index as usize)
714                    .ok_or_else(|| out_of_bounds_err(*index))
715            })
716            .collect(),
717        ReadBatchParams::Range(range) => {
718            if range.end > sequence.len() as usize {
719                return Err(out_of_bounds_err(range.end as u32));
720            }
721            let sequence = sequence.slice(range.start, range.end - range.start);
722            Ok(sequence.iter().collect())
723        }
724        ReadBatchParams::Ranges(ranges) => {
725            let num_rows = ranges
726                .iter()
727                .map(|r| (r.end - r.start) as usize)
728                .sum::<usize>();
729            let mut result = Vec::with_capacity(num_rows);
730            for range in ranges.as_ref() {
731                if range.end > sequence.len() {
732                    return Err(out_of_bounds_err(range.end as u32));
733                }
734                let sequence =
735                    sequence.slice(range.start as usize, (range.end - range.start) as usize);
736                result.extend(sequence.iter());
737            }
738            Ok(result)
739        }
740
741        ReadBatchParams::RangeFull => Ok(sequence.iter().collect()),
742        ReadBatchParams::RangeTo(to) => {
743            if to.end > sequence.len() as usize {
744                return Err(out_of_bounds_err(to.end as u32));
745            }
746            let len = to.end;
747            let sequence = sequence.slice(0, len);
748            Ok(sequence.iter().collect())
749        }
750        ReadBatchParams::RangeFrom(from) => {
751            let sequence = sequence.slice(from.start, sequence.len() as usize - from.start);
752            Ok(sequence.iter().collect())
753        }
754    }
755}
756
757#[cfg(test)]
758mod test {
759    use super::*;
760
761    use pretty_assertions::assert_eq;
762    use test::bitmap::Bitmap;
763
764    #[test]
765    fn test_row_id_sequence_from_range() {
766        let sequence = RowIdSequence::from(0..10);
767        assert_eq!(sequence.len(), 10);
768        assert_eq!(sequence.is_empty(), false);
769
770        let iter = sequence.iter();
771        assert_eq!(iter.collect::<Vec<_>>(), (0..10).collect::<Vec<_>>());
772    }
773
774    #[test]
775    fn test_row_id_sequence_extend() {
776        let mut sequence = RowIdSequence::from(0..10);
777        sequence.extend(RowIdSequence::from(10..20));
778        assert_eq!(sequence.0, vec![U64Segment::Range(0..20)]);
779
780        let mut sequence = RowIdSequence::from(0..10);
781        sequence.extend(RowIdSequence::from(20..30));
782        assert_eq!(
783            sequence.0,
784            vec![U64Segment::Range(0..10), U64Segment::Range(20..30)]
785        );
786    }
787
788    #[test]
789    fn test_row_id_sequence_delete() {
790        let mut sequence = RowIdSequence::from(0..10);
791        sequence.delete(vec![1, 3, 5, 7, 9]);
792        let mut expected_bitmap = Bitmap::new_empty(9);
793        for i in [0, 2, 4, 6, 8] {
794            expected_bitmap.set(i as usize);
795        }
796        assert_eq!(
797            sequence.0,
798            vec![U64Segment::RangeWithBitmap {
799                range: 0..9,
800                bitmap: expected_bitmap
801            },]
802        );
803
804        let mut sequence = RowIdSequence::from(0..10);
805        sequence.extend(RowIdSequence::from(12..20));
806        sequence.delete(vec![0, 9, 10, 11, 12, 13]);
807        assert_eq!(
808            sequence.0,
809            vec![U64Segment::Range(1..9), U64Segment::Range(14..20),]
810        );
811
812        let mut sequence = RowIdSequence::from(0..10);
813        sequence.delete(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
814        assert_eq!(sequence.0, vec![U64Segment::Range(0..0)]);
815    }
816
817    #[test]
818    fn test_row_id_slice() {
819        // The type of sequence isn't that relevant to the implementation, so
820        // we can just have a single one with all the segment types.
821        let sequence = RowIdSequence(vec![
822            U64Segment::Range(30..35), // 5
823            U64Segment::RangeWithHoles {
824                // 8
825                range: 50..60,
826                holes: vec![53, 54].into(),
827            },
828            U64Segment::SortedArray(vec![7, 9].into()), // 2
829            U64Segment::RangeWithBitmap {
830                range: 0..5,
831                bitmap: [true, false, true, false, true].as_slice().into(),
832            },
833            U64Segment::Array(vec![35, 39].into()),
834            U64Segment::Range(40..50),
835        ]);
836
837        // All possible offsets and lengths
838        for offset in 0..sequence.len() as usize {
839            for len in 0..sequence.len() as usize {
840                if offset + len > sequence.len() as usize {
841                    continue;
842                }
843                let slice = sequence.slice(offset, len);
844
845                let actual = slice.iter().collect::<Vec<_>>();
846                let expected = sequence.iter().skip(offset).take(len).collect::<Vec<_>>();
847                assert_eq!(
848                    actual, expected,
849                    "Failed for offset {} and len {}",
850                    offset, len
851                );
852
853                let (claimed_size, claimed_max) = slice.iter().size_hint();
854                assert_eq!(claimed_max, Some(claimed_size)); // Exact size hint
855                assert_eq!(claimed_size, actual.len()); // Correct size hint
856            }
857        }
858    }
859
860    #[test]
861    fn test_row_id_slice_empty() {
862        let sequence = RowIdSequence::from(0..10);
863        let slice = sequence.slice(10, 0);
864        assert_eq!(slice.iter().collect::<Vec<_>>(), Vec::<u64>::new());
865    }
866
867    #[test]
868    fn test_row_id_sequence_rechunk() {
869        fn assert_rechunked(
870            input: Vec<RowIdSequence>,
871            chunk_sizes: Vec<u64>,
872            expected: Vec<RowIdSequence>,
873        ) {
874            let chunked = rechunk_sequences(input, chunk_sizes, false).unwrap();
875            assert_eq!(chunked, expected);
876        }
877
878        // Small pieces to larger ones
879        let many_segments = vec![
880            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
881            RowIdSequence::from(10..18),
882            RowIdSequence::from(18..28),
883            RowIdSequence::from(28..30),
884        ];
885        let fewer_segments = vec![
886            RowIdSequence(vec![U64Segment::Range(0..5), U64Segment::Range(35..40)]),
887            RowIdSequence::from(10..30),
888        ];
889        assert_rechunked(
890            many_segments.clone(),
891            fewer_segments.iter().map(|seq| seq.len()).collect(),
892            fewer_segments.clone(),
893        );
894
895        // Large pieces to smaller ones
896        assert_rechunked(
897            fewer_segments,
898            many_segments.iter().map(|seq| seq.len()).collect(),
899            many_segments.clone(),
900        );
901
902        // Equal pieces
903        assert_rechunked(
904            many_segments.clone(),
905            many_segments.iter().map(|seq| seq.len()).collect(),
906            many_segments.clone(),
907        );
908
909        // Too few segments -> error
910        let result = rechunk_sequences(many_segments.clone(), vec![100], false);
911        assert!(result.is_err());
912
913        // Too many segments -> error
914        let result = rechunk_sequences(many_segments, vec![5], false);
915        assert!(result.is_err());
916    }
917
918    #[test]
919    fn test_select_row_ids() {
920        // All forms of offsets
921        let offsets = [
922            ReadBatchParams::Indices(vec![1, 3, 9, 5, 7, 6].into()),
923            ReadBatchParams::Range(2..8),
924            ReadBatchParams::RangeFull,
925            ReadBatchParams::RangeTo(..5),
926            ReadBatchParams::RangeFrom(5..),
927            ReadBatchParams::Ranges(vec![2..3, 5..10].into()),
928        ];
929
930        // Sequences with all segment types. These have at least 10 elements,
931        // so they are valid for all the above offsets.
932        let sequences = [
933            RowIdSequence(vec![
934                U64Segment::Range(0..5),
935                U64Segment::RangeWithHoles {
936                    range: 50..60,
937                    holes: vec![53, 54].into(),
938                },
939                U64Segment::SortedArray(vec![7, 9].into()),
940            ]),
941            RowIdSequence(vec![
942                U64Segment::RangeWithBitmap {
943                    range: 0..5,
944                    bitmap: [true, false, true, false, true].as_slice().into(),
945                },
946                U64Segment::Array(vec![30, 20, 10].into()),
947                U64Segment::Range(40..50),
948            ]),
949        ];
950
951        for params in offsets {
952            for sequence in &sequences {
953                let row_ids = select_row_ids(sequence, &params).unwrap();
954                let flat_sequence = sequence.iter().collect::<Vec<_>>();
955
956                // Transform params into bounded ones
957                let selection: Vec<usize> = match &params {
958                    ReadBatchParams::RangeFull => (0..flat_sequence.len()).collect(),
959                    ReadBatchParams::RangeTo(to) => (0..to.end).collect(),
960                    ReadBatchParams::RangeFrom(from) => (from.start..flat_sequence.len()).collect(),
961                    ReadBatchParams::Range(range) => range.clone().collect(),
962                    ReadBatchParams::Ranges(ranges) => ranges
963                        .iter()
964                        .flat_map(|r| r.start as usize..r.end as usize)
965                        .collect(),
966                    ReadBatchParams::Indices(indices) => {
967                        indices.values().iter().map(|i| *i as usize).collect()
968                    }
969                };
970
971                let expected = selection
972                    .into_iter()
973                    .map(|i| flat_sequence[i])
974                    .collect::<Vec<_>>();
975                assert_eq!(
976                    row_ids, expected,
977                    "Failed for params {:?} on the sequence {:?}",
978                    &params, sequence
979                );
980            }
981        }
982    }
983
984    #[test]
985    fn test_select_row_ids_out_of_bounds() {
986        let offsets = [
987            ReadBatchParams::Indices(vec![1, 1000, 4].into()),
988            ReadBatchParams::Range(2..1000),
989            ReadBatchParams::RangeTo(..1000),
990        ];
991
992        let sequence = RowIdSequence::from(0..10);
993
994        for params in offsets {
995            let result = select_row_ids(&sequence, &params);
996            assert!(result.is_err());
997            assert!(matches!(result.unwrap_err(), Error::InvalidInput { .. }));
998        }
999    }
1000
1001    #[test]
1002    fn test_row_id_sequence_to_treemap() {
1003        let sequence = RowIdSequence(vec![
1004            U64Segment::Range(0..5),
1005            U64Segment::RangeWithHoles {
1006                range: 50..60,
1007                holes: vec![53, 54].into(),
1008            },
1009            U64Segment::SortedArray(vec![7, 9].into()),
1010            U64Segment::RangeWithBitmap {
1011                range: 10..15,
1012                bitmap: [true, false, true, false, true].as_slice().into(),
1013            },
1014            U64Segment::Array(vec![35, 39].into()),
1015            U64Segment::Range(40..50),
1016        ]);
1017
1018        let tree_map = RowAddrTreeMap::from(&sequence);
1019        let expected = vec![
1020            0, 1, 2, 3, 4, 7, 9, 10, 12, 14, 35, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50,
1021            51, 52, 55, 56, 57, 58, 59,
1022        ]
1023        .into_iter()
1024        .collect::<RowAddrTreeMap>();
1025        assert_eq!(tree_map, expected);
1026    }
1027
1028    #[test]
1029    fn test_row_addr_mask() {
1030        // 0, 1, 2, 3, 4
1031        // 50, 51, 52, 55, 56, 57, 58, 59
1032        // 7, 9
1033        // 10, 12, 14
1034        // 35, 39
1035        let sequence = RowIdSequence(vec![
1036            U64Segment::Range(0..5),
1037            U64Segment::RangeWithHoles {
1038                range: 50..60,
1039                holes: vec![53, 54].into(),
1040            },
1041            U64Segment::SortedArray(vec![7, 9].into()),
1042            U64Segment::RangeWithBitmap {
1043                range: 10..15,
1044                bitmap: [true, false, true, false, true].as_slice().into(),
1045            },
1046            U64Segment::Array(vec![35, 39].into()),
1047        ]);
1048
1049        // Masking one in each segment
1050        let values_to_remove = [4, 55, 7, 12, 39];
1051        let positions_to_remove = sequence
1052            .iter()
1053            .enumerate()
1054            .filter_map(|(i, val)| {
1055                if values_to_remove.contains(&val) {
1056                    Some(i as u32)
1057                } else {
1058                    None
1059                }
1060            })
1061            .collect::<Vec<_>>();
1062        let mut sequence = sequence;
1063        sequence.mask(positions_to_remove).unwrap();
1064        let expected = RowIdSequence(vec![
1065            U64Segment::Range(0..4),
1066            U64Segment::RangeWithBitmap {
1067                range: 50..60,
1068                bitmap: [
1069                    true, true, true, false, false, false, true, true, true, true,
1070                ]
1071                .as_slice()
1072                .into(),
1073            },
1074            U64Segment::Range(9..10),
1075            U64Segment::RangeWithBitmap {
1076                range: 10..15,
1077                bitmap: [true, false, false, false, true].as_slice().into(),
1078            },
1079            U64Segment::Array(vec![35].into()),
1080        ]);
1081        assert_eq!(sequence, expected);
1082    }
1083
1084    #[test]
1085    fn test_row_addr_mask_everything() {
1086        let mut sequence = RowIdSequence(vec![
1087            U64Segment::Range(0..5),
1088            U64Segment::SortedArray(vec![7, 9].into()),
1089        ]);
1090        sequence.mask(0..sequence.len() as u32).unwrap();
1091        let expected = RowIdSequence(vec![]);
1092        assert_eq!(sequence, expected);
1093    }
1094
1095    #[test]
1096    fn test_selection() {
1097        let sequence = RowIdSequence(vec![
1098            U64Segment::Range(0..5),
1099            U64Segment::Range(10..15),
1100            U64Segment::Range(20..25),
1101        ]);
1102        let selection = sequence.select(vec![2, 4, 13, 14, 57].into_iter());
1103        assert_eq!(selection.collect::<Vec<_>>(), vec![2, 4, 23, 24]);
1104    }
1105
1106    #[test]
1107    #[should_panic]
1108    fn test_selection_unsorted() {
1109        let sequence = RowIdSequence(vec![
1110            U64Segment::Range(0..5),
1111            U64Segment::Range(10..15),
1112            U64Segment::Range(20..25),
1113        ]);
1114        let _ = sequence
1115            .select(vec![2, 4, 3].into_iter())
1116            .collect::<Vec<_>>();
1117    }
1118
1119    #[test]
1120    fn test_mask_to_offset_ranges() {
1121        // Tests with a simple range segment
1122        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1123        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1124        let ranges = sequence.mask_to_offset_ranges(&mask);
1125        assert_eq!(ranges, vec![0..1, 2..3, 4..5, 6..7, 8..9]);
1126
1127        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1128        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[54]));
1129        let ranges = sequence.mask_to_offset_ranges(&mask);
1130        assert_eq!(ranges, vec![14..15]);
1131
1132        let sequence = RowIdSequence(vec![U64Segment::Range(40..60)]);
1133        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[54]));
1134        let ranges = sequence.mask_to_offset_ranges(&mask);
1135        assert_eq!(ranges, vec![0..14, 15..20]);
1136
1137        // Test with a range segment with holes
1138        // 0, 1, 3, 4, 5, 7, 8, 9
1139        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1140            range: 0..10,
1141            holes: vec![2, 6].into(),
1142        }]);
1143        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1144        let ranges = sequence.mask_to_offset_ranges(&mask);
1145        assert_eq!(ranges, vec![0..1, 3..4, 6..7]);
1146
1147        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1148            range: 40..60,
1149            holes: vec![47, 43].into(),
1150        }]);
1151        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[44]));
1152        let ranges = sequence.mask_to_offset_ranges(&mask);
1153        assert_eq!(ranges, vec![3..4]);
1154
1155        let sequence = RowIdSequence(vec![U64Segment::RangeWithHoles {
1156            range: 40..60,
1157            holes: vec![47, 43].into(),
1158        }]);
1159        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[44]));
1160        let ranges = sequence.mask_to_offset_ranges(&mask);
1161        assert_eq!(ranges, vec![0..3, 4..18]);
1162
1163        // Test with a range segment with bitmap
1164        // 0, 1, 4, 5, 6, 7
1165        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1166            range: 0..10,
1167            bitmap: [
1168                true, true, false, false, true, true, true, true, false, false,
1169            ]
1170            .as_slice()
1171            .into(),
1172        }]);
1173        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 4, 6, 8]));
1174        let ranges = sequence.mask_to_offset_ranges(&mask);
1175        assert_eq!(ranges, vec![0..1, 2..3, 4..5]);
1176
1177        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1178            range: 40..45,
1179            bitmap: [true, true, false, false, true].as_slice().into(),
1180        }]);
1181        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[44]));
1182        let ranges = sequence.mask_to_offset_ranges(&mask);
1183        assert_eq!(ranges, vec![2..3]);
1184
1185        let sequence = RowIdSequence(vec![U64Segment::RangeWithBitmap {
1186            range: 40..45,
1187            bitmap: [true, true, false, false, true].as_slice().into(),
1188        }]);
1189        let mask = RowAddrMask::from_block(RowAddrTreeMap::from_iter(&[44]));
1190        let ranges = sequence.mask_to_offset_ranges(&mask);
1191        assert_eq!(ranges, vec![0..2]);
1192
1193        // Test with a sorted array segment
1194        let sequence = RowIdSequence(vec![U64Segment::SortedArray(vec![0, 2, 4, 6, 8].into())]);
1195        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 6, 8]));
1196        let ranges = sequence.mask_to_offset_ranges(&mask);
1197        assert_eq!(ranges, vec![0..1, 3..5]);
1198
1199        let sequence = RowIdSequence(vec![U64Segment::Array(vec![8, 2, 6, 0, 4].into())]);
1200        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 6, 8]));
1201        let ranges = sequence.mask_to_offset_ranges(&mask);
1202        assert_eq!(ranges, vec![0..1, 2..4]);
1203
1204        // Test with multiple segments
1205        // 0, 1, 2, 3, 4, 100, 101, 102, 104, 44, 46, 78
1206        // *, -, *, -, -, ***, ---, ---, ***, --, **, --
1207        // 0, 1, 2, 3, 4,   5,   6,   7,   8,  9, 10, 11
1208        let sequence = RowIdSequence(vec![
1209            U64Segment::Range(0..5),
1210            U64Segment::RangeWithHoles {
1211                range: 100..105,
1212                holes: vec![103].into(),
1213            },
1214            U64Segment::SortedArray(vec![44, 46, 78].into()),
1215        ]);
1216        let mask = RowAddrMask::from_allowed(RowAddrTreeMap::from_iter(&[0, 2, 46, 100, 104]));
1217        let ranges = sequence.mask_to_offset_ranges(&mask);
1218        assert_eq!(ranges, vec![0..1, 2..3, 5..6, 8..9, 10..11]);
1219
1220        // Test with empty mask (should select everything)
1221        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1222        let mask = RowAddrMask::default();
1223        let ranges = sequence.mask_to_offset_ranges(&mask);
1224        assert_eq!(ranges, vec![0..10]);
1225
1226        // Test with allow nothing mask
1227        let sequence = RowIdSequence(vec![U64Segment::Range(0..10)]);
1228        let mask = RowAddrMask::allow_nothing();
1229        let ranges = sequence.mask_to_offset_ranges(&mask);
1230        assert_eq!(ranges, vec![]);
1231    }
1232
1233    #[test]
1234    fn test_row_id_sequence_rechunk_with_empty_segments() {
1235        // equal case (segment exactly fills remaining space)
1236        let input_sequences = vec![
1237            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1238            RowIdSequence::from(20..23), // [20, 21, 22] - 3 elements
1239        ];
1240        let chunk_sizes = vec![2, 3]; // First chunk wants 2, second wants 3
1241
1242        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1243        assert_eq!(result.len(), 2);
1244        assert_eq!(result[0].len(), 2);
1245        assert_eq!(result[1].len(), 3);
1246
1247        let first_chunk: Vec<u64> = result[0].iter().collect();
1248        let second_chunk: Vec<u64> = result[1].iter().collect();
1249        assert_eq!(first_chunk, vec![0, 1]);
1250        assert_eq!(second_chunk, vec![20, 21, 22]);
1251
1252        // less case (segment smaller than remaining space)
1253        let input_sequences = vec![
1254            RowIdSequence::from(0..2),   // [0, 1] - 2 elements (less than remaining)
1255            RowIdSequence::from(20..21), // [20] - 1 element (less than remaining)
1256            RowIdSequence::from(30..32), // [30, 31] - 2 elements (exactly fills remaining)
1257        ];
1258        let chunk_sizes = vec![5]; // Request 5 elements, have exactly 5
1259
1260        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1261        assert_eq!(result.len(), 1);
1262        assert_eq!(result[0].len(), 5);
1263
1264        let elements: Vec<u64> = result[0].iter().collect();
1265        assert_eq!(elements, vec![0, 1, 20, 30, 31]);
1266
1267        // empty segment in the middle
1268        let input_sequences = vec![
1269            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1270            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1271            RowIdSequence::from(20..22), // [20, 21] - 2 elements
1272        ];
1273        let chunk_sizes = vec![3, 1];
1274        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1275
1276        assert_eq!(result.len(), 2);
1277        assert_eq!(result[0].len(), 3);
1278        assert_eq!(result[1].len(), 1);
1279
1280        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1281        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1282        assert_eq!(first_chunk_elements, vec![0, 1, 20]);
1283        assert_eq!(second_chunk_elements, vec![21]);
1284
1285        // multiple empty segments
1286        let input_sequences = vec![
1287            RowIdSequence::from(0..1),   // [0] - 1 element
1288            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1289            RowIdSequence::from(20..20), // [] - 0 elements (empty)
1290            RowIdSequence::from(30..32), // [30, 31] - 2 elements
1291        ];
1292        let chunk_sizes = vec![3];
1293        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1294
1295        assert_eq!(result.len(), 1);
1296        assert_eq!(result[0].len(), 3);
1297
1298        let elements: Vec<u64> = result[0].iter().collect();
1299        assert_eq!(elements, vec![0, 30, 31]);
1300
1301        // empty segment at chunk boundary
1302        let input_sequences = vec![
1303            RowIdSequence::from(0..3), // [0, 1, 2] - 3 elements (exactly fills first chunk)
1304            RowIdSequence::from(10..10), // [] - 0 elements (empty, at boundary)
1305            RowIdSequence::from(20..22), // [20, 21] - 2 elements (for second chunk)
1306        ];
1307        let chunk_sizes = vec![3, 2];
1308        let result = rechunk_sequences(input_sequences, chunk_sizes, false).unwrap();
1309
1310        assert_eq!(result.len(), 2);
1311        assert_eq!(result[0].len(), 3);
1312        assert_eq!(result[1].len(), 2);
1313
1314        let first_chunk_elements: Vec<u64> = result[0].iter().collect();
1315        let second_chunk_elements: Vec<u64> = result[1].iter().collect();
1316        assert_eq!(first_chunk_elements, vec![0, 1, 2]);
1317        assert_eq!(second_chunk_elements, vec![20, 21]);
1318
1319        // empty segments with allow_incomplete = true
1320        let input_sequences = vec![
1321            RowIdSequence::from(0..2),   // [0, 1] - 2 elements
1322            RowIdSequence::from(10..10), // [] - 0 elements (empty)
1323        ];
1324        let chunk_sizes = vec![5]; // Request more than available
1325        let result = rechunk_sequences(input_sequences, chunk_sizes, true).unwrap();
1326
1327        assert_eq!(result.len(), 1);
1328        assert_eq!(result[0].len(), 2);
1329
1330        let elements: Vec<u64> = result[0].iter().collect();
1331        assert_eq!(elements, vec![0, 1]);
1332    }
1333
1334    #[test]
1335    fn test_row_id_range_empty() {
1336        let seq = RowIdSequence::from(0u64..0);
1337        assert_eq!(seq.row_id_range(), None);
1338    }
1339
1340    #[test]
1341    fn test_row_id_range_single_contiguous() {
1342        let seq = RowIdSequence::from(10u64..20);
1343        assert_eq!(seq.row_id_range(), Some(10..=19));
1344    }
1345
1346    #[test]
1347    fn test_row_id_range_unsorted_array() {
1348        // Array variant: range() returns min..=max as bounding box
1349        let seq = RowIdSequence::from([50u64, 10, 30].as_slice());
1350        let r = seq.row_id_range().unwrap();
1351        assert!(*r.start() <= 10);
1352        assert!(*r.end() >= 50);
1353    }
1354
1355    #[test]
1356    fn test_row_id_range_multi_segment() {
1357        // Two disjoint ranges; bounding box should span both
1358        let mut seq = RowIdSequence::from(0u64..5);
1359        seq.extend(RowIdSequence::from(100u64..105));
1360        let r = seq.row_id_range().unwrap();
1361        assert_eq!(*r.start(), 0);
1362        assert_eq!(*r.end(), 104);
1363    }
1364}