lance_table/rowids/
version.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Row version tracking for cross-version diff functionality.
//!
//! This module provides data structures and functions that record, for each
//! row in a Lance dataset, the dataset version at which it was last updated,
//! enabling efficient cross-version diff operations.

10use deepsize::DeepSizeOf;
11use lance_core::Error;
12use lance_core::Result;
13use prost::Message;
14use serde::{Deserialize, Serialize};
15use snafu::location;
16
17use crate::format::{pb, ExternalFile, Fragment};
18use crate::rowids::segment::U64Segment;
19use crate::rowids::{read_row_ids, RowIdSequence};
20
21/// A run of identical versions over a contiguous span of row positions.
22///
23/// Span is expressed as a U64Segment over row offsets (0..N within a fragment),
24/// not over row IDs. This keeps the encoding aligned with RowIdSequence order
25/// and enables zipped iteration without building a map.
26#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
27pub struct RowDatasetVersionRun {
28    pub span: U64Segment,
29    pub version: u64,
30}
31
32impl RowDatasetVersionRun {
33    /// Number of rows covered by this run.
34    pub fn len(&self) -> usize {
35        self.span.len()
36    }
37
38    /// Whether this run covers no rows.
39    pub fn is_empty(&self) -> bool {
40        self.span.is_empty()
41    }
42
43    /// The version value of this run.
44    pub fn version(&self) -> u64 {
45        self.version
46    }
47}
48
49/// Sequence of dataset versions
50///
51/// Stores version runs aligned to the positional order of RowIdSequence.
52/// Provides sequential iterators and optional lightweight indexing for
53/// efficient random access.
54#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf, Default)]
55pub struct RowDatasetVersionSequence {
56    pub runs: Vec<RowDatasetVersionRun>,
57}
58
59impl RowDatasetVersionSequence {
60    /// Create a new empty version sequence
61    pub fn new() -> Self {
62        Self { runs: Vec::new() }
63    }
64
65    /// Create a version sequence with a single uniform run of `row_count` rows.
66    pub fn from_uniform_row_count(row_count: u64, version: u64) -> Self {
67        if row_count == 0 {
68            return Self::new();
69        }
70        let run = RowDatasetVersionRun {
71            span: U64Segment::Range(0..row_count),
72            version,
73        };
74        Self { runs: vec![run] }
75    }
76
77    /// Number of rows tracked by this sequence (sum of run lengths).
78    pub fn len(&self) -> u64 {
79        self.runs.iter().map(|s| s.len() as u64).sum()
80    }
81
82    /// Empty if there are no runs or all runs are empty.
83    pub fn is_empty(&self) -> bool {
84        self.runs.is_empty() || self.runs.iter().all(|s| s.is_empty())
85    }
86
87    /// Returns a forward iterator over versions, expanding runs lazily.
88    pub fn versions(&self) -> VersionsIter<'_> {
89        VersionsIter::new(&self.runs)
90    }
91
92    /// Random access: get the version at global row position `index`.
93    pub fn version_at(&self, index: usize) -> Option<u64> {
94        let mut offset = 0usize;
95        for run in &self.runs {
96            let len = run.len();
97            if index < offset + len {
98                return Some(run.version());
99            }
100            offset += len;
101        }
102        None
103    }
104
105    /// Get the version associated with a specific row id.
106    /// This reconstructs the positional offset from RowIdSequence and then
107    /// performs `version_at` lookup.
108    pub fn get_version_for_row_id(&self, row_ids: &RowIdSequence, row_id: u64) -> Option<u64> {
109        let mut offset = 0usize;
110        for seg in &row_ids.0 {
111            if seg.range().is_some_and(|r| r.contains(&row_id)) {
112                if let Some(local) = seg.position(row_id) {
113                    return self.version_at(offset + local);
114                }
115            }
116            offset += seg.len();
117        }
118        None
119    }
120
121    /// Convenience: collect row IDs with version strictly greater than `threshold`.
122    pub fn rows_with_version_greater_than(
123        &self,
124        row_ids: &RowIdSequence,
125        threshold: u64,
126    ) -> Vec<u64> {
127        row_ids
128            .iter()
129            .zip(self.versions())
130            .filter_map(|(rid, v)| if v > threshold { Some(rid) } else { None })
131            .collect()
132    }
133
134    /// Delete rows by positional offsets (e.g., from a deletion vector)
135    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
136        let mut local_positions: Vec<u32> = Vec::new();
137        let mut positions_iter = positions.into_iter();
138        let mut curr_position = positions_iter.next();
139        let mut offset: usize = 0;
140        let mut cutoff: usize = 0;
141
142        for run in self.runs.iter_mut() {
143            cutoff += run.span.len();
144            while let Some(position) = curr_position {
145                if position as usize >= cutoff {
146                    break;
147                }
148                local_positions.push(position - offset as u32);
149                curr_position = positions_iter.next();
150            }
151
152            if !local_positions.is_empty() {
153                run.span.mask(local_positions.as_slice());
154                local_positions.clear();
155            }
156            offset = cutoff;
157        }
158
159        self.runs.retain(|r| !r.span.is_empty());
160        Ok(())
161    }
162}
163
164/// Iterator over versions expanding runs lazily.
165pub struct VersionsIter<'a> {
166    runs: &'a [RowDatasetVersionRun],
167    run_idx: usize,
168    remaining_in_run: usize,
169    current_version: u64,
170}
171
172impl<'a> VersionsIter<'a> {
173    fn new(runs: &'a [RowDatasetVersionRun]) -> Self {
174        let mut it = Self {
175            runs,
176            run_idx: 0,
177            remaining_in_run: 0,
178            current_version: 0,
179        };
180        it.advance_run();
181        it
182    }
183
184    fn advance_run(&mut self) {
185        if self.run_idx < self.runs.len() {
186            let run = &self.runs[self.run_idx];
187            self.remaining_in_run = run.len();
188            self.current_version = run.version();
189        } else {
190            self.remaining_in_run = 0;
191        }
192    }
193}
194
195impl<'a> Iterator for VersionsIter<'a> {
196    type Item = u64;
197
198    fn next(&mut self) -> Option<Self::Item> {
199        if self.remaining_in_run == 0 {
200            // Move to next run
201            self.run_idx += 1;
202            if self.run_idx >= self.runs.len() {
203                return None;
204            }
205            self.advance_run();
206        }
207        self.remaining_in_run = self.remaining_in_run.saturating_sub(1);
208        Some(self.current_version)
209    }
210}
211
212/// Metadata about the location of dataset version sequence data
213/// Following the same pattern as RowIdMeta
214#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
215pub enum RowDatasetVersionMeta {
216    /// Small sequences stored inline in the fragment metadata
217    Inline(Vec<u8>),
218    /// Large sequences stored in external files
219    External(ExternalFile),
220}
221
222impl RowDatasetVersionMeta {
223    /// Create inline metadata from a version sequence
224    pub fn from_sequence(sequence: &RowDatasetVersionSequence) -> lance_core::Result<Self> {
225        let bytes = write_dataset_versions(sequence);
226        Ok(Self::Inline(bytes))
227    }
228
229    /// Create external metadata reference
230    pub fn from_external_file(path: String, offset: u64, size: u64) -> Self {
231        Self::External(ExternalFile { path, offset, size })
232    }
233
234    /// Load the version sequence from this metadata
235    pub fn load_sequence(&self) -> lance_core::Result<RowDatasetVersionSequence> {
236        match self {
237            Self::Inline(data) => read_dataset_versions(data),
238            Self::External(_file) => {
239                todo!("External file loading not yet implemented")
240            }
241        }
242    }
243}
244
245/// Helper function to convert RowDatasetVersionMeta to protobuf format for last_updated_at
246pub fn last_updated_at_version_meta_to_pb(
247    meta: &Option<RowDatasetVersionMeta>,
248) -> Option<pb::data_fragment::LastUpdatedAtVersionSequence> {
249    meta.as_ref().map(|m| match m {
250        RowDatasetVersionMeta::Inline(data) => {
251            pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(
252                data.clone(),
253            )
254        }
255        RowDatasetVersionMeta::External(file) => {
256            pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
257                pb::ExternalFile {
258                    path: file.path.clone(),
259                    offset: file.offset,
260                    size: file.size,
261                },
262            )
263        }
264    })
265}
266
267/// Helper function to convert RowDatasetVersionMeta to protobuf format for created_at
268pub fn created_at_version_meta_to_pb(
269    meta: &Option<RowDatasetVersionMeta>,
270) -> Option<pb::data_fragment::CreatedAtVersionSequence> {
271    meta.as_ref().map(|m| match m {
272        RowDatasetVersionMeta::Inline(data) => {
273            pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.clone())
274        }
275        RowDatasetVersionMeta::External(file) => {
276            pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(
277                pb::ExternalFile {
278                    path: file.path.clone(),
279                    offset: file.offset,
280                    size: file.size,
281                },
282            )
283        }
284    })
285}
286
287/// Serialize a dataset version sequence to a buffer (following RowIdSequence pattern)
288pub fn write_dataset_versions(sequence: &RowDatasetVersionSequence) -> Vec<u8> {
289    // Convert to protobuf sequence
290    let pb_sequence = pb::RowDatasetVersionSequence {
291        runs: sequence
292            .runs
293            .iter()
294            .map(|run| pb::RowDatasetVersionRun {
295                span: Some(pb::U64Segment::from(run.span.clone())),
296                version: run.version,
297            })
298            .collect(),
299    };
300
301    pb_sequence.encode_to_vec()
302}
303
304/// Deserialize a dataset version sequence from bytes (following RowIdSequence pattern)
305pub fn read_dataset_versions(data: &[u8]) -> lance_core::Result<RowDatasetVersionSequence> {
306    let pb_sequence = pb::RowDatasetVersionSequence::decode(data).map_err(|e| Error::Internal {
307        message: format!("Failed to decode RowDatasetVersionSequence: {}", e),
308        location: location!(),
309    })?;
310
311    let segments = pb_sequence
312        .runs
313        .into_iter()
314        .map(|pb_run| {
315            let positions_pb = pb_run.span.ok_or_else(|| Error::Internal {
316                message: "Missing positions in RowDatasetVersionRun".to_string(),
317                location: location!(),
318            })?;
319            let segment = U64Segment::try_from(positions_pb)?;
320            Ok(RowDatasetVersionRun {
321                span: segment,
322                version: pb_run.version,
323            })
324        })
325        .collect::<Result<Vec<_>>>()?;
326
327    Ok(RowDatasetVersionSequence { runs: segments })
328}
329
330/// Re-chunk a sequence of dataset version runs into new chunk sizes (aligned with RowIdSequence rechunking)
331pub fn rechunk_version_sequences(
332    sequences: impl IntoIterator<Item = RowDatasetVersionSequence>,
333    chunk_sizes: impl IntoIterator<Item = u64>,
334    allow_incomplete: bool,
335) -> Result<Vec<RowDatasetVersionSequence>> {
336    let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
337    let total_chunks = chunk_sizes_vec.len();
338    let mut chunked_sequences: Vec<RowDatasetVersionSequence> = Vec::with_capacity(total_chunks);
339
340    let mut run_iter = sequences
341        .into_iter()
342        .flat_map(|sequence| sequence.runs.into_iter())
343        .peekable();
344
345    let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
346        Error::invalid_input(
347            format!(
348                "Got too few version runs for chunk {}. Expected chunk size: {}, remaining needed: {}",
349                chunk_index, expected_chunk_size, remaining
350            ),
351            location!(),
352        )
353    };
354
355    let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
356        Error::invalid_input(
357            format!(
358                "Got too many version runs for the provided chunk lengths. Processed {} chunks out of {} expected",
359                processed_chunks, total_chunk_sizes
360            ),
361            location!(),
362        )
363    };
364
365    let mut segment_offset = 0_u64;
366
367    for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
368        let chunk_size = *chunk_size;
369        let mut out_seq = RowDatasetVersionSequence::new();
370        let mut remaining = chunk_size;
371
372        while remaining > 0 {
373            let remaining_in_segment = run_iter
374                .peek()
375                .map_or(0, |run| run.span.len() as u64 - segment_offset);
376
377            if remaining_in_segment == 0 {
378                if run_iter.next().is_some() {
379                    segment_offset = 0;
380                    continue;
381                } else if allow_incomplete {
382                    break;
383                } else {
384                    return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
385                }
386            }
387
388            match remaining_in_segment.cmp(&remaining) {
389                std::cmp::Ordering::Greater => {
390                    let run = run_iter.peek().unwrap();
391                    let seg = run.span.slice(segment_offset as usize, remaining as usize);
392                    out_seq.runs.push(RowDatasetVersionRun {
393                        span: seg,
394                        version: run.version,
395                    });
396                    segment_offset += remaining;
397                    remaining = 0;
398                }
399                std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
400                    let run = run_iter.next().ok_or_else(|| {
401                        too_few_segments_error(chunk_index, chunk_size, remaining)
402                    })?;
403                    let seg = run
404                        .span
405                        .slice(segment_offset as usize, remaining_in_segment as usize);
406                    out_seq.runs.push(RowDatasetVersionRun {
407                        span: seg,
408                        version: run.version,
409                    });
410                    segment_offset = 0;
411                    remaining -= remaining_in_segment;
412                }
413            }
414        }
415
416        chunked_sequences.push(out_seq);
417    }
418
419    if run_iter.peek().is_some() {
420        return Err(too_many_segments_error(
421            chunked_sequences.len(),
422            total_chunks,
423        ));
424    }
425
426    Ok(chunked_sequences)
427}
428
429/// Build version metadata for a fragment if it has physical rows and no existing metadata.
430pub fn build_version_meta(
431    fragment: &Fragment,
432    current_version: u64,
433) -> Option<RowDatasetVersionMeta> {
434    if let Some(physical_rows) = fragment.physical_rows {
435        if physical_rows > 0 {
436            // Verify row_id_meta exists (sanity check for stable row IDs)
437            if fragment.row_id_meta.is_none() {
438                panic!("Can not find row id meta, please make sure you have enabled stable row id.")
439            }
440
441            // Use physical_rows directly as the authoritative row count
442            // This is correct even for compacted fragments where row_id_meta might
443            // have been partially copied
444            let version_sequence = RowDatasetVersionSequence::from_uniform_row_count(
445                physical_rows as u64,
446                current_version,
447            );
448
449            return Some(RowDatasetVersionMeta::from_sequence(&version_sequence).unwrap());
450        }
451    }
452    None
453}
454
455/// Refresh row-level latest update version metadata for a full fragment rewrite-column update.
456///
457/// This sets a uniform version sequence for all rows in the fragment to `current_version`.
458pub fn refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
459    fragment: &mut Fragment,
460    current_version: u64,
461) -> Result<()> {
462    let row_count = if let Some(pr) = fragment.physical_rows {
463        pr as u64
464    } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
465        match row_id_meta {
466            crate::format::RowIdMeta::Inline(data) => {
467                let sequence = read_row_ids(data).unwrap();
468                sequence.len()
469            }
470            // Follow existing behavior: external sequence not yet supported here
471            crate::format::RowIdMeta::External(_file) => 0,
472        }
473    } else {
474        0
475    };
476
477    if row_count > 0 {
478        let version_seq =
479            RowDatasetVersionSequence::from_uniform_row_count(row_count, current_version);
480        let version_meta = RowDatasetVersionMeta::from_sequence(&version_seq)?;
481        fragment.last_updated_at_version_meta = Some(version_meta);
482    }
483
484    Ok(())
485}
486
487/// Refresh row-level latest update version metadata for a partial fragment rewrite-column update.
488///
489/// `updated_offsets` are local row offsets (within the fragment) that have been updated.
490/// Existing version metadata is preserved and only the updated positions are set to `current_version`.
491/// If no existing metadata is present, positions default to `prev_version`.
492pub fn refresh_row_latest_update_meta_for_partial_frag_rewrite_cols(
493    fragment: &mut Fragment,
494    updated_offsets: &[usize],
495    current_version: u64,
496    prev_version: u64,
497) -> Result<()> {
498    // Determine row count for fragment
499    let row_count_u64: u64 = if let Some(pr) = fragment.physical_rows {
500        pr as u64
501    } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
502        match row_id_meta {
503            crate::format::RowIdMeta::Inline(data) => {
504                let sequence = read_row_ids(data).unwrap();
505                sequence.len()
506            }
507            crate::format::RowIdMeta::External(_file) => {
508                // Preserve original behavior for external sequences
509                todo!("External file loading not yet implemented")
510            }
511        }
512    } else {
513        0
514    };
515
516    if row_count_u64 > 0 {
517        // Build base version vector from existing meta or previous dataset version
518        let mut base_versions: Vec<u64> = Vec::with_capacity(row_count_u64 as usize);
519        if let Some(meta) = fragment.last_updated_at_version_meta.as_ref() {
520            if let Ok(base_seq) = meta.load_sequence() {
521                for pos in 0..(row_count_u64 as usize) {
522                    base_versions.push(base_seq.version_at(pos).unwrap_or(prev_version));
523                }
524            } else {
525                base_versions.resize(row_count_u64 as usize, prev_version);
526            }
527        } else {
528            base_versions.resize(row_count_u64 as usize, prev_version);
529        }
530
531        // Apply updates to updated positions
532        for &pos in updated_offsets {
533            if pos < base_versions.len() {
534                base_versions[pos] = current_version;
535            }
536        }
537
538        // Compress into runs
539        let mut runs: Vec<RowDatasetVersionRun> = Vec::new();
540        if !base_versions.is_empty() {
541            let mut start = 0usize;
542            let mut curr_ver = base_versions[0];
543            for (idx, &ver) in base_versions.iter().enumerate().skip(1) {
544                if ver != curr_ver {
545                    runs.push(RowDatasetVersionRun {
546                        span: U64Segment::Range(start as u64..idx as u64),
547                        version: curr_ver,
548                    });
549                    start = idx;
550                    curr_ver = ver;
551                }
552            }
553            runs.push(RowDatasetVersionRun {
554                span: U64Segment::Range(start as u64..base_versions.len() as u64),
555                version: curr_ver,
556            });
557        }
558        let new_seq = RowDatasetVersionSequence { runs };
559        let new_meta = RowDatasetVersionMeta::from_sequence(&new_seq)?;
560        fragment.last_updated_at_version_meta = Some(new_meta);
561    }
562
563    Ok(())
564}
565
566// Protobuf conversion implementations
567impl TryFrom<pb::data_fragment::LastUpdatedAtVersionSequence> for RowDatasetVersionMeta {
568    type Error = Error;
569
570    fn try_from(value: pb::data_fragment::LastUpdatedAtVersionSequence) -> Result<Self> {
571        match value {
572            pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => {
573                Ok(Self::Inline(data))
574            }
575            pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
576                file,
577            ) => Ok(Self::External(ExternalFile {
578                path: file.path,
579                offset: file.offset,
580                size: file.size,
581            })),
582        }
583    }
584}
585
586impl TryFrom<pb::data_fragment::CreatedAtVersionSequence> for RowDatasetVersionMeta {
587    type Error = Error;
588
589    fn try_from(value: pb::data_fragment::CreatedAtVersionSequence) -> Result<Self> {
590        match value {
591            pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => {
592                Ok(Self::Inline(data))
593            }
594            pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => {
595                Ok(Self::External(ExternalFile {
596                    path: file.path,
597                    offset: file.offset,
598                    size: file.size,
599                }))
600            }
601        }
602    }
603}
604
#[cfg(test)]
mod tests {
    use super::*;

