Skip to main content

lance_table/rowids/
version.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Row version tracking for cross-version diff functionality
5//!
6//! This module provides data structures and functionality to track the latest
7//! update version for each row in a Lance dataset, enabling efficient
8//! cross-version diff operations.
9
10use std::sync::Arc;
11
12use deepsize::DeepSizeOf;
13use lance_core::Error;
14use lance_core::Result;
15use prost::Message;
16use serde::de::Deserializer;
17use serde::ser::Serializer;
18use serde::{Deserialize, Serialize};
19
20use crate::format::{ExternalFile, Fragment, pb};
21use crate::rowids::segment::U64Segment;
22use crate::rowids::{RowIdSequence, read_row_ids};
23
/// A run of identical versions over a contiguous span of row positions.
///
/// Span is expressed as a U64Segment over row offsets (0..N within a fragment),
/// not over row IDs. This keeps the encoding aligned with RowIdSequence order
/// and enables zipped iteration without building a map.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub struct RowDatasetVersionRun {
    /// Row positions covered by this run. Its length is the number of rows
    /// that share `version`.
    pub span: U64Segment,
    /// Dataset version recorded for every row in `span`.
    pub version: u64,
}
34
35impl RowDatasetVersionRun {
36    /// Number of rows covered by this run.
37    pub fn len(&self) -> usize {
38        self.span.len()
39    }
40
41    /// Whether this run covers no rows.
42    pub fn is_empty(&self) -> bool {
43        self.span.is_empty()
44    }
45
46    /// The version value of this run.
47    pub fn version(&self) -> u64 {
48        self.version
49    }
50}
51
/// Sequence of dataset versions
///
/// Stores version runs aligned to the positional order of RowIdSequence:
/// the rows of the first run come first, then the rows of the second run,
/// and so on. Sequential access goes through `versions()`; positional access
/// through `version_at()` walks the runs linearly (no index is kept).
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf, Default)]
pub struct RowDatasetVersionSequence {
    /// Runs in positional order. The row at global position `p` belongs to the
    /// run whose cumulative length first exceeds `p`.
    pub runs: Vec<RowDatasetVersionRun>,
}
61
62impl RowDatasetVersionSequence {
63    /// Create a new empty version sequence
64    pub fn new() -> Self {
65        Self { runs: Vec::new() }
66    }
67
68    /// Create a version sequence with a single uniform run of `row_count` rows.
69    pub fn from_uniform_row_count(row_count: u64, version: u64) -> Self {
70        if row_count == 0 {
71            return Self::new();
72        }
73        let run = RowDatasetVersionRun {
74            span: U64Segment::Range(0..row_count),
75            version,
76        };
77        Self { runs: vec![run] }
78    }
79
80    /// Number of rows tracked by this sequence (sum of run lengths).
81    pub fn len(&self) -> u64 {
82        self.runs.iter().map(|s| s.len() as u64).sum()
83    }
84
85    /// Empty if there are no runs or all runs are empty.
86    pub fn is_empty(&self) -> bool {
87        self.runs.is_empty() || self.runs.iter().all(|s| s.is_empty())
88    }
89
90    /// Returns a forward iterator over versions, expanding runs lazily.
91    pub fn versions(&self) -> VersionsIter<'_> {
92        VersionsIter::new(&self.runs)
93    }
94
95    /// Random access: get the version at global row position `index`.
96    pub fn version_at(&self, index: usize) -> Option<u64> {
97        let mut offset = 0usize;
98        for run in &self.runs {
99            let len = run.len();
100            if index < offset + len {
101                return Some(run.version());
102            }
103            offset += len;
104        }
105        None
106    }
107
108    /// Get the version associated with a specific row id.
109    /// This reconstructs the positional offset from RowIdSequence and then
110    /// performs `version_at` lookup.
111    pub fn get_version_for_row_id(&self, row_ids: &RowIdSequence, row_id: u64) -> Option<u64> {
112        let mut offset = 0usize;
113        for seg in &row_ids.0 {
114            if seg.range().is_some_and(|r| r.contains(&row_id))
115                && let Some(local) = seg.position(row_id)
116            {
117                return self.version_at(offset + local);
118            }
119            offset += seg.len();
120        }
121        None
122    }
123
124    /// Convenience: collect row IDs with version strictly greater than `threshold`.
125    pub fn rows_with_version_greater_than(
126        &self,
127        row_ids: &RowIdSequence,
128        threshold: u64,
129    ) -> Vec<u64> {
130        row_ids
131            .iter()
132            .zip(self.versions())
133            .filter_map(|(rid, v)| if v > threshold { Some(rid) } else { None })
134            .collect()
135    }
136
137    /// Delete rows by positional offsets (e.g., from a deletion vector)
138    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
139        let mut local_positions: Vec<u32> = Vec::new();
140        let mut positions_iter = positions.into_iter();
141        let mut curr_position = positions_iter.next();
142        let mut offset: usize = 0;
143        let mut cutoff: usize = 0;
144
145        for run in self.runs.iter_mut() {
146            cutoff += run.span.len();
147            while let Some(position) = curr_position {
148                if position as usize >= cutoff {
149                    break;
150                }
151                local_positions.push(position - offset as u32);
152                curr_position = positions_iter.next();
153            }
154
155            if !local_positions.is_empty() {
156                run.span.mask(local_positions.as_slice());
157                local_positions.clear();
158            }
159            offset = cutoff;
160        }
161
162        self.runs.retain(|r| !r.span.is_empty());
163        Ok(())
164    }
165}
166
/// Iterator over versions expanding runs lazily.
///
/// Yields one `u64` version per row position, run after run, without
/// materializing the expanded vector.
pub struct VersionsIter<'a> {
    /// The runs being expanded, in positional order.
    runs: &'a [RowDatasetVersionRun],
    /// Index into `runs` of the run currently being expanded.
    run_idx: usize,
    /// Rows of the current run that have not been yielded yet.
    remaining_in_run: usize,
    /// Version yielded for each remaining row of the current run.
    current_version: u64,
}
174
175impl<'a> VersionsIter<'a> {
176    fn new(runs: &'a [RowDatasetVersionRun]) -> Self {
177        let mut it = Self {
178            runs,
179            run_idx: 0,
180            remaining_in_run: 0,
181            current_version: 0,
182        };
183        it.advance_run();
184        it
185    }
186
187    fn advance_run(&mut self) {
188        if self.run_idx < self.runs.len() {
189            let run = &self.runs[self.run_idx];
190            self.remaining_in_run = run.len();
191            self.current_version = run.version();
192        } else {
193            self.remaining_in_run = 0;
194        }
195    }
196}
197
198impl<'a> Iterator for VersionsIter<'a> {
199    type Item = u64;
200
201    fn next(&mut self) -> Option<Self::Item> {
202        if self.remaining_in_run == 0 {
203            // Move to next run
204            self.run_idx += 1;
205            if self.run_idx >= self.runs.len() {
206                return None;
207            }
208            self.advance_run();
209        }
210        self.remaining_in_run = self.remaining_in_run.saturating_sub(1);
211        Some(self.current_version)
212    }
213}
214
/// Metadata about the location of dataset version sequence data
/// Following the same pattern as RowIdMeta
///
/// When stored inline, identical byte sequences are shared across fragments
/// via `Arc<[u8]>` to reduce manifest memory for large tables.
///
/// The inline payload is the protobuf encoding produced by
/// `write_dataset_versions` and is decoded with `read_dataset_versions`.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub enum RowDatasetVersionMeta {
    /// Small sequences stored inline in the fragment metadata
    Inline(Arc<[u8]>),
    /// Large sequences stored in external files
    /// (loading is not implemented yet; see `load_sequence`).
    External(ExternalFile),
}
227
228// Custom Serialize: convert Arc<[u8]> to slice for transparent JSON output
229impl Serialize for RowDatasetVersionMeta {
230    fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
231        #[derive(Serialize)]
232        #[serde(untagged)]
233        enum Helper<'a> {
234            Inline { inline: &'a [u8] },
235            External { external: &'a ExternalFile },
236        }
237
238        match self {
239            Self::Inline(data) => Helper::Inline {
240                inline: data.as_ref(),
241            }
242            .serialize(serializer),
243            Self::External(file) => Helper::External { external: file }.serialize(serializer),
244        }
245    }
246}
247
248// Custom Deserialize: read Vec<u8> and convert to Arc<[u8]>
249impl<'de> Deserialize<'de> for RowDatasetVersionMeta {
250    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
251        #[derive(Deserialize)]
252        #[serde(untagged)]
253        enum Helper {
254            Inline { inline: Vec<u8> },
255            External { external: ExternalFile },
256        }
257
258        match Helper::deserialize(deserializer)? {
259            Helper::Inline { inline } => Ok(Self::Inline(Arc::from(inline))),
260            Helper::External { external } => Ok(Self::External(external)),
261        }
262    }
263}
264
265impl RowDatasetVersionMeta {
266    /// Create inline metadata from a version sequence
267    pub fn from_sequence(sequence: &RowDatasetVersionSequence) -> lance_core::Result<Self> {
268        let bytes = write_dataset_versions(sequence);
269        Ok(Self::Inline(Arc::from(bytes)))
270    }
271
272    /// Create external metadata reference
273    pub fn from_external_file(path: String, offset: u64, size: u64) -> Self {
274        Self::External(ExternalFile { path, offset, size })
275    }
276
277    /// Load the version sequence from this metadata
278    pub fn load_sequence(&self) -> lance_core::Result<RowDatasetVersionSequence> {
279        match self {
280            Self::Inline(data) => read_dataset_versions(data),
281            Self::External(_file) => {
282                todo!("External file loading not yet implemented")
283            }
284        }
285    }
286}
287
288/// Helper function to convert RowDatasetVersionMeta to protobuf format for last_updated_at
289pub fn last_updated_at_version_meta_to_pb(
290    meta: &Option<RowDatasetVersionMeta>,
291) -> Option<pb::data_fragment::LastUpdatedAtVersionSequence> {
292    meta.as_ref().map(|m| match m {
293        RowDatasetVersionMeta::Inline(data) => {
294            pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(
295                data.to_vec(),
296            )
297        }
298        RowDatasetVersionMeta::External(file) => {
299            pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
300                pb::ExternalFile {
301                    path: file.path.clone(),
302                    offset: file.offset,
303                    size: file.size,
304                },
305            )
306        }
307    })
308}
309
310/// Helper function to convert RowDatasetVersionMeta to protobuf format for created_at
311pub fn created_at_version_meta_to_pb(
312    meta: &Option<RowDatasetVersionMeta>,
313) -> Option<pb::data_fragment::CreatedAtVersionSequence> {
314    meta.as_ref().map(|m| match m {
315        RowDatasetVersionMeta::Inline(data) => {
316            pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.to_vec())
317        }
318        RowDatasetVersionMeta::External(file) => {
319            pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(
320                pb::ExternalFile {
321                    path: file.path.clone(),
322                    offset: file.offset,
323                    size: file.size,
324                },
325            )
326        }
327    })
328}
329
330/// Serialize a dataset version sequence to a buffer (following RowIdSequence pattern)
331pub fn write_dataset_versions(sequence: &RowDatasetVersionSequence) -> Vec<u8> {
332    // Convert to protobuf sequence
333    let pb_sequence = pb::RowDatasetVersionSequence {
334        runs: sequence
335            .runs
336            .iter()
337            .map(|run| pb::RowDatasetVersionRun {
338                span: Some(pb::U64Segment::from(run.span.clone())),
339                version: run.version,
340            })
341            .collect(),
342    };
343
344    pb_sequence.encode_to_vec()
345}
346
347/// Deserialize a dataset version sequence from bytes (following RowIdSequence pattern)
348pub fn read_dataset_versions(data: &[u8]) -> lance_core::Result<RowDatasetVersionSequence> {
349    let pb_sequence = pb::RowDatasetVersionSequence::decode(data).map_err(|e| {
350        Error::internal(format!("Failed to decode RowDatasetVersionSequence: {}", e))
351    })?;
352
353    let segments = pb_sequence
354        .runs
355        .into_iter()
356        .map(|pb_run| {
357            let positions_pb = pb_run.span.ok_or_else(|| {
358                Error::internal("Missing positions in RowDatasetVersionRun".to_string())
359            })?;
360            let segment = U64Segment::try_from(positions_pb)?;
361            Ok(RowDatasetVersionRun {
362                span: segment,
363                version: pb_run.version,
364            })
365        })
366        .collect::<Result<Vec<_>>>()?;
367
368    Ok(RowDatasetVersionSequence { runs: segments })
369}
370
371/// Re-chunk a sequence of dataset version runs into new chunk sizes (aligned with RowIdSequence rechunking)
372pub fn rechunk_version_sequences(
373    sequences: impl IntoIterator<Item = RowDatasetVersionSequence>,
374    chunk_sizes: impl IntoIterator<Item = u64>,
375    allow_incomplete: bool,
376) -> Result<Vec<RowDatasetVersionSequence>> {
377    let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
378    let total_chunks = chunk_sizes_vec.len();
379    let mut chunked_sequences: Vec<RowDatasetVersionSequence> = Vec::with_capacity(total_chunks);
380
381    let mut run_iter = sequences
382        .into_iter()
383        .flat_map(|sequence| sequence.runs.into_iter())
384        .peekable();
385
386    let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
387        Error::invalid_input(format!(
388            "Got too few version runs for chunk {}. Expected chunk size: {}, remaining needed: {}",
389            chunk_index, expected_chunk_size, remaining
390        ))
391    };
392
393    let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
394        Error::invalid_input(format!(
395            "Got too many version runs for the provided chunk lengths. Processed {} chunks out of {} expected",
396            processed_chunks, total_chunk_sizes
397        ))
398    };
399
400    let mut segment_offset = 0_u64;
401
402    for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
403        let chunk_size = *chunk_size;
404        let mut out_seq = RowDatasetVersionSequence::new();
405        let mut remaining = chunk_size;
406
407        while remaining > 0 {
408            let remaining_in_segment = run_iter
409                .peek()
410                .map_or(0, |run| run.span.len() as u64 - segment_offset);
411
412            if remaining_in_segment == 0 {
413                if run_iter.next().is_some() {
414                    segment_offset = 0;
415                    continue;
416                } else if allow_incomplete {
417                    break;
418                } else {
419                    return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
420                }
421            }
422
423            match remaining_in_segment.cmp(&remaining) {
424                std::cmp::Ordering::Greater => {
425                    let run = run_iter.peek().unwrap();
426                    let seg = run.span.slice(segment_offset as usize, remaining as usize);
427                    out_seq.runs.push(RowDatasetVersionRun {
428                        span: seg,
429                        version: run.version,
430                    });
431                    segment_offset += remaining;
432                    remaining = 0;
433                }
434                std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
435                    let run = run_iter.next().ok_or_else(|| {
436                        too_few_segments_error(chunk_index, chunk_size, remaining)
437                    })?;
438                    let seg = run
439                        .span
440                        .slice(segment_offset as usize, remaining_in_segment as usize);
441                    out_seq.runs.push(RowDatasetVersionRun {
442                        span: seg,
443                        version: run.version,
444                    });
445                    segment_offset = 0;
446                    remaining -= remaining_in_segment;
447                }
448            }
449        }
450
451        chunked_sequences.push(out_seq);
452    }
453
454    if run_iter.peek().is_some() {
455        return Err(too_many_segments_error(
456            chunked_sequences.len(),
457            total_chunks,
458        ));
459    }
460
461    Ok(chunked_sequences)
462}
463
464/// Build version metadata for a fragment if it has physical rows and no existing metadata.
465pub fn build_version_meta(
466    fragment: &Fragment,
467    current_version: u64,
468) -> Option<RowDatasetVersionMeta> {
469    if let Some(physical_rows) = fragment.physical_rows
470        && physical_rows > 0
471    {
472        // Verify row_id_meta exists (sanity check for stable row IDs)
473        if fragment.row_id_meta.is_none() {
474            panic!("Can not find row id meta, please make sure you have enabled stable row id.")
475        }
476
477        // Use physical_rows directly as the authoritative row count
478        // This is correct even for compacted fragments where row_id_meta might
479        // have been partially copied
480        let version_sequence = RowDatasetVersionSequence::from_uniform_row_count(
481            physical_rows as u64,
482            current_version,
483        );
484
485        return Some(RowDatasetVersionMeta::from_sequence(&version_sequence).unwrap());
486    }
487    None
488}
489
490/// Refresh row-level latest update version metadata for a full fragment rewrite-column update.
491///
492/// This sets a uniform version sequence for all rows in the fragment to `current_version`.
493pub fn refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
494    fragment: &mut Fragment,
495    current_version: u64,
496) -> Result<()> {
497    let row_count = if let Some(pr) = fragment.physical_rows {
498        pr as u64
499    } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
500        match row_id_meta {
501            crate::format::RowIdMeta::Inline(data) => {
502                let sequence = read_row_ids(data).unwrap();
503                sequence.len()
504            }
505            // Follow existing behavior: external sequence not yet supported here
506            crate::format::RowIdMeta::External(_file) => 0,
507        }
508    } else {
509        0
510    };
511
512    if row_count > 0 {
513        let version_seq =
514            RowDatasetVersionSequence::from_uniform_row_count(row_count, current_version);
515        let version_meta = RowDatasetVersionMeta::from_sequence(&version_seq)?;
516        fragment.last_updated_at_version_meta = Some(version_meta);
517    }
518
519    Ok(())
520}
521
522/// Refresh row-level latest update version metadata for a partial fragment rewrite-column update.
523///
524/// `updated_offsets` are local row offsets (within the fragment) that have been updated.
525/// Existing version metadata is preserved and only the updated positions are set to `current_version`.
526/// If no existing metadata is present, positions default to `prev_version`.
527pub fn refresh_row_latest_update_meta_for_partial_frag_rewrite_cols(
528    fragment: &mut Fragment,
529    updated_offsets: &[usize],
530    current_version: u64,
531    prev_version: u64,
532) -> Result<()> {
533    // Determine row count for fragment
534    let row_count_u64: u64 = if let Some(pr) = fragment.physical_rows {
535        pr as u64
536    } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
537        match row_id_meta {
538            crate::format::RowIdMeta::Inline(data) => {
539                let sequence = read_row_ids(data).unwrap();
540                sequence.len()
541            }
542            crate::format::RowIdMeta::External(_file) => {
543                // Preserve original behavior for external sequences
544                todo!("External file loading not yet implemented")
545            }
546        }
547    } else {
548        0
549    };
550
551    if row_count_u64 > 0 {
552        // Build base version vector from existing meta or previous dataset version
553        let mut base_versions: Vec<u64> = Vec::with_capacity(row_count_u64 as usize);
554        if let Some(meta) = fragment.last_updated_at_version_meta.as_ref() {
555            if let Ok(base_seq) = meta.load_sequence() {
556                for pos in 0..(row_count_u64 as usize) {
557                    base_versions.push(base_seq.version_at(pos).unwrap_or(prev_version));
558                }
559            } else {
560                base_versions.resize(row_count_u64 as usize, prev_version);
561            }
562        } else {
563            base_versions.resize(row_count_u64 as usize, prev_version);
564        }
565
566        // Apply updates to updated positions
567        for &pos in updated_offsets {
568            if pos < base_versions.len() {
569                base_versions[pos] = current_version;
570            }
571        }
572
573        // Compress into runs
574        let mut runs: Vec<RowDatasetVersionRun> = Vec::new();
575        if !base_versions.is_empty() {
576            let mut start = 0usize;
577            let mut curr_ver = base_versions[0];
578            for (idx, &ver) in base_versions.iter().enumerate().skip(1) {
579                if ver != curr_ver {
580                    runs.push(RowDatasetVersionRun {
581                        span: U64Segment::Range(start as u64..idx as u64),
582                        version: curr_ver,
583                    });
584                    start = idx;
585                    curr_ver = ver;
586                }
587            }
588            runs.push(RowDatasetVersionRun {
589                span: U64Segment::Range(start as u64..base_versions.len() as u64),
590                version: curr_ver,
591            });
592        }
593        let new_seq = RowDatasetVersionSequence { runs };
594        let new_meta = RowDatasetVersionMeta::from_sequence(&new_seq)?;
595        fragment.last_updated_at_version_meta = Some(new_meta);
596    }
597
598    Ok(())
599}
600
601// Protobuf conversion implementations
602impl TryFrom<pb::data_fragment::LastUpdatedAtVersionSequence> for RowDatasetVersionMeta {
603    type Error = Error;
604
605    fn try_from(value: pb::data_fragment::LastUpdatedAtVersionSequence) -> Result<Self> {
606        match value {
607            pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => {
608                Ok(Self::Inline(Arc::from(data)))
609            }
610            pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
611                file,
612            ) => Ok(Self::External(ExternalFile {
613                path: file.path,
614                offset: file.offset,
615                size: file.size,
616            })),
617        }
618    }
619}
620
621impl TryFrom<pb::data_fragment::CreatedAtVersionSequence> for RowDatasetVersionMeta {
622    type Error = Error;
623
624    fn try_from(value: pb::data_fragment::CreatedAtVersionSequence) -> Result<Self> {
625        match value {
626            pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => {
627                Ok(Self::Inline(Arc::from(data)))
628            }
629            pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => {
630                Ok(Self::External(ExternalFile {
631                    path: file.path,
632                    offset: file.offset,
633                    size: file.size,
634                }))
635            }
636        }
637    }
638}
639
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a sequence from `(row_count, version)` pairs; every run's span is
    /// the range `0..row_count`.
    fn sequence_of(runs: &[(u64, u64)]) -> RowDatasetVersionSequence {
        let runs = runs
            .iter()
            .map(|&(rows, version)| RowDatasetVersionRun {
                span: U64Segment::Range(0..rows),
                version,
            })
            .collect();
        RowDatasetVersionSequence { runs }
    }

    // Positional lookup walks cumulative run lengths and returns None past the end.
    #[test]
    fn test_version_random_access() {
        let seq = sequence_of(&[(3, 1), (2, 2), (1, 3)]);

        let expected = [
            (0, Some(1)),
            (2, Some(1)),
            (3, Some(2)),
            (4, Some(2)),
            (5, Some(3)),
            (6, None),
        ];
        for (index, version) in expected {
            assert_eq!(seq.version_at(index), version);
        }
    }

    // Encoding then decoding keeps run count, total length and versions.
    #[test]
    fn test_serialization_round_trip() {
        let original = sequence_of(&[(4, 42), (3, 99)]);

        let encoded = write_dataset_versions(&original);
        let decoded = read_dataset_versions(&encoded).unwrap();

        assert_eq!(decoded.runs.len(), 2);
        assert_eq!(decoded.len(), 7);
        assert_eq!(decoded.version_at(0), Some(42));
        assert_eq!(decoded.version_at(5), Some(99));
    }

    // Row ids are mapped to positions first, then to the run covering them.
    #[test]
    fn test_get_version_for_row_id() {
        let seq = sequence_of(&[(2, 8), (2, 9)]);
        let rows = RowIdSequence::from(10..14); // row ids: 10,11,12,13

        let expected = [
            (10, Some(8)),
            (11, Some(8)),
            (12, Some(9)),
            (13, Some(9)),
            (99, None),
        ];
        for (row_id, version) in expected {
            assert_eq!(seq.get_version_for_row_id(&rows, row_id), version);
        }
    }
}