1use std::sync::Arc;
11
12use deepsize::DeepSizeOf;
13use lance_core::Error;
14use lance_core::Result;
15use prost::Message;
16use serde::de::Deserializer;
17use serde::ser::Serializer;
18use serde::{Deserialize, Serialize};
19
20use crate::format::{ExternalFile, Fragment, pb};
21use crate::rowids::segment::U64Segment;
22use crate::rowids::{RowIdSequence, read_row_ids};
23
24#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
30pub struct RowDatasetVersionRun {
31 pub span: U64Segment,
32 pub version: u64,
33}
34
35impl RowDatasetVersionRun {
36 pub fn len(&self) -> usize {
38 self.span.len()
39 }
40
41 pub fn is_empty(&self) -> bool {
43 self.span.is_empty()
44 }
45
46 pub fn version(&self) -> u64 {
48 self.version
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf, Default)]
58pub struct RowDatasetVersionSequence {
59 pub runs: Vec<RowDatasetVersionRun>,
60}
61
62impl RowDatasetVersionSequence {
63 pub fn new() -> Self {
65 Self { runs: Vec::new() }
66 }
67
68 pub fn from_uniform_row_count(row_count: u64, version: u64) -> Self {
70 if row_count == 0 {
71 return Self::new();
72 }
73 let run = RowDatasetVersionRun {
74 span: U64Segment::Range(0..row_count),
75 version,
76 };
77 Self { runs: vec![run] }
78 }
79
80 pub fn len(&self) -> u64 {
82 self.runs.iter().map(|s| s.len() as u64).sum()
83 }
84
85 pub fn is_empty(&self) -> bool {
87 self.runs.is_empty() || self.runs.iter().all(|s| s.is_empty())
88 }
89
90 pub fn versions(&self) -> VersionsIter<'_> {
92 VersionsIter::new(&self.runs)
93 }
94
95 pub fn version_at(&self, index: usize) -> Option<u64> {
97 let mut offset = 0usize;
98 for run in &self.runs {
99 let len = run.len();
100 if index < offset + len {
101 return Some(run.version());
102 }
103 offset += len;
104 }
105 None
106 }
107
108 pub fn get_version_for_row_id(&self, row_ids: &RowIdSequence, row_id: u64) -> Option<u64> {
112 let mut offset = 0usize;
113 for seg in &row_ids.0 {
114 if seg.range().is_some_and(|r| r.contains(&row_id))
115 && let Some(local) = seg.position(row_id)
116 {
117 return self.version_at(offset + local);
118 }
119 offset += seg.len();
120 }
121 None
122 }
123
124 pub fn rows_with_version_greater_than(
126 &self,
127 row_ids: &RowIdSequence,
128 threshold: u64,
129 ) -> Vec<u64> {
130 row_ids
131 .iter()
132 .zip(self.versions())
133 .filter_map(|(rid, v)| if v > threshold { Some(rid) } else { None })
134 .collect()
135 }
136
137 pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
139 let mut local_positions: Vec<u32> = Vec::new();
140 let mut positions_iter = positions.into_iter();
141 let mut curr_position = positions_iter.next();
142 let mut offset: usize = 0;
143 let mut cutoff: usize = 0;
144
145 for run in self.runs.iter_mut() {
146 cutoff += run.span.len();
147 while let Some(position) = curr_position {
148 if position as usize >= cutoff {
149 break;
150 }
151 local_positions.push(position - offset as u32);
152 curr_position = positions_iter.next();
153 }
154
155 if !local_positions.is_empty() {
156 run.span.mask(local_positions.as_slice());
157 local_positions.clear();
158 }
159 offset = cutoff;
160 }
161
162 self.runs.retain(|r| !r.span.is_empty());
163 Ok(())
164 }
165}
166
167pub struct VersionsIter<'a> {
169 runs: &'a [RowDatasetVersionRun],
170 run_idx: usize,
171 remaining_in_run: usize,
172 current_version: u64,
173}
174
175impl<'a> VersionsIter<'a> {
176 fn new(runs: &'a [RowDatasetVersionRun]) -> Self {
177 let mut it = Self {
178 runs,
179 run_idx: 0,
180 remaining_in_run: 0,
181 current_version: 0,
182 };
183 it.advance_run();
184 it
185 }
186
187 fn advance_run(&mut self) {
188 if self.run_idx < self.runs.len() {
189 let run = &self.runs[self.run_idx];
190 self.remaining_in_run = run.len();
191 self.current_version = run.version();
192 } else {
193 self.remaining_in_run = 0;
194 }
195 }
196}
197
198impl<'a> Iterator for VersionsIter<'a> {
199 type Item = u64;
200
201 fn next(&mut self) -> Option<Self::Item> {
202 if self.remaining_in_run == 0 {
203 self.run_idx += 1;
205 if self.run_idx >= self.runs.len() {
206 return None;
207 }
208 self.advance_run();
209 }
210 self.remaining_in_run = self.remaining_in_run.saturating_sub(1);
211 Some(self.current_version)
212 }
213}
214
215#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
221pub enum RowDatasetVersionMeta {
222 Inline(Arc<[u8]>),
224 External(ExternalFile),
226}
227
228impl Serialize for RowDatasetVersionMeta {
230 fn serialize<S: Serializer>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error> {
231 #[derive(Serialize)]
232 #[serde(untagged)]
233 enum Helper<'a> {
234 Inline { inline: &'a [u8] },
235 External { external: &'a ExternalFile },
236 }
237
238 match self {
239 Self::Inline(data) => Helper::Inline {
240 inline: data.as_ref(),
241 }
242 .serialize(serializer),
243 Self::External(file) => Helper::External { external: file }.serialize(serializer),
244 }
245 }
246}
247
248impl<'de> Deserialize<'de> for RowDatasetVersionMeta {
250 fn deserialize<D: Deserializer<'de>>(deserializer: D) -> std::result::Result<Self, D::Error> {
251 #[derive(Deserialize)]
252 #[serde(untagged)]
253 enum Helper {
254 Inline { inline: Vec<u8> },
255 External { external: ExternalFile },
256 }
257
258 match Helper::deserialize(deserializer)? {
259 Helper::Inline { inline } => Ok(Self::Inline(Arc::from(inline))),
260 Helper::External { external } => Ok(Self::External(external)),
261 }
262 }
263}
264
265impl RowDatasetVersionMeta {
266 pub fn from_sequence(sequence: &RowDatasetVersionSequence) -> lance_core::Result<Self> {
268 let bytes = write_dataset_versions(sequence);
269 Ok(Self::Inline(Arc::from(bytes)))
270 }
271
272 pub fn from_external_file(path: String, offset: u64, size: u64) -> Self {
274 Self::External(ExternalFile { path, offset, size })
275 }
276
277 pub fn load_sequence(&self) -> lance_core::Result<RowDatasetVersionSequence> {
279 match self {
280 Self::Inline(data) => read_dataset_versions(data),
281 Self::External(_file) => {
282 todo!("External file loading not yet implemented")
283 }
284 }
285 }
286}
287
288pub fn last_updated_at_version_meta_to_pb(
290 meta: &Option<RowDatasetVersionMeta>,
291) -> Option<pb::data_fragment::LastUpdatedAtVersionSequence> {
292 meta.as_ref().map(|m| match m {
293 RowDatasetVersionMeta::Inline(data) => {
294 pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(
295 data.to_vec(),
296 )
297 }
298 RowDatasetVersionMeta::External(file) => {
299 pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
300 pb::ExternalFile {
301 path: file.path.clone(),
302 offset: file.offset,
303 size: file.size,
304 },
305 )
306 }
307 })
308}
309
310pub fn created_at_version_meta_to_pb(
312 meta: &Option<RowDatasetVersionMeta>,
313) -> Option<pb::data_fragment::CreatedAtVersionSequence> {
314 meta.as_ref().map(|m| match m {
315 RowDatasetVersionMeta::Inline(data) => {
316 pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.to_vec())
317 }
318 RowDatasetVersionMeta::External(file) => {
319 pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(
320 pb::ExternalFile {
321 path: file.path.clone(),
322 offset: file.offset,
323 size: file.size,
324 },
325 )
326 }
327 })
328}
329
330pub fn write_dataset_versions(sequence: &RowDatasetVersionSequence) -> Vec<u8> {
332 let pb_sequence = pb::RowDatasetVersionSequence {
334 runs: sequence
335 .runs
336 .iter()
337 .map(|run| pb::RowDatasetVersionRun {
338 span: Some(pb::U64Segment::from(run.span.clone())),
339 version: run.version,
340 })
341 .collect(),
342 };
343
344 pb_sequence.encode_to_vec()
345}
346
347pub fn read_dataset_versions(data: &[u8]) -> lance_core::Result<RowDatasetVersionSequence> {
349 let pb_sequence = pb::RowDatasetVersionSequence::decode(data).map_err(|e| {
350 Error::internal(format!("Failed to decode RowDatasetVersionSequence: {}", e))
351 })?;
352
353 let segments = pb_sequence
354 .runs
355 .into_iter()
356 .map(|pb_run| {
357 let positions_pb = pb_run.span.ok_or_else(|| {
358 Error::internal("Missing positions in RowDatasetVersionRun".to_string())
359 })?;
360 let segment = U64Segment::try_from(positions_pb)?;
361 Ok(RowDatasetVersionRun {
362 span: segment,
363 version: pb_run.version,
364 })
365 })
366 .collect::<Result<Vec<_>>>()?;
367
368 Ok(RowDatasetVersionSequence { runs: segments })
369}
370
371pub fn rechunk_version_sequences(
373 sequences: impl IntoIterator<Item = RowDatasetVersionSequence>,
374 chunk_sizes: impl IntoIterator<Item = u64>,
375 allow_incomplete: bool,
376) -> Result<Vec<RowDatasetVersionSequence>> {
377 let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
378 let total_chunks = chunk_sizes_vec.len();
379 let mut chunked_sequences: Vec<RowDatasetVersionSequence> = Vec::with_capacity(total_chunks);
380
381 let mut run_iter = sequences
382 .into_iter()
383 .flat_map(|sequence| sequence.runs.into_iter())
384 .peekable();
385
386 let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
387 Error::invalid_input(format!(
388 "Got too few version runs for chunk {}. Expected chunk size: {}, remaining needed: {}",
389 chunk_index, expected_chunk_size, remaining
390 ))
391 };
392
393 let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
394 Error::invalid_input(format!(
395 "Got too many version runs for the provided chunk lengths. Processed {} chunks out of {} expected",
396 processed_chunks, total_chunk_sizes
397 ))
398 };
399
400 let mut segment_offset = 0_u64;
401
402 for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
403 let chunk_size = *chunk_size;
404 let mut out_seq = RowDatasetVersionSequence::new();
405 let mut remaining = chunk_size;
406
407 while remaining > 0 {
408 let remaining_in_segment = run_iter
409 .peek()
410 .map_or(0, |run| run.span.len() as u64 - segment_offset);
411
412 if remaining_in_segment == 0 {
413 if run_iter.next().is_some() {
414 segment_offset = 0;
415 continue;
416 } else if allow_incomplete {
417 break;
418 } else {
419 return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
420 }
421 }
422
423 match remaining_in_segment.cmp(&remaining) {
424 std::cmp::Ordering::Greater => {
425 let run = run_iter.peek().unwrap();
426 let seg = run.span.slice(segment_offset as usize, remaining as usize);
427 out_seq.runs.push(RowDatasetVersionRun {
428 span: seg,
429 version: run.version,
430 });
431 segment_offset += remaining;
432 remaining = 0;
433 }
434 std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
435 let run = run_iter.next().ok_or_else(|| {
436 too_few_segments_error(chunk_index, chunk_size, remaining)
437 })?;
438 let seg = run
439 .span
440 .slice(segment_offset as usize, remaining_in_segment as usize);
441 out_seq.runs.push(RowDatasetVersionRun {
442 span: seg,
443 version: run.version,
444 });
445 segment_offset = 0;
446 remaining -= remaining_in_segment;
447 }
448 }
449 }
450
451 chunked_sequences.push(out_seq);
452 }
453
454 if run_iter.peek().is_some() {
455 return Err(too_many_segments_error(
456 chunked_sequences.len(),
457 total_chunks,
458 ));
459 }
460
461 Ok(chunked_sequences)
462}
463
464pub fn build_version_meta(
466 fragment: &Fragment,
467 current_version: u64,
468) -> Option<RowDatasetVersionMeta> {
469 if let Some(physical_rows) = fragment.physical_rows
470 && physical_rows > 0
471 {
472 if fragment.row_id_meta.is_none() {
474 panic!("Can not find row id meta, please make sure you have enabled stable row id.")
475 }
476
477 let version_sequence = RowDatasetVersionSequence::from_uniform_row_count(
481 physical_rows as u64,
482 current_version,
483 );
484
485 return Some(RowDatasetVersionMeta::from_sequence(&version_sequence).unwrap());
486 }
487 None
488}
489
490pub fn refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
494 fragment: &mut Fragment,
495 current_version: u64,
496) -> Result<()> {
497 let row_count = if let Some(pr) = fragment.physical_rows {
498 pr as u64
499 } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
500 match row_id_meta {
501 crate::format::RowIdMeta::Inline(data) => {
502 let sequence = read_row_ids(data).unwrap();
503 sequence.len()
504 }
505 crate::format::RowIdMeta::External(_file) => 0,
507 }
508 } else {
509 0
510 };
511
512 if row_count > 0 {
513 let version_seq =
514 RowDatasetVersionSequence::from_uniform_row_count(row_count, current_version);
515 let version_meta = RowDatasetVersionMeta::from_sequence(&version_seq)?;
516 fragment.last_updated_at_version_meta = Some(version_meta);
517 }
518
519 Ok(())
520}
521
522pub fn refresh_row_latest_update_meta_for_partial_frag_rewrite_cols(
528 fragment: &mut Fragment,
529 updated_offsets: &[usize],
530 current_version: u64,
531 prev_version: u64,
532) -> Result<()> {
533 let row_count_u64: u64 = if let Some(pr) = fragment.physical_rows {
535 pr as u64
536 } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
537 match row_id_meta {
538 crate::format::RowIdMeta::Inline(data) => {
539 let sequence = read_row_ids(data).unwrap();
540 sequence.len()
541 }
542 crate::format::RowIdMeta::External(_file) => {
543 todo!("External file loading not yet implemented")
545 }
546 }
547 } else {
548 0
549 };
550
551 if row_count_u64 > 0 {
552 let mut base_versions: Vec<u64> = Vec::with_capacity(row_count_u64 as usize);
554 if let Some(meta) = fragment.last_updated_at_version_meta.as_ref() {
555 if let Ok(base_seq) = meta.load_sequence() {
556 for pos in 0..(row_count_u64 as usize) {
557 base_versions.push(base_seq.version_at(pos).unwrap_or(prev_version));
558 }
559 } else {
560 base_versions.resize(row_count_u64 as usize, prev_version);
561 }
562 } else {
563 base_versions.resize(row_count_u64 as usize, prev_version);
564 }
565
566 for &pos in updated_offsets {
568 if pos < base_versions.len() {
569 base_versions[pos] = current_version;
570 }
571 }
572
573 let mut runs: Vec<RowDatasetVersionRun> = Vec::new();
575 if !base_versions.is_empty() {
576 let mut start = 0usize;
577 let mut curr_ver = base_versions[0];
578 for (idx, &ver) in base_versions.iter().enumerate().skip(1) {
579 if ver != curr_ver {
580 runs.push(RowDatasetVersionRun {
581 span: U64Segment::Range(start as u64..idx as u64),
582 version: curr_ver,
583 });
584 start = idx;
585 curr_ver = ver;
586 }
587 }
588 runs.push(RowDatasetVersionRun {
589 span: U64Segment::Range(start as u64..base_versions.len() as u64),
590 version: curr_ver,
591 });
592 }
593 let new_seq = RowDatasetVersionSequence { runs };
594 let new_meta = RowDatasetVersionMeta::from_sequence(&new_seq)?;
595 fragment.last_updated_at_version_meta = Some(new_meta);
596 }
597
598 Ok(())
599}
600
601impl TryFrom<pb::data_fragment::LastUpdatedAtVersionSequence> for RowDatasetVersionMeta {
603 type Error = Error;
604
605 fn try_from(value: pb::data_fragment::LastUpdatedAtVersionSequence) -> Result<Self> {
606 match value {
607 pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => {
608 Ok(Self::Inline(Arc::from(data)))
609 }
610 pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
611 file,
612 ) => Ok(Self::External(ExternalFile {
613 path: file.path,
614 offset: file.offset,
615 size: file.size,
616 })),
617 }
618 }
619}
620
621impl TryFrom<pb::data_fragment::CreatedAtVersionSequence> for RowDatasetVersionMeta {
622 type Error = Error;
623
624 fn try_from(value: pb::data_fragment::CreatedAtVersionSequence) -> Result<Self> {
625 match value {
626 pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => {
627 Ok(Self::Inline(Arc::from(data)))
628 }
629 pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => {
630 Ok(Self::External(ExternalFile {
631 path: file.path,
632 offset: file.offset,
633 size: file.size,
634 }))
635 }
636 }
637 }
638}
639
#[cfg(test)]
mod tests {
    use super::*;

