1use deepsize::DeepSizeOf;
11use lance_core::Error;
12use lance_core::Result;
13use prost::Message;
14use serde::{Deserialize, Serialize};
15
16use crate::format::{ExternalFile, Fragment, pb};
17use crate::rowids::segment::U64Segment;
18use crate::rowids::{RowIdSequence, read_row_ids};
19
20#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf)]
26pub struct RowDatasetVersionRun {
27 pub span: U64Segment,
28 pub version: u64,
29}
30
31impl RowDatasetVersionRun {
32 pub fn len(&self) -> usize {
34 self.span.len()
35 }
36
37 pub fn is_empty(&self) -> bool {
39 self.span.is_empty()
40 }
41
42 pub fn version(&self) -> u64 {
44 self.version
45 }
46}
47
48#[derive(Debug, Clone, PartialEq, Eq, DeepSizeOf, Default)]
54pub struct RowDatasetVersionSequence {
55 pub runs: Vec<RowDatasetVersionRun>,
56}
57
58impl RowDatasetVersionSequence {
59 pub fn new() -> Self {
61 Self { runs: Vec::new() }
62 }
63
64 pub fn from_uniform_row_count(row_count: u64, version: u64) -> Self {
66 if row_count == 0 {
67 return Self::new();
68 }
69 let run = RowDatasetVersionRun {
70 span: U64Segment::Range(0..row_count),
71 version,
72 };
73 Self { runs: vec![run] }
74 }
75
76 pub fn len(&self) -> u64 {
78 self.runs.iter().map(|s| s.len() as u64).sum()
79 }
80
81 pub fn is_empty(&self) -> bool {
83 self.runs.is_empty() || self.runs.iter().all(|s| s.is_empty())
84 }
85
86 pub fn versions(&self) -> VersionsIter<'_> {
88 VersionsIter::new(&self.runs)
89 }
90
91 pub fn version_at(&self, index: usize) -> Option<u64> {
93 let mut offset = 0usize;
94 for run in &self.runs {
95 let len = run.len();
96 if index < offset + len {
97 return Some(run.version());
98 }
99 offset += len;
100 }
101 None
102 }
103
104 pub fn get_version_for_row_id(&self, row_ids: &RowIdSequence, row_id: u64) -> Option<u64> {
108 let mut offset = 0usize;
109 for seg in &row_ids.0 {
110 if seg.range().is_some_and(|r| r.contains(&row_id))
111 && let Some(local) = seg.position(row_id)
112 {
113 return self.version_at(offset + local);
114 }
115 offset += seg.len();
116 }
117 None
118 }
119
120 pub fn rows_with_version_greater_than(
122 &self,
123 row_ids: &RowIdSequence,
124 threshold: u64,
125 ) -> Vec<u64> {
126 row_ids
127 .iter()
128 .zip(self.versions())
129 .filter_map(|(rid, v)| if v > threshold { Some(rid) } else { None })
130 .collect()
131 }
132
133 pub fn mask(&mut self, positions: impl IntoIterator<Item = u32>) -> Result<()> {
135 let mut local_positions: Vec<u32> = Vec::new();
136 let mut positions_iter = positions.into_iter();
137 let mut curr_position = positions_iter.next();
138 let mut offset: usize = 0;
139 let mut cutoff: usize = 0;
140
141 for run in self.runs.iter_mut() {
142 cutoff += run.span.len();
143 while let Some(position) = curr_position {
144 if position as usize >= cutoff {
145 break;
146 }
147 local_positions.push(position - offset as u32);
148 curr_position = positions_iter.next();
149 }
150
151 if !local_positions.is_empty() {
152 run.span.mask(local_positions.as_slice());
153 local_positions.clear();
154 }
155 offset = cutoff;
156 }
157
158 self.runs.retain(|r| !r.span.is_empty());
159 Ok(())
160 }
161}
162
163pub struct VersionsIter<'a> {
165 runs: &'a [RowDatasetVersionRun],
166 run_idx: usize,
167 remaining_in_run: usize,
168 current_version: u64,
169}
170
171impl<'a> VersionsIter<'a> {
172 fn new(runs: &'a [RowDatasetVersionRun]) -> Self {
173 let mut it = Self {
174 runs,
175 run_idx: 0,
176 remaining_in_run: 0,
177 current_version: 0,
178 };
179 it.advance_run();
180 it
181 }
182
183 fn advance_run(&mut self) {
184 if self.run_idx < self.runs.len() {
185 let run = &self.runs[self.run_idx];
186 self.remaining_in_run = run.len();
187 self.current_version = run.version();
188 } else {
189 self.remaining_in_run = 0;
190 }
191 }
192}
193
194impl<'a> Iterator for VersionsIter<'a> {
195 type Item = u64;
196
197 fn next(&mut self) -> Option<Self::Item> {
198 if self.remaining_in_run == 0 {
199 self.run_idx += 1;
201 if self.run_idx >= self.runs.len() {
202 return None;
203 }
204 self.advance_run();
205 }
206 self.remaining_in_run = self.remaining_in_run.saturating_sub(1);
207 Some(self.current_version)
208 }
209}
210
211#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
214pub enum RowDatasetVersionMeta {
215 Inline(Vec<u8>),
217 External(ExternalFile),
219}
220
221impl RowDatasetVersionMeta {
222 pub fn from_sequence(sequence: &RowDatasetVersionSequence) -> lance_core::Result<Self> {
224 let bytes = write_dataset_versions(sequence);
225 Ok(Self::Inline(bytes))
226 }
227
228 pub fn from_external_file(path: String, offset: u64, size: u64) -> Self {
230 Self::External(ExternalFile { path, offset, size })
231 }
232
233 pub fn load_sequence(&self) -> lance_core::Result<RowDatasetVersionSequence> {
235 match self {
236 Self::Inline(data) => read_dataset_versions(data),
237 Self::External(_file) => {
238 todo!("External file loading not yet implemented")
239 }
240 }
241 }
242}
243
244pub fn last_updated_at_version_meta_to_pb(
246 meta: &Option<RowDatasetVersionMeta>,
247) -> Option<pb::data_fragment::LastUpdatedAtVersionSequence> {
248 meta.as_ref().map(|m| match m {
249 RowDatasetVersionMeta::Inline(data) => {
250 pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(
251 data.clone(),
252 )
253 }
254 RowDatasetVersionMeta::External(file) => {
255 pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
256 pb::ExternalFile {
257 path: file.path.clone(),
258 offset: file.offset,
259 size: file.size,
260 },
261 )
262 }
263 })
264}
265
266pub fn created_at_version_meta_to_pb(
268 meta: &Option<RowDatasetVersionMeta>,
269) -> Option<pb::data_fragment::CreatedAtVersionSequence> {
270 meta.as_ref().map(|m| match m {
271 RowDatasetVersionMeta::Inline(data) => {
272 pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data.clone())
273 }
274 RowDatasetVersionMeta::External(file) => {
275 pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(
276 pb::ExternalFile {
277 path: file.path.clone(),
278 offset: file.offset,
279 size: file.size,
280 },
281 )
282 }
283 })
284}
285
286pub fn write_dataset_versions(sequence: &RowDatasetVersionSequence) -> Vec<u8> {
288 let pb_sequence = pb::RowDatasetVersionSequence {
290 runs: sequence
291 .runs
292 .iter()
293 .map(|run| pb::RowDatasetVersionRun {
294 span: Some(pb::U64Segment::from(run.span.clone())),
295 version: run.version,
296 })
297 .collect(),
298 };
299
300 pb_sequence.encode_to_vec()
301}
302
303pub fn read_dataset_versions(data: &[u8]) -> lance_core::Result<RowDatasetVersionSequence> {
305 let pb_sequence = pb::RowDatasetVersionSequence::decode(data).map_err(|e| {
306 Error::internal(format!("Failed to decode RowDatasetVersionSequence: {}", e))
307 })?;
308
309 let segments = pb_sequence
310 .runs
311 .into_iter()
312 .map(|pb_run| {
313 let positions_pb = pb_run.span.ok_or_else(|| {
314 Error::internal("Missing positions in RowDatasetVersionRun".to_string())
315 })?;
316 let segment = U64Segment::try_from(positions_pb)?;
317 Ok(RowDatasetVersionRun {
318 span: segment,
319 version: pb_run.version,
320 })
321 })
322 .collect::<Result<Vec<_>>>()?;
323
324 Ok(RowDatasetVersionSequence { runs: segments })
325}
326
327pub fn rechunk_version_sequences(
329 sequences: impl IntoIterator<Item = RowDatasetVersionSequence>,
330 chunk_sizes: impl IntoIterator<Item = u64>,
331 allow_incomplete: bool,
332) -> Result<Vec<RowDatasetVersionSequence>> {
333 let chunk_sizes_vec: Vec<u64> = chunk_sizes.into_iter().collect();
334 let total_chunks = chunk_sizes_vec.len();
335 let mut chunked_sequences: Vec<RowDatasetVersionSequence> = Vec::with_capacity(total_chunks);
336
337 let mut run_iter = sequences
338 .into_iter()
339 .flat_map(|sequence| sequence.runs.into_iter())
340 .peekable();
341
342 let too_few_segments_error = |chunk_index: usize, expected_chunk_size: u64, remaining: u64| {
343 Error::invalid_input(format!(
344 "Got too few version runs for chunk {}. Expected chunk size: {}, remaining needed: {}",
345 chunk_index, expected_chunk_size, remaining
346 ))
347 };
348
349 let too_many_segments_error = |processed_chunks: usize, total_chunk_sizes: usize| {
350 Error::invalid_input(format!(
351 "Got too many version runs for the provided chunk lengths. Processed {} chunks out of {} expected",
352 processed_chunks, total_chunk_sizes
353 ))
354 };
355
356 let mut segment_offset = 0_u64;
357
358 for (chunk_index, chunk_size) in chunk_sizes_vec.iter().enumerate() {
359 let chunk_size = *chunk_size;
360 let mut out_seq = RowDatasetVersionSequence::new();
361 let mut remaining = chunk_size;
362
363 while remaining > 0 {
364 let remaining_in_segment = run_iter
365 .peek()
366 .map_or(0, |run| run.span.len() as u64 - segment_offset);
367
368 if remaining_in_segment == 0 {
369 if run_iter.next().is_some() {
370 segment_offset = 0;
371 continue;
372 } else if allow_incomplete {
373 break;
374 } else {
375 return Err(too_few_segments_error(chunk_index, chunk_size, remaining));
376 }
377 }
378
379 match remaining_in_segment.cmp(&remaining) {
380 std::cmp::Ordering::Greater => {
381 let run = run_iter.peek().unwrap();
382 let seg = run.span.slice(segment_offset as usize, remaining as usize);
383 out_seq.runs.push(RowDatasetVersionRun {
384 span: seg,
385 version: run.version,
386 });
387 segment_offset += remaining;
388 remaining = 0;
389 }
390 std::cmp::Ordering::Equal | std::cmp::Ordering::Less => {
391 let run = run_iter.next().ok_or_else(|| {
392 too_few_segments_error(chunk_index, chunk_size, remaining)
393 })?;
394 let seg = run
395 .span
396 .slice(segment_offset as usize, remaining_in_segment as usize);
397 out_seq.runs.push(RowDatasetVersionRun {
398 span: seg,
399 version: run.version,
400 });
401 segment_offset = 0;
402 remaining -= remaining_in_segment;
403 }
404 }
405 }
406
407 chunked_sequences.push(out_seq);
408 }
409
410 if run_iter.peek().is_some() {
411 return Err(too_many_segments_error(
412 chunked_sequences.len(),
413 total_chunks,
414 ));
415 }
416
417 Ok(chunked_sequences)
418}
419
420pub fn build_version_meta(
422 fragment: &Fragment,
423 current_version: u64,
424) -> Option<RowDatasetVersionMeta> {
425 if let Some(physical_rows) = fragment.physical_rows
426 && physical_rows > 0
427 {
428 if fragment.row_id_meta.is_none() {
430 panic!("Can not find row id meta, please make sure you have enabled stable row id.")
431 }
432
433 let version_sequence = RowDatasetVersionSequence::from_uniform_row_count(
437 physical_rows as u64,
438 current_version,
439 );
440
441 return Some(RowDatasetVersionMeta::from_sequence(&version_sequence).unwrap());
442 }
443 None
444}
445
446pub fn refresh_row_latest_update_meta_for_full_frag_rewrite_cols(
450 fragment: &mut Fragment,
451 current_version: u64,
452) -> Result<()> {
453 let row_count = if let Some(pr) = fragment.physical_rows {
454 pr as u64
455 } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
456 match row_id_meta {
457 crate::format::RowIdMeta::Inline(data) => {
458 let sequence = read_row_ids(data).unwrap();
459 sequence.len()
460 }
461 crate::format::RowIdMeta::External(_file) => 0,
463 }
464 } else {
465 0
466 };
467
468 if row_count > 0 {
469 let version_seq =
470 RowDatasetVersionSequence::from_uniform_row_count(row_count, current_version);
471 let version_meta = RowDatasetVersionMeta::from_sequence(&version_seq)?;
472 fragment.last_updated_at_version_meta = Some(version_meta);
473 }
474
475 Ok(())
476}
477
478pub fn refresh_row_latest_update_meta_for_partial_frag_rewrite_cols(
484 fragment: &mut Fragment,
485 updated_offsets: &[usize],
486 current_version: u64,
487 prev_version: u64,
488) -> Result<()> {
489 let row_count_u64: u64 = if let Some(pr) = fragment.physical_rows {
491 pr as u64
492 } else if let Some(row_id_meta) = fragment.row_id_meta.as_ref() {
493 match row_id_meta {
494 crate::format::RowIdMeta::Inline(data) => {
495 let sequence = read_row_ids(data).unwrap();
496 sequence.len()
497 }
498 crate::format::RowIdMeta::External(_file) => {
499 todo!("External file loading not yet implemented")
501 }
502 }
503 } else {
504 0
505 };
506
507 if row_count_u64 > 0 {
508 let mut base_versions: Vec<u64> = Vec::with_capacity(row_count_u64 as usize);
510 if let Some(meta) = fragment.last_updated_at_version_meta.as_ref() {
511 if let Ok(base_seq) = meta.load_sequence() {
512 for pos in 0..(row_count_u64 as usize) {
513 base_versions.push(base_seq.version_at(pos).unwrap_or(prev_version));
514 }
515 } else {
516 base_versions.resize(row_count_u64 as usize, prev_version);
517 }
518 } else {
519 base_versions.resize(row_count_u64 as usize, prev_version);
520 }
521
522 for &pos in updated_offsets {
524 if pos < base_versions.len() {
525 base_versions[pos] = current_version;
526 }
527 }
528
529 let mut runs: Vec<RowDatasetVersionRun> = Vec::new();
531 if !base_versions.is_empty() {
532 let mut start = 0usize;
533 let mut curr_ver = base_versions[0];
534 for (idx, &ver) in base_versions.iter().enumerate().skip(1) {
535 if ver != curr_ver {
536 runs.push(RowDatasetVersionRun {
537 span: U64Segment::Range(start as u64..idx as u64),
538 version: curr_ver,
539 });
540 start = idx;
541 curr_ver = ver;
542 }
543 }
544 runs.push(RowDatasetVersionRun {
545 span: U64Segment::Range(start as u64..base_versions.len() as u64),
546 version: curr_ver,
547 });
548 }
549 let new_seq = RowDatasetVersionSequence { runs };
550 let new_meta = RowDatasetVersionMeta::from_sequence(&new_seq)?;
551 fragment.last_updated_at_version_meta = Some(new_meta);
552 }
553
554 Ok(())
555}
556
557impl TryFrom<pb::data_fragment::LastUpdatedAtVersionSequence> for RowDatasetVersionMeta {
559 type Error = Error;
560
561 fn try_from(value: pb::data_fragment::LastUpdatedAtVersionSequence) -> Result<Self> {
562 match value {
563 pb::data_fragment::LastUpdatedAtVersionSequence::InlineLastUpdatedAtVersions(data) => {
564 Ok(Self::Inline(data))
565 }
566 pb::data_fragment::LastUpdatedAtVersionSequence::ExternalLastUpdatedAtVersions(
567 file,
568 ) => Ok(Self::External(ExternalFile {
569 path: file.path,
570 offset: file.offset,
571 size: file.size,
572 })),
573 }
574 }
575}
576
577impl TryFrom<pb::data_fragment::CreatedAtVersionSequence> for RowDatasetVersionMeta {
578 type Error = Error;
579
580 fn try_from(value: pb::data_fragment::CreatedAtVersionSequence) -> Result<Self> {
581 match value {
582 pb::data_fragment::CreatedAtVersionSequence::InlineCreatedAtVersions(data) => {
583 Ok(Self::Inline(data))
584 }
585 pb::data_fragment::CreatedAtVersionSequence::ExternalCreatedAtVersions(file) => {
586 Ok(Self::External(ExternalFile {
587 path: file.path,
588 offset: file.offset,
589 size: file.size,
590 }))
591 }
592 }
593 }
594}
595
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a run covering positions `0..len`, all at `version`.
    fn run(len: u64, version: u64) -> RowDatasetVersionRun {
        RowDatasetVersionRun {
            span: U64Segment::Range(0..len),
            version,
        }
    }

    #[test]
    fn test_version_random_access() {
        let seq = RowDatasetVersionSequence {
            runs: vec![run(3, 1), run(2, 2), run(1, 3)],
        };
        let cases = [
            (0, Some(1)),
            (2, Some(1)),
            (3, Some(2)),
            (4, Some(2)),
            (5, Some(3)),
            (6, None),
        ];
        for (index, expected) in cases {
            assert_eq!(seq.version_at(index), expected, "version_at({index})");
        }
    }

    #[test]
    fn test_serialization_round_trip() {
        let seq = RowDatasetVersionSequence {
            runs: vec![run(4, 42), run(3, 99)],
        };
        let decoded = read_dataset_versions(&write_dataset_versions(&seq)).unwrap();
        assert_eq!(decoded.runs.len(), 2);
        assert_eq!(decoded.len(), 7);
        assert_eq!(decoded.version_at(0), Some(42));
        assert_eq!(decoded.version_at(5), Some(99));
    }

    #[test]
    fn test_get_version_for_row_id() {
        let seq = RowDatasetVersionSequence {
            runs: vec![run(2, 8), run(2, 9)],
        };
        let rows = RowIdSequence::from(10..14);
        let cases = [
            (10, Some(8)),
            (11, Some(8)),
            (12, Some(9)),
            (13, Some(9)),
            (99, None),
        ];
        for (row_id, expected) in cases {
            assert_eq!(
                seq.get_version_for_row_id(&rows, row_id),
                expected,
                "row id {row_id}"
            );
        }
    }
}