Skip to main content

lance_table/rowids/
version.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Row version tracking for cross-version diff functionality
5//!
6//! This module provides data structures and functionality to track the latest
7//! update version for each row in a Lance dataset, enabling efficient
8//! cross-version diff operations.
9
10use deepsize::DeepSizeOf;
11use lance_core::Error;
12use lance_core::Result;
13use prost::Message;
14use serde::{Deserialize, Serialize};
15
16use crate::format::{ExternalFile, Fragment, pb};
17use crate::rowids::segment::U64Segment;
18use crate::rowids::{RowIdSequence, read_row_ids};
19
/// A run of identical versions over a contiguous span of row positions.
///
/// Span is expressed as a U64Segment over row offsets (0..N within a fragment),
/// not over row IDs. This keeps the encoding aligned with RowIdSequence order
/// and enables zipped iteration without building a map.
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
pub struct RowDatasetVersionRun {
    /// Row positions covered by this run; its length is the run's row count.
    pub span: U64Segment,
    /// Dataset version shared by every position in `span`.
    pub version: u64,
}
30
31impl RowDatasetVersionRun {
32    /// Number of rows covered by this run.
33    pub fn len(&self) -> usize {
34        self.span.len()
35    }
36
37    /// Whether this run covers no rows.
38    pub fn is_empty(&self) -> bool {
39        self.span.is_empty()
40    }
41
42    /// The version value of this run.
43    pub fn version(&self) -> u64 {
44        self.version
45    }
46}
47
/// Sequence of dataset versions
///
/// Stores version runs aligned to the positional order of RowIdSequence.
/// Provides a sequential iterator over per-row versions and positional
/// lookup; the lookup walks the runs in order (no separate index is stored).
#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf, Default)]
pub struct RowDatasetVersionSequence {
    /// Version runs in positional order; a row's global position is the sum of
    /// the lengths of the preceding runs plus its offset within its own run.
    pub runs: Vec<RowDatasetVersionRun>,
}
57
58impl RowDatasetVersionSequence {
59    /// Create a new empty version sequence
60    pub fn new() -> Self {
61        Self { runs: Vec::new() }
62    }
63
64    /// Create a version sequence with a single uniform run of `row_count` rows.
65    pub fn from_uniform_row_count(row_count: u64, version: u64) -> Self {
66        if row_count == 0 {
67            return Self::new();
68        }
69        let run = RowDatasetVersionRun {
70            span: U64Segment::Range(0..row_count),
71            version,
72        };
73        Self { runs: vec![run] }
74    }
75
76    /// Number of rows tracked by this sequence (sum of run lengths).
77    pub fn len(&self) -> u64 {
78        self.runs.iter().map(|s| s.len() as u64).sum()
79    }
80
81    /// Empty if there are no runs or all runs are empty.
82    pub fn is_empty(&self) -> bool {
83        self.runs.is_empty() || self.runs.iter().all(|s| s.is_empty())
84    }
85
86    /// Returns a forward iterator over versions, expanding runs lazily.
87    pub fn versions(&self) -> VersionsIter<'_> {
88        VersionsIter::new(&self.runs)
89    }
90
91    /// Random access: get the version at global row position `index`.
92    pub fn version_at(&self, index: usize) -> Option<u64> {
93        let mut offset = 0usize;
94        for run in &self.runs {
95            let len = run.len();
96            if index < offset + len {
97                return Some(run.version());
98            }
99            offset += len;
100        }
101        None
102    }
103
104    /// Get the version associated with a specific row id.
105    /// This reconstructs the positional offset from RowIdSequence and then
106    /// performs `version_at` lookup.
107    pub fn get_version_for_row_id(&self, row_ids: &RowIdSequence, row_id: u64) -> Option<u64> {
108        let mut offset = 0usize;
109        for seg in &row_ids.0 {
110            if seg.range().is_some_and(|r| r.contains(&row_id))
111                && let Some(local) = seg.position(row_id)
112            {
113                return self.version_at(offset + local);
114            }
115            offset += seg.len();
116        }
117        None
118    }
119
120    /// Convenience: collect row IDs with version strictly greater than `threshold`.
121    pub fn rows_with_version_greater_than(
122        &self,
123        row_ids: &RowIdSequence,
124        threshold: u64,
125    ) -> Vec<u64> {
126        row_ids
127            .iter()
128            .zip(self.versions())
129            .filter_map(|(rid, v)| if v > threshold { Some(rid) } else { None })
130            .collect()
131    }
132
133    /// Delete rows by positional offsets (e.g., from a deletion vector)
134    pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
135        let mut local_positions: Vec<u32> = Vec::new();
136        let mut positions_iter = positions.into_iter();
137        let mut curr_position = positions_iter.next();
138        let mut offset: usize = 0;
139        let mut cutoff: usize = 0;
140
141        for run in self.runs.iter_mut() {
142            cutoff += run.span.len();
143            while let Some(position) = curr_position {
144                if position as usize >= cutoff {
145                    break;
146                }
147                local_positions.push(position - offset as u32);
148                curr_position = positions_iter.next();
149            }
150
151            if !local_positions.is_empty() {
152                run.span.mask(local_positions.as_slice());
153                local_positions.clear();
154            }
155            offset = cutoff;
156        }
157
158        self.runs.retain(|r| !r.span.is_empty());
159        Ok(())
160    }
161}
162
/// Iterator over versions expanding runs lazily.
///
/// Yields one `u64` version per row position, run after run, without
/// allocating the expanded vector.
pub struct VersionsIter<'a> {
    /// The runs being expanded, in positional order.
    runs: &'a [RowDatasetVersionRun],
    /// Index into `runs` of the run currently being expanded.
    run_idx: usize,
    /// Rows of the current run that have not been yielded yet.
    remaining_in_run: usize,
    /// Version of the current run, cached when the run is loaded.
    current_version: u64,
}
170
171impl<'a> VersionsIter<'a> {
172    fn new(runs: &'a [RowDatasetVersionRun]) -> Self {
173        let mut it = Self {
174            runs,
175            run_idx: 0,
176            remaining_in_run: 0,
177            current_version: 0,
178        };
179        it.advance_run();
180        it
181    }
182
183    fn advance_run(&mut self) {
184        if self.run_idx < self.runs.len() {
185            let run = &self.runs[self.run_idx];
186            self.remaining_in_run = run.len();
187            self.current_version = run.version();
188        } else {
189            self.remaining_in_run = 0;
190        }
191    }
192}
193
194impl<'a> Iterator for VersionsIter<'a> {
195    type Item = u64;
196
197    fn next(&mut self) -> Option<Self::Item> {
198        if self.remaining_in_run == 0 {
199            // Move to next run
200            self.run_idx += 1;
201            if self.run_idx >= self.runs.len() {
202                return None;
203            }
204            self.advance_run();
205        }
206        self.remaining_in_run = self.remaining_in_run.saturating_sub(1);
207        Some(self.current_version)
208    }
209}
210
/// Metadata about the location of dataset version sequence data
/// Following the same pattern as RowIdMeta
///
/// The inline payload is the serialized form produced by
/// `write_dataset_versions` and is decoded with `read_dataset_versions`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum RowDatasetVersionMeta {
    /// Small sequences stored inline in the fragment metadata
    Inline(Vec<u8>),
    /// Large sequences stored in external files
    /// (loading from this variant is not implemented in this module yet)
    External(ExternalFile),
}
220
221impl RowDatasetVersionMeta {
222    /// Create inline metadata from a version sequence
223    pub fn from_sequence(sequence: &RowDatasetVersionSequence) -> lance_core::Result<Self> {
224        let bytes = write_dataset_versions(sequence);
225        Ok(Self::Inline(bytes))
226    }
227
228    /// Create external metadata reference
229    pub fn from_external_file(path: String, offset: u64, size: u64) -> Self {
230        Self::External(ExternalFile { path, offset, size })
231    }
232
233    /// Load the version sequence from this metadata
234    pub fn load_sequence(&self) -> lance_core::Result<RowDatasetVersionSequence> {
235        match self {
236            Self::Inline(data) => read_dataset_versions(data),
237            Self::External(_file) => {
238                todo!("External file loading not yet implemented")
239            }
240        }
241    }
242}
243
244/// Helper function to convert RowDatasetVersionMeta to protobuf format for last_updated_at
245pub fn last_updated_at_version_meta_to_pb(
246    meta: &Option<RowDatasetVersionMeta>,
247) -> Option<pb::data_fragment::LastUpdatedAtVersionSequence> {
248    meta.as_ref().map(|m| match m {
249        RowDatasetVersionMeta::Inline(data) => {
250            pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(
251                data.clone(),
252            )
253        }
254        RowDatasetVersionMeta::External(file) => {
255            pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
256                pb::ExternalFile {
257                    path: file.path.clone(),
258                    offset: file.offset,
259                    size: file.size,
260                },
261            )
262        }
263    })
264}
265
266/// Helper function to convert RowDatasetVersionMeta to protobuf format for created_at
267pub fn created_at_version_meta_to_pb(
268    meta: &Option<RowDatasetVersionMeta>,
269) -> Option<pb::data_fragment::CreatedAtVersionSequence> {
270    meta.as_ref().map(|m| match m {
271        RowDatasetVersionMeta::Inline(data) => {
272            pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.clone())
273        }
274        RowDatasetVersionMeta::External(file) => {
275            pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(
276                pb::ExternalFile {
277                    path: file.path.clone(),
278                    offset: file.offset,
279                    size: file.size,
280                },
281            )
282        }
283    })
284}
285
286/// Serialize a dataset version sequence to a buffer (following RowIdSequence pattern)
287pub fn write_dataset_versions(sequence: &RowDatasetVersionSequence) -> Vec<u8> {
288    // Convert to protobuf sequence
289    let pb_sequence = pb::RowDatasetVersionSequence {
290        runs: sequence
291            .runs
292            .iter()
293            .map(|run| pb::RowDatasetVersionRun {
294                span: Some(pb::U64Segment::from(run.span.clone())),
295                version: run.version,
296            })
297            .collect(),
298    };
299
300    pb_sequence.encode_to_vec()
301}
302
303/// Deserialize a dataset version sequence from bytes (following RowIdSequence pattern)
304pub fn read_dataset_versions(data: &[u8]) -> lance_core::Result<RowDatasetVersionSequence> {
305    let pb_sequence = pb::RowDatasetVersionSequence::decode(data).map_err(|e| {
306        Error::internal(format!("Failed to decode RowDatasetVersionSequence: {}", e))
307    })?;
308
309    let segments = pb_sequence
310        .runs
311        .into_iter()
312        .map(|pb_run| {
313            let positions_pb = pb_run.span.ok_or_else(|| {
314                Error::internal("Missing positions in RowDatasetVersionRun".to_string())
315            })?;
316            let segment = U64Segment::try_from(positions_pb)?;
317            Ok(RowDatasetVersionRun {
318                span: segment,
319                version: pb_run.version,
320            })
321        })
322        .collect::<Result<Vec<_>>>()?;
323
324    Ok(RowDatasetVersionSequence { runs: segments })
325}
326
/// Re-chunk a sequence of dataset version runs into new chunk sizes (aligned with RowIdSequence rechunking)
///
/// All runs of all input `sequences` are treated as one flat stream of rows.
/// One output sequence is produced per entry of `chunk_sizes`, each taking the
/// next `chunk_size` rows from that stream; a run that straddles a chunk
/// boundary is split with `span.slice`, keeping its version on both sides.
///
/// # Errors
///
/// * Returns an invalid-input error when the runs are exhausted before a chunk
///   is filled and `allow_incomplete` is `false`. With `allow_incomplete` set,
///   the short chunk is emitted as-is (later chunks then come out empty).
/// * Returns an invalid-input error when runs are still left over after all
///   chunks have been produced.
pub fn rechunk_version_sequences(
    sequences: impl IntoIterator<Item = RowDatasetVersionSequence>,
    chunk_sizes: impl IntoIterator<Item = u64>,
    allow_incomplete: bool,
) -> Result<Vec<RowDatasetVersionSequence>> {
    let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
    let total_chunks = chunk_sizes_vec.len();
    let mut chunked_sequences: Vec<RowDatasetVersionSequence> = Vec::with_capacity(total_chunks);

    // Flatten every input sequence into a single stream of runs. Peekable so a
    // run can be partially consumed and revisited for the next chunk.
    let mut run_iter = sequences
        .into_iter()
        .flat_map(|sequence| sequence.runs.into_iter())
        .peekable();

    let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
        Error::invalid_input(format!(
            "Got too few version runs for chunk {}. Expected chunk size: {}, remaining needed: {}",
            chunk_index, expected_chunk_size, remaining
        ))
    };

    let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
        Error::invalid_input(format!(
            "Got too many version runs for the provided chunk lengths. Processed {} chunks out of {} expected",
            processed_chunks, total_chunk_sizes
        ))
    };

    // Number of rows already consumed from the run at the head of `run_iter`.
    // Carried across chunks so a split run resumes where the last chunk ended.
    let mut segment_offset = 0_u64;

    for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
        let chunk_size = *chunk_size;
        let mut out_seq = RowDatasetVersionSequence::new();
        // Rows still needed to fill the current chunk.
        let mut remaining = chunk_size;

        while remaining > 0 {
            // Rows left in the head run; 0 when the stream is exhausted.
            let remaining_in_segment = run_iter
                .peek()
                .map_or(0, |run| run.span.len() as u64 - segment_offset);

            if remaining_in_segment == 0 {
                // Either the head run is fully consumed/empty (drop it and
                // retry with the next one) or there are no runs left at all.
                if run_iter.next().is_some() {
                    segment_offset = 0;
                    continue;
                } else if allow_incomplete {
                    break;
                } else {
                    return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
                }
            }

            match remaining_in_segment.cmp(&remaining) {
                std::cmp::Ordering::Greater => {
                    // The head run has more rows than the chunk needs: take a
                    // slice and leave the run in place for the next chunk.
                    let run = run_iter.peek().unwrap();
                    // NOTE(review): `slice` is used here as (offset, length) — confirm against U64Segment.
                    let seg = run.span.slice(segment_offset as usize, remaining as usize);
                    out_seq.runs.push(RowDatasetVersionRun {
                        span: seg,
                        version: run.version,
                    });
                    segment_offset += remaining;
                    remaining = 0;
                }
                std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
                    // The rest of the head run fits entirely in this chunk:
                    // consume it. `next()` cannot be `None` here because
                    // `remaining_in_segment > 0` implies a run was peeked.
                    let run = run_iter.next().ok_or_else(|| {
                        too_few_segments_error(chunk_index, chunk_size, remaining)
                    })?;
                    let seg = run
                        .span
                        .slice(segment_offset as usize, remaining_in_segment as usize);
                    out_seq.runs.push(RowDatasetVersionRun {
                        span: seg,
                        version: run.version,
                    });
                    segment_offset = 0;
                    remaining -= remaining_in_segment;
                }
            }
        }

        chunked_sequences.push(out_seq);
    }

    // Anything still at the head of the stream (including a partially consumed
    // run) means the chunk sizes did not account for all input rows.
    if run_iter.peek().is_some() {
        return Err(too_many_segments_error(
            chunked_sequences.len(),
            total_chunks,
        ));
    }

    Ok(chunked_sequences)
}
419
420/// Build version metadata for a fragment if it has physical rows and no existing metadata.
421pub fn build_version_meta(
422    fragment: &Fragment,
423    current_version: u64,
424) -> Option<RowDatasetVersionMeta> {
425    if let Some(physical_rows) = fragment.physical_rows
426        && physical_rows > 0
427    {
428        // Verify row_id_meta exists (sanity check for stable row IDs)
429        if fragment.row_id_meta.is_none() {
430            panic!("Can not find row id meta, please make sure you have enabled stable row id.")
431        }
432
433        // Use physical_rows directly as the authoritative row count
434        // This is correct even for compacted fragments where row_id_meta might
435        // have been partially copied
436        let version_sequence = RowDatasetVersionSequence::from_uniform_row_count(
437            physical_rows as u64,
438            current_version,
439        );
440
441        return Some(RowDatasetVersionMeta::from_sequence(&version_sequence).unwrap());
442    }
443    None
444}
445
446/// Refresh row-level latest update version metadata for a full fragment rewrite-column update.
447///
448/// This sets a uniform version sequence for all rows in the fragment to `current_version`.
449pub fn refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
450    fragment: &mut Fragment,
451    current_version: u64,
452) -> Result<()> {
453    let row_count = if let Some(pr) = fragment.physical_rows {
454        pr as u64
455    } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
456        match row_id_meta {
457            crate::format::RowIdMeta::Inline(data) => {
458                let sequence = read_row_ids(data).unwrap();
459                sequence.len()
460            }
461            // Follow existing behavior: external sequence not yet supported here
462            crate::format::RowIdMeta::External(_file) => 0,
463        }
464    } else {
465        0
466    };
467
468    if row_count > 0 {
469        let version_seq =
470            RowDatasetVersionSequence::from_uniform_row_count(row_count, current_version);
471        let version_meta = RowDatasetVersionMeta::from_sequence(&version_seq)?;
472        fragment.last_updated_at_version_meta = Some(version_meta);
473    }
474
475    Ok(())
476}
477
478/// Refresh row-level latest update version metadata for a partial fragment rewrite-column update.
479///
480/// `updated_offsets` are local row offsets (within the fragment) that have been updated.
481/// Existing version metadata is preserved and only the updated positions are set to `current_version`.
482/// If no existing metadata is present, positions default to `prev_version`.
483pub fn refresh_row_latest_update_meta_for_partial_frag_rewrite_cols(
484    fragment: &mut Fragment,
485    updated_offsets: &[usize],
486    current_version: u64,
487    prev_version: u64,
488) -> Result<()> {
489    // Determine row count for fragment
490    let row_count_u64: u64 = if let Some(pr) = fragment.physical_rows {
491        pr as u64
492    } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
493        match row_id_meta {
494            crate::format::RowIdMeta::Inline(data) => {
495                let sequence = read_row_ids(data).unwrap();
496                sequence.len()
497            }
498            crate::format::RowIdMeta::External(_file) => {
499                // Preserve original behavior for external sequences
500                todo!("External file loading not yet implemented")
501            }
502        }
503    } else {
504        0
505    };
506
507    if row_count_u64 > 0 {
508        // Build base version vector from existing meta or previous dataset version
509        let mut base_versions: Vec<u64> = Vec::with_capacity(row_count_u64 as usize);
510        if let Some(meta) = fragment.last_updated_at_version_meta.as_ref() {
511            if let Ok(base_seq) = meta.load_sequence() {
512                for pos in 0..(row_count_u64 as usize) {
513                    base_versions.push(base_seq.version_at(pos).unwrap_or(prev_version));
514                }
515            } else {
516                base_versions.resize(row_count_u64 as usize, prev_version);
517            }
518        } else {
519            base_versions.resize(row_count_u64 as usize, prev_version);
520        }
521
522        // Apply updates to updated positions
523        for &pos in updated_offsets {
524            if pos < base_versions.len() {
525                base_versions[pos] = current_version;
526            }
527        }
528
529        // Compress into runs
530        let mut runs: Vec<RowDatasetVersionRun> = Vec::new();
531        if !base_versions.is_empty() {
532            let mut start = 0usize;
533            let mut curr_ver = base_versions[0];
534            for (idx, &ver) in base_versions.iter().enumerate().skip(1) {
535                if ver != curr_ver {
536                    runs.push(RowDatasetVersionRun {
537                        span: U64Segment::Range(start as u64..idx as u64),
538                        version: curr_ver,
539                    });
540                    start = idx;
541                    curr_ver = ver;
542                }
543            }
544            runs.push(RowDatasetVersionRun {
545                span: U64Segment::Range(start as u64..base_versions.len() as u64),
546                version: curr_ver,
547            });
548        }
549        let new_seq = RowDatasetVersionSequence { runs };
550        let new_meta = RowDatasetVersionMeta::from_sequence(&new_seq)?;
551        fragment.last_updated_at_version_meta = Some(new_meta);
552    }
553
554    Ok(())
555}
556
557// Protobuf conversion implementations
558impl TryFrom<pb::data_fragment::LastUpdatedAtVersionSequence> for RowDatasetVersionMeta {
559    type Error = Error;
560
561    fn try_from(value: pb::data_fragment::LastUpdatedAtVersionSequence) -> Result<Self> {
562        match value {
563            pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => {
564                Ok(Self::Inline(data))
565            }
566            pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
567                file,
568            ) => Ok(Self::External(ExternalFile {
569                path: file.path,
570                offset: file.offset,
571                size: file.size,
572            })),
573        }
574    }
575}
576
577impl TryFrom<pb::data_fragment::CreatedAtVersionSequence> for RowDatasetVersionMeta {
578    type Error = Error;
579
580    fn try_from(value: pb::data_fragment::CreatedAtVersionSequence) -> Result<Self> {
581        match value {
582            pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => {
583                Ok(Self::Inline(data))
584            }
585            pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => {
586                Ok(Self::External(ExternalFile {
587                    path: file.path,
588                    offset: file.offset,
589                    size: file.size,
590                }))
591            }
592        }
593    }
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sequence from `(row_count, version)` pairs; every run's span
    /// is `0..row_count`, mirroring how the fixtures are written by hand.
    fn sequence_of(runs: &[(u64, u64)]) -> RowDatasetVersionSequence {
        RowDatasetVersionSequence {
            runs: runs
                .iter()
                .map(|&(row_count, version)| RowDatasetVersionRun {
                    span: U64Segment::Range(0..row_count),
                    version,
                })
                .collect(),
        }
    }

    /// Positional lookup resolves to the right run and is `None` past the end.
    #[test]
    fn test_version_random_access() {
        let seq = sequence_of(&[(3, 1), (2, 2), (1, 3)]);

        let expectations = [
            (0, Some(1)),
            (2, Some(1)),
            (3, Some(2)),
            (4, Some(2)),
            (5, Some(3)),
            (6, None),
        ];
        for (position, expected) in expectations {
            assert_eq!(seq.version_at(position), expected);
        }
    }

    /// Writing then reading a sequence preserves runs, length and lookups.
    #[test]
    fn test_serialization_round_trip() {
        let original = sequence_of(&[(4, 42), (3, 99)]);

        let encoded = write_dataset_versions(&original);
        let decoded = read_dataset_versions(&encoded).unwrap();

        assert_eq!(decoded.runs.len(), 2);
        assert_eq!(decoded.len(), 7);
        assert_eq!(decoded.version_at(0), Some(42));
        assert_eq!(decoded.version_at(5), Some(99));
    }

    /// Row ids are mapped to positions before the version lookup.
    #[test]
    fn test_get_version_for_row_id() {
        let seq = sequence_of(&[(2, 8), (2, 9)]);
        let rows = RowIdSequence::from(10..14); // row ids: 10,11,12,13

        let expectations = [
            (10, Some(8)),
            (11, Some(8)),
            (12, Some(9)),
            (13, Some(9)),
            (99, None),
        ];
        for (row_id, expected) in expectations {
            assert_eq!(seq.get_version_for_row_id(&rows, row_id), expected);
        }
    }
}