1use deepsize::DeepSizeOf;
11use lance_core::Error;
12use lance_core::Result;
13use prost::Message;
14use serde::{Deserialize, Serialize};
15use snafu::location;
16
17use crate::format::{pb, ExternalFile, Fragment};
18use crate::rowids::segment::U64Segment;
19use crate::rowids::{read_row_ids, RowIdSequence};
20
21#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
27pub struct RowDatasetVersionRun {
28 pub span: U64Segment,
29 pub version: u64,
30}
31
32impl RowDatasetVersionRun {
33 pub fn len(&self) -> usize {
35 self.span.len()
36 }
37
38 pub fn is_empty(&self) -> bool {
40 self.span.is_empty()
41 }
42
43 pub fn version(&self) -> u64 {
45 self.version
46 }
47}
48
49#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf, Default)]
55pub struct RowDatasetVersionSequence {
56 pub runs: Vec<RowDatasetVersionRun>,
57}
58
59impl RowDatasetVersionSequence {
60 pub fn new() -> Self {
62 Self { runs: Vec::new() }
63 }
64
65 pub fn from_uniform_row_count(row_count: u64, version: u64) -> Self {
67 if row_count == 0 {
68 return Self::new();
69 }
70 let run = RowDatasetVersionRun {
71 span: U64Segment::Range(0..row_count),
72 version,
73 };
74 Self { runs: vec![run] }
75 }
76
77 pub fn len(&self) -> u64 {
79 self.runs.iter().map(|s| s.len() as u64).sum()
80 }
81
82 pub fn is_empty(&self) -> bool {
84 self.runs.is_empty() || self.runs.iter().all(|s| s.is_empty())
85 }
86
87 pub fn versions(&self) -> VersionsIter<'_> {
89 VersionsIter::new(&self.runs)
90 }
91
92 pub fn version_at(&self, index: usize) -> Option<u64> {
94 let mut offset = 0usize;
95 for run in &self.runs {
96 let len = run.len();
97 if index < offset + len {
98 return Some(run.version());
99 }
100 offset += len;
101 }
102 None
103 }
104
105 pub fn get_version_for_row_id(&self, row_ids: &RowIdSequence, row_id: u64) -> Option<u64> {
109 let mut offset = 0usize;
110 for seg in &row_ids.0 {
111 if seg.range().is_some_and(|r| r.contains(&row_id)) {
112 if let Some(local) = seg.position(row_id) {
113 return self.version_at(offset + local);
114 }
115 }
116 offset += seg.len();
117 }
118 None
119 }
120
121 pub fn rows_with_version_greater_than(
123 &self,
124 row_ids: &RowIdSequence,
125 threshold: u64,
126 ) -> Vec<u64> {
127 row_ids
128 .iter()
129 .zip(self.versions())
130 .filter_map(|(rid, v)| if v > threshold { Some(rid) } else { None })
131 .collect()
132 }
133
134 pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
136 let mut local_positions: Vec<u32> = Vec::new();
137 let mut positions_iter = positions.into_iter();
138 let mut curr_position = positions_iter.next();
139 let mut offset: usize = 0;
140 let mut cutoff: usize = 0;
141
142 for run in self.runs.iter_mut() {
143 cutoff += run.span.len();
144 while let Some(position) = curr_position {
145 if position as usize >= cutoff {
146 break;
147 }
148 local_positions.push(position - offset as u32);
149 curr_position = positions_iter.next();
150 }
151
152 if !local_positions.is_empty() {
153 run.span.mask(local_positions.as_slice());
154 local_positions.clear();
155 }
156 offset = cutoff;
157 }
158
159 self.runs.retain(|r| !r.span.is_empty());
160 Ok(())
161 }
162}
163
164pub struct VersionsIter<'a> {
166 runs: &'a [RowDatasetVersionRun],
167 run_idx: usize,
168 remaining_in_run: usize,
169 current_version: u64,
170}
171
172impl<'a> VersionsIter<'a> {
173 fn new(runs: &'a [RowDatasetVersionRun]) -> Self {
174 let mut it = Self {
175 runs,
176 run_idx: 0,
177 remaining_in_run: 0,
178 current_version: 0,
179 };
180 it.advance_run();
181 it
182 }
183
184 fn advance_run(&mut self) {
185 if self.run_idx < self.runs.len() {
186 let run = &self.runs[self.run_idx];
187 self.remaining_in_run = run.len();
188 self.current_version = run.version();
189 } else {
190 self.remaining_in_run = 0;
191 }
192 }
193}
194
195impl<'a> Iterator for VersionsIter<'a> {
196 type Item = u64;
197
198 fn next(&mut self) -> Option<Self::Item> {
199 if self.remaining_in_run == 0 {
200 self.run_idx += 1;
202 if self.run_idx >= self.runs.len() {
203 return None;
204 }
205 self.advance_run();
206 }
207 self.remaining_in_run = self.remaining_in_run.saturating_sub(1);
208 Some(self.current_version)
209 }
210}
211
212#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
215pub enum RowDatasetVersionMeta {
216 Inline(Vec<u8>),
218 External(ExternalFile),
220}
221
222impl RowDatasetVersionMeta {
223 pub fn from_sequence(sequence: &RowDatasetVersionSequence) -> lance_core::Result<Self> {
225 let bytes = write_dataset_versions(sequence);
226 Ok(Self::Inline(bytes))
227 }
228
229 pub fn from_external_file(path: String, offset: u64, size: u64) -> Self {
231 Self::External(ExternalFile { path, offset, size })
232 }
233
234 pub fn load_sequence(&self) -> lance_core::Result<RowDatasetVersionSequence> {
236 match self {
237 Self::Inline(data) => read_dataset_versions(data),
238 Self::External(_file) => {
239 todo!("External file loading not yet implemented")
240 }
241 }
242 }
243}
244
245pub fn last_updated_at_version_meta_to_pb(
247 meta: &Option<RowDatasetVersionMeta>,
248) -> Option<pb::data_fragment::LastUpdatedAtVersionSequence> {
249 meta.as_ref().map(|m| match m {
250 RowDatasetVersionMeta::Inline(data) => {
251 pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(
252 data.clone(),
253 )
254 }
255 RowDatasetVersionMeta::External(file) => {
256 pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
257 pb::ExternalFile {
258 path: file.path.clone(),
259 offset: file.offset,
260 size: file.size,
261 },
262 )
263 }
264 })
265}
266
267pub fn created_at_version_meta_to_pb(
269 meta: &Option<RowDatasetVersionMeta>,
270) -> Option<pb::data_fragment::CreatedAtVersionSequence> {
271 meta.as_ref().map(|m| match m {
272 RowDatasetVersionMeta::Inline(data) => {
273 pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.clone())
274 }
275 RowDatasetVersionMeta::External(file) => {
276 pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(
277 pb::ExternalFile {
278 path: file.path.clone(),
279 offset: file.offset,
280 size: file.size,
281 },
282 )
283 }
284 })
285}
286
287pub fn write_dataset_versions(sequence: &RowDatasetVersionSequence) -> Vec<u8> {
289 let pb_sequence = pb::RowDatasetVersionSequence {
291 runs: sequence
292 .runs
293 .iter()
294 .map(|run| pb::RowDatasetVersionRun {
295 span: Some(pb::U64Segment::from(run.span.clone())),
296 version: run.version,
297 })
298 .collect(),
299 };
300
301 pb_sequence.encode_to_vec()
302}
303
304pub fn read_dataset_versions(data: &[u8]) -> lance_core::Result<RowDatasetVersionSequence> {
306 let pb_sequence = pb::RowDatasetVersionSequence::decode(data).map_err(|e| Error::Internal {
307 message: format!("Failed to decode RowDatasetVersionSequence: {}", e),
308 location: location!(),
309 })?;
310
311 let segments = pb_sequence
312 .runs
313 .into_iter()
314 .map(|pb_run| {
315 let positions_pb = pb_run.span.ok_or_else(|| Error::Internal {
316 message: "Missing positions in RowDatasetVersionRun".to_string(),
317 location: location!(),
318 })?;
319 let segment = U64Segment::try_from(positions_pb)?;
320 Ok(RowDatasetVersionRun {
321 span: segment,
322 version: pb_run.version,
323 })
324 })
325 .collect::<Result<Vec<_>>>()?;
326
327 Ok(RowDatasetVersionSequence { runs: segments })
328}
329
330pub fn rechunk_version_sequences(
332 sequences: impl IntoIterator<Item = RowDatasetVersionSequence>,
333 chunk_sizes: impl IntoIterator<Item = u64>,
334 allow_incomplete: bool,
335) -> Result<Vec<RowDatasetVersionSequence>> {
336 let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
337 let total_chunks = chunk_sizes_vec.len();
338 let mut chunked_sequences: Vec<RowDatasetVersionSequence> = Vec::with_capacity(total_chunks);
339
340 let mut run_iter = sequences
341 .into_iter()
342 .flat_map(|sequence| sequence.runs.into_iter())
343 .peekable();
344
345 let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
346 Error::invalid_input(
347 format!(
348 "Got too few version runs for chunk {}. Expected chunk size: {}, remaining needed: {}",
349 chunk_index, expected_chunk_size, remaining
350 ),
351 location!(),
352 )
353 };
354
355 let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
356 Error::invalid_input(
357 format!(
358 "Got too many version runs for the provided chunk lengths. Processed {} chunks out of {} expected",
359 processed_chunks, total_chunk_sizes
360 ),
361 location!(),
362 )
363 };
364
365 let mut segment_offset = 0_u64;
366
367 for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
368 let chunk_size = *chunk_size;
369 let mut out_seq = RowDatasetVersionSequence::new();
370 let mut remaining = chunk_size;
371
372 while remaining > 0 {
373 let remaining_in_segment = run_iter
374 .peek()
375 .map_or(0, |run| run.span.len() as u64 - segment_offset);
376
377 if remaining_in_segment == 0 {
378 if run_iter.next().is_some() {
379 segment_offset = 0;
380 continue;
381 } else if allow_incomplete {
382 break;
383 } else {
384 return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
385 }
386 }
387
388 match remaining_in_segment.cmp(&remaining) {
389 std::cmp::Ordering::Greater => {
390 let run = run_iter.peek().unwrap();
391 let seg = run.span.slice(segment_offset as usize, remaining as usize);
392 out_seq.runs.push(RowDatasetVersionRun {
393 span: seg,
394 version: run.version,
395 });
396 segment_offset += remaining;
397 remaining = 0;
398 }
399 std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
400 let run = run_iter.next().ok_or_else(|| {
401 too_few_segments_error(chunk_index, chunk_size, remaining)
402 })?;
403 let seg = run
404 .span
405 .slice(segment_offset as usize, remaining_in_segment as usize);
406 out_seq.runs.push(RowDatasetVersionRun {
407 span: seg,
408 version: run.version,
409 });
410 segment_offset = 0;
411 remaining -= remaining_in_segment;
412 }
413 }
414 }
415
416 chunked_sequences.push(out_seq);
417 }
418
419 if run_iter.peek().is_some() {
420 return Err(too_many_segments_error(
421 chunked_sequences.len(),
422 total_chunks,
423 ));
424 }
425
426 Ok(chunked_sequences)
427}
428
429pub fn build_version_meta(
431 fragment: &Fragment,
432 current_version: u64,
433) -> Option<RowDatasetVersionMeta> {
434 if let Some(physical_rows) = fragment.physical_rows {
435 if physical_rows > 0 {
436 if fragment.row_id_meta.is_none() {
438 panic!("Can not find row id meta, please make sure you have enabled stable row id.")
439 }
440
441 let version_sequence = RowDatasetVersionSequence::from_uniform_row_count(
445 physical_rows as u64,
446 current_version,
447 );
448
449 return Some(RowDatasetVersionMeta::from_sequence(&version_sequence).unwrap());
450 }
451 }
452 None
453}
454
455pub fn refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
459 fragment: &mut Fragment,
460 current_version: u64,
461) -> Result<()> {
462 let row_count = if let Some(pr) = fragment.physical_rows {
463 pr as u64
464 } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
465 match row_id_meta {
466 crate::format::RowIdMeta::Inline(data) => {
467 let sequence = read_row_ids(data).unwrap();
468 sequence.len()
469 }
470 crate::format::RowIdMeta::External(_file) => 0,
472 }
473 } else {
474 0
475 };
476
477 if row_count > 0 {
478 let version_seq =
479 RowDatasetVersionSequence::from_uniform_row_count(row_count, current_version);
480 let version_meta = RowDatasetVersionMeta::from_sequence(&version_seq)?;
481 fragment.last_updated_at_version_meta = Some(version_meta);
482 }
483
484 Ok(())
485}
486
487pub fn refresh_row_latest_update_meta_for_partial_frag_rewrite_cols(
493 fragment: &mut Fragment,
494 updated_offsets: &[usize],
495 current_version: u64,
496 prev_version: u64,
497) -> Result<()> {
498 let row_count_u64: u64 = if let Some(pr) = fragment.physical_rows {
500 pr as u64
501 } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
502 match row_id_meta {
503 crate::format::RowIdMeta::Inline(data) => {
504 let sequence = read_row_ids(data).unwrap();
505 sequence.len()
506 }
507 crate::format::RowIdMeta::External(_file) => {
508 todo!("External file loading not yet implemented")
510 }
511 }
512 } else {
513 0
514 };
515
516 if row_count_u64 > 0 {
517 let mut base_versions: Vec<u64> = Vec::with_capacity(row_count_u64 as usize);
519 if let Some(meta) = fragment.last_updated_at_version_meta.as_ref() {
520 if let Ok(base_seq) = meta.load_sequence() {
521 for pos in 0..(row_count_u64 as usize) {
522 base_versions.push(base_seq.version_at(pos).unwrap_or(prev_version));
523 }
524 } else {
525 base_versions.resize(row_count_u64 as usize, prev_version);
526 }
527 } else {
528 base_versions.resize(row_count_u64 as usize, prev_version);
529 }
530
531 for &pos in updated_offsets {
533 if pos < base_versions.len() {
534 base_versions[pos] = current_version;
535 }
536 }
537
538 let mut runs: Vec<RowDatasetVersionRun> = Vec::new();
540 if !base_versions.is_empty() {
541 let mut start = 0usize;
542 let mut curr_ver = base_versions[0];
543 for (idx, &ver) in base_versions.iter().enumerate().skip(1) {
544 if ver != curr_ver {
545 runs.push(RowDatasetVersionRun {
546 span: U64Segment::Range(start as u64..idx as u64),
547 version: curr_ver,
548 });
549 start = idx;
550 curr_ver = ver;
551 }
552 }
553 runs.push(RowDatasetVersionRun {
554 span: U64Segment::Range(start as u64..base_versions.len() as u64),
555 version: curr_ver,
556 });
557 }
558 let new_seq = RowDatasetVersionSequence { runs };
559 let new_meta = RowDatasetVersionMeta::from_sequence(&new_seq)?;
560 fragment.last_updated_at_version_meta = Some(new_meta);
561 }
562
563 Ok(())
564}
565
566impl TryFrom<pb::data_fragment::LastUpdatedAtVersionSequence> for RowDatasetVersionMeta {
568 type Error = Error;
569
570 fn try_from(value: pb::data_fragment::LastUpdatedAtVersionSequence) -> Result<Self> {
571 match value {
572 pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => {
573 Ok(Self::Inline(data))
574 }
575 pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
576 file,
577 ) => Ok(Self::External(ExternalFile {
578 path: file.path,
579 offset: file.offset,
580 size: file.size,
581 })),
582 }
583 }
584}
585
586impl TryFrom<pb::data_fragment::CreatedAtVersionSequence> for RowDatasetVersionMeta {
587 type Error = Error;
588
589 fn try_from(value: pb::data_fragment::CreatedAtVersionSequence) -> Result<Self> {
590 match value {
591 pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => {
592 Ok(Self::Inline(data))
593 }
594 pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => {
595 Ok(Self::External(ExternalFile {
596 path: file.path,
597 offset: file.offset,
598 size: file.size,
599 }))
600 }
601 }
602 }
603}
604
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a sequence from `(row_count, version)` pairs, one run per pair.
    fn sequence_of(runs: &[(u64, u64)]) -> RowDatasetVersionSequence {
        RowDatasetVersionSequence {
            runs: runs
                .iter()
                .map(|&(rows, version)| RowDatasetVersionRun {
                    span: U64Segment::Range(0..rows),
                    version,
                })
                .collect(),
        }
    }

    #[test]
    fn test_version_random_access() {
        let seq = sequence_of(&[(3, 1), (2, 2), (1, 3)]);

        // Positions on both sides of every run boundary, plus one past the end.
        let expectations = [
            (0, Some(1)),
            (2, Some(1)),
            (3, Some(2)),
            (4, Some(2)),
            (5, Some(3)),
            (6, None),
        ];
        for (index, expected) in expectations {
            assert_eq!(seq.version_at(index), expected);
        }
    }

    #[test]
    fn test_serialization_round_trip() {
        let seq = sequence_of(&[(4, 42), (3, 99)]);

        let bytes = write_dataset_versions(&seq);
        let seq2 = read_dataset_versions(&bytes).unwrap();

        assert_eq!(seq2.runs.len(), 2);
        assert_eq!(seq2.len(), 7);
        assert_eq!(seq2.version_at(0), Some(42));
        assert_eq!(seq2.version_at(5), Some(99));
    }

    #[test]
    fn test_get_version_for_row_id() {
        let seq = sequence_of(&[(2, 8), (2, 9)]);
        let rows = RowIdSequence::from(10..14);

        let expectations = [
            (10, Some(8)),
            (11, Some(8)),
            (12, Some(9)),
            (13, Some(9)),
            (99, None),
        ];
        for (row_id, expected) in expectations {
            assert_eq!(seq.get_version_for_row_id(&rows, row_id), expected);
        }
    }
}