    /// A run covering `len` rows, all at `version`.
    fn run(len: u64, version: u64) -> RowDatasetVersionRun {
        RowDatasetVersionRun {
            span: U64Segment::Range(0..len),
            version,
        }
    }

    #[test]
    fn test_version_random_access() {
        let seq = RowDatasetVersionSequence {
            runs: vec![run(3, 1), run(2, 2), run(1, 3)],
        };
        let cases = [
            (0, Some(1)),
            (2, Some(1)),
            (3, Some(2)),
            (4, Some(2)),
            (5, Some(3)),
            (6, None),
        ];
        for (index, expected) in cases {
            assert_eq!(seq.version_at(index), expected, "index {index}");
        }
    }

    #[test]
    fn test_serialization_round_trip() {
        let seq = RowDatasetVersionSequence {
            runs: vec![run(4, 42), run(3, 99)],
        };
        let decoded = read_dataset_versions(&write_dataset_versions(&seq)).unwrap();
        assert_eq!(decoded.runs.len(), 2);
        assert_eq!(decoded.len(), 7);
        assert_eq!(decoded.version_at(0), Some(42));
        assert_eq!(decoded.version_at(5), Some(99));
    }

    #[test]
    fn test_get_version_for_row_id() {
        let seq = RowDatasetVersionSequence {
            runs: vec![run(2, 8), run(2, 9)],
        };
        let rows = RowIdSequence::from(10..14);
        let cases = [
            (10, Some(8)),
            (11, Some(8)),
            (12, Some(9)),
            (13, Some(9)),
            (99, None),
        ];
        for (row_id, expected) in cases {
            assert_eq!(
                seq.get_version_for_row_id(&rows, row_id),
                expected,
                "row id {row_id}"
            );
        }
    }
}