    /// Collects every per-row version of `seq` in positional order.
    fn expand(seq: &RowDatasetVersionSequence) -> Vec<u64> {
        seq.versions().collect()
    }

    /// Two runs: offsets 0..3 at version 1, then 3..5 at version 2.
    fn two_runs() -> RowDatasetVersionSequence {
        RowDatasetVersionSequence {
            runs: vec![
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..3),
                    version: 1,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..2),
                    version: 2,
                },
            ],
        }
    }

    #[test]
    fn test_version_random_access() {
        let seq = RowDatasetVersionSequence {
            runs: vec![
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..3),
                    version: 1,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..2),
                    version: 2,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..1),
                    version: 3,
                },
            ],
        };
        assert_eq!(seq.version_at(0), Some(1));
        assert_eq!(seq.version_at(2), Some(1));
        assert_eq!(seq.version_at(3), Some(2));
        assert_eq!(seq.version_at(4), Some(2));
        assert_eq!(seq.version_at(5), Some(3));
        assert_eq!(seq.version_at(6), None);
    }

    #[test]
    fn test_serialization_round_trip() {
        let seq = RowDatasetVersionSequence {
            runs: vec![
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..4),
                    version: 42,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..3),
                    version: 99,
                },
            ],
        };
        let bytes = write_dataset_versions(&seq);
        let seq2 = read_dataset_versions(&bytes).unwrap();
        assert_eq!(seq2.runs.len(), 2);
        assert_eq!(seq2.len(), 7);
        assert_eq!(seq2.version_at(0), Some(42));
        assert_eq!(seq2.version_at(5), Some(99));
    }

    #[test]
    fn test_get_version_for_row_id() {
        let seq = RowDatasetVersionSequence {
            runs: vec![
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..2),
                    version: 8,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..2),
                    version: 9,
                },
            ],
        };
        let rows = RowIdSequence::from(10..14); // row ids: 10,11,12,13
        assert_eq!(seq.get_version_for_row_id(&rows, 10), Some(8));
        assert_eq!(seq.get_version_for_row_id(&rows, 11), Some(8));
        assert_eq!(seq.get_version_for_row_id(&rows, 12), Some(9));
        assert_eq!(seq.get_version_for_row_id(&rows, 13), Some(9));
        assert_eq!(seq.get_version_for_row_id(&rows, 99), None);
    }

    #[test]
    fn test_uniform_row_count() {
        let seq = RowDatasetVersionSequence::from_uniform_row_count(3, 7);
        assert_eq!(seq.runs.len(), 1);
        assert_eq!(seq.len(), 3);
        assert!(!seq.is_empty());
        assert_eq!(expand(&seq), vec![7, 7, 7]);

        let empty = RowDatasetVersionSequence::from_uniform_row_count(0, 7);
        assert!(empty.runs.is_empty());
        assert!(empty.is_empty());
        assert_eq!(empty.len(), 0);
        assert_eq!(empty.versions().next(), None);
        assert_eq!(empty.version_at(0), None);
    }

    #[test]
    fn test_versions_iter_matches_version_at() {
        let seq = two_runs();
        let expanded = expand(&seq);
        assert_eq!(expanded, vec![1, 1, 1, 2, 2]);
        for (idx, version) in expanded.iter().enumerate() {
            assert_eq!(seq.version_at(idx), Some(*version));
        }
    }

    #[test]
    fn test_rows_with_version_greater_than() {
        let seq = RowDatasetVersionSequence {
            runs: vec![
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..2),
                    version: 1,
                },
                RowDatasetVersionRun {
                    span: U64Segment::Range(0..2),
                    version: 5,
                },
            ],
        };
        let rows = RowIdSequence::from(10..14);
        assert_eq!(seq.rows_with_version_greater_than(&rows, 2), vec![12, 13]);
        assert_eq!(seq.rows_with_version_greater_than(&rows, 0), vec![10, 11, 12, 13]);
        assert!(seq.rows_with_version_greater_than(&rows, 5).is_empty());
    }

    #[test]
    fn test_mask_across_runs() {
        // One offset in each run.
        let mut seq = two_runs();
        seq.mask(vec![1u32, 3]).unwrap();
        assert_eq!(seq.len(), 3);
        assert_eq!(expand(&seq), vec![1, 1, 2]);

        // Masking every row of a run drops that run.
        let mut seq = two_runs();
        seq.mask(vec![3u32, 4]).unwrap();
        assert_eq!(seq.runs.len(), 1);
        assert_eq!(expand(&seq), vec![1, 1, 1]);

        // Offsets past the end are ignored.
        let mut seq = two_runs();
        seq.mask(vec![10u32]).unwrap();
        assert_eq!(seq, two_runs());
    }

    #[test]
    fn test_rechunk_splits_runs_across_chunks() {
        let a = RowDatasetVersionSequence::from_uniform_row_count(5, 1);
        let b = RowDatasetVersionSequence::from_uniform_row_count(3, 2);
        let chunks = rechunk_version_sequences(vec![a, b], vec![4, 4], false).unwrap();
        assert_eq!(chunks.len(), 2);
        assert_eq!(expand(&chunks[0]), vec![1, 1, 1, 1]);
        assert_eq!(chunks[1].runs.len(), 2);
        assert_eq!(expand(&chunks[1]), vec![1, 2, 2, 2]);
    }

    #[test]
    fn test_rechunk_size_mismatch() {
        let seq = RowDatasetVersionSequence::from_uniform_row_count(5, 1);

        // Leftover rows after the last chunk.
        assert!(rechunk_version_sequences(vec![seq.clone()], vec![3], false).is_err());
        // Not enough rows to fill the chunk.
        assert!(rechunk_version_sequences(vec![seq.clone()], vec![6], false).is_err());

        // With `allow_incomplete`, the short chunk keeps what is available.
        let partial = rechunk_version_sequences(vec![seq], vec![6], true).unwrap();
        assert_eq!(partial.len(), 1);
        assert_eq!(partial[0].len(), 5);
    }
}