1use super::ManifestWriteConfig;
49use super::write::merge_insert::inserted_rows::KeyExistenceFilter;
50use crate::dataset::transaction::UpdateMode::RewriteRows;
51use crate::index::mem_wal::update_mem_wal_index_merged_generations;
52use crate::utils::temporal::timestamp_to_nanos;
53use deepsize::DeepSizeOf;
54use lance_core::{Error, Result, datatypes::Schema};
55use lance_file::{datatypes::Fields, version::LanceFileVersion};
56use lance_index::mem_wal::MergedGeneration;
57use lance_index::{frag_reuse::FRAG_REUSE_INDEX_NAME, is_system_index};
58use lance_io::object_store::ObjectStore;
59use lance_table::feature_flags::{FLAG_STABLE_ROW_IDS, apply_feature_flags};
60use lance_table::rowids::read_row_ids;
61use lance_table::{
62 format::{
63 BasePath, DataFile, DataStorageFormat, Fragment, IndexMetadata, Manifest, RowIdMeta, pb,
64 },
65 io::{
66 commit::CommitHandler,
67 manifest::{read_manifest, read_manifest_indexes},
68 },
69 rowids::{RowIdSequence, write_row_ids},
70};
71use object_store::path::Path;
72use roaring::RoaringBitmap;
73use std::cmp::Ordering;
74use std::{
75 collections::{HashMap, HashSet},
76 sync::Arc,
77};
78use uuid::Uuid;
79
/// A change to a dataset: the [`Operation`] being applied plus the metadata that
/// travels with it. Converts into `lance_table::format::Transaction` by way of its
/// `pb::Transaction` encoding.
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct Transaction {
    /// Version of the dataset this transaction was built on top of.
    /// NOTE(review): presumably used to detect conflicting concurrent commits — confirm.
    pub read_version: u64,
    /// Identifier of this transaction, kept as a `String`.
    /// NOTE(review): presumably a UUID rendered as text (the module imports `uuid::Uuid`) — confirm.
    pub uuid: String,
    /// The change this transaction performs.
    pub operation: Operation,
    /// Optional free-form tag attached to the transaction.
    pub tag: Option<String>,
    /// Optional key/value properties; held behind an `Arc` so cloning the
    /// transaction only bumps a reference count instead of copying the map.
    pub transaction_properties: Option<Arc<HashMap<String, String>>>,
}
94
/// One entry of [`Operation::DataReplacement`]: a `u64` paired with the
/// [`DataFile`] that replaces existing data.
/// NOTE(review): the `u64` is presumably the id of the fragment whose data file is
/// being replaced — confirm against the commit logic.
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct DataReplacementGroup(pub u64, pub DataFile);
97
/// A single key update inside an [`UpdateMap`].
///
/// Can be built from `(String, Option<String>)`, `(String, String)`,
/// `(&str, Option<&str>)` and `(&str, &str)` tuples via the `From` impls in this module.
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct UpdateMapEntry {
    /// The key being updated.
    pub key: String,
    /// The new value for `key`.
    /// NOTE(review): `None` presumably means "remove this key" — confirm.
    pub value: Option<String>,
}
106
107impl From<(String, Option<String>)> for UpdateMapEntry {
108 fn from((key, value): (String, Option<String>)) -> Self {
109 Self { key, value }
110 }
111}
112
113impl From<(String, String)> for UpdateMapEntry {
114 fn from((key, value): (String, String)) -> Self {
115 Self::from((key, Some(value)))
116 }
117}
118
119impl From<(&str, Option<&str>)> for UpdateMapEntry {
120 fn from((key, value): (&str, Option<&str>)) -> Self {
121 Self {
122 key: key.to_string(),
123 value: value.map(str::to_owned),
124 }
125 }
126}
127
128impl From<(&str, &str)> for UpdateMapEntry {
129 fn from((key, value): (&str, &str)) -> Self {
130 Self::from((key, Some(value)))
131 }
132}
133
/// A batch of key updates, used by [`Operation::UpdateConfig`] for config,
/// table metadata, schema metadata and per-field metadata.
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct UpdateMap {
    /// The individual key updates, in the order they were supplied.
    pub update_entries: Vec<UpdateMapEntry>,
    /// NOTE(review): presumably `true` means the entries replace the whole existing
    /// map rather than being merged into it — confirm.
    pub replace: bool,
}
142
/// The kind of change carried by a [`Transaction`], together with the data it needs.
///
/// `Debug`, `Clone` and `DeepSizeOf` are derived. `PartialEq` is written by hand in
/// `impl PartialEq for Operation`, where most list-valued fields are compared without
/// regard to order: equal length, and every element of the left list is contained in
/// the right list. The `Display` impl prints only the variant name.
#[derive(Debug, Clone, DeepSizeOf)]
pub enum Operation {
    /// Adds `fragments` to the dataset.
    /// NOTE(review): presumably these are brand-new fragments — confirm.
    Append { fragments: Vec<Fragment> },
    /// Deletes rows.
    ///
    /// NOTE(review): presumably `updated_fragments` are fragments that keep some rows
    /// (with updated deletion info), `deleted_fragment_ids` are fragments removed
    /// entirely, and `predicate` is the filter text that selected the rows — confirm.
    Delete {
        updated_fragments: Vec<Fragment>,
        deleted_fragment_ids: Vec<u64>,
        predicate: String,
    },
    /// Replaces the dataset contents with `fragments` under `schema`.
    ///
    /// `config_upsert_values` and `initial_bases` are optional extras.
    /// NOTE(review): presumably they seed config entries and base paths for the
    /// new version — confirm.
    Overwrite {
        fragments: Vec<Fragment>,
        schema: Schema,
        config_upsert_values: Option<HashMap<String, String>>,
        initial_bases: Option<Vec<BasePath>>,
    },
    /// Adds `new_indices` and drops `removed_indices` in a single operation.
    CreateIndex {
        new_indices: Vec<IndexMetadata>,
        removed_indices: Vec<IndexMetadata>,
    },
    /// Rewrites data according to `groups`.
    ///
    /// NOTE(review): `rewritten_indices` and the optional `frag_reuse_index` presumably
    /// record how existing indices follow the rewritten data — confirm.
    Rewrite {
        groups: Vec<RewriteGroup>,
        rewritten_indices: Vec<RewrittenIndex>,
        frag_reuse_index: Option<IndexMetadata>,
    },
    /// Swaps in replacement data files, one [`DataReplacementGroup`] per replacement.
    DataReplacement {
        replacements: Vec<DataReplacementGroup>,
    },
    /// Carries a full fragment list together with a schema.
    /// NOTE(review): presumably the result of merging new columns into the
    /// dataset — confirm.
    Merge {
        fragments: Vec<Fragment>,
        schema: Schema,
    },
    /// Restores the dataset to `version`.
    /// NOTE(review): presumably an earlier version — confirm.
    Restore { version: u64 },
    /// Reserves `num_fragments` fragment ids.
    /// NOTE(review): presumably so a writer can assign them later — confirm.
    ReserveFragments { num_fragments: u32 },

    /// Updates rows in place or by rewriting them.
    Update {
        /// Ids of fragments removed by the update.
        removed_fragment_ids: Vec<u64>,
        /// Existing fragments that were modified.
        updated_fragments: Vec<Fragment>,
        /// Fragments newly written by the update.
        new_fragments: Vec<Fragment>,
        /// Field ids that were modified.
        fields_modified: Vec<u32>,
        /// MemWAL generations merged as part of this update.
        merged_generations: Vec<MergedGeneration>,
        /// Field ids whose fragment bitmaps should be kept.
        /// NOTE(review): inferred from the name — confirm.
        fields_for_preserving_frag_bitmap: Vec<u32>,
        /// How the update was carried out.
        /// NOTE(review): `None` presumably means the mode was not recorded — confirm.
        update_mode: Option<UpdateMode>,
        /// Optional key-existence filter for inserted rows (from merge-insert).
        inserted_rows_filter: Option<KeyExistenceFilter>,
    },

    /// Replaces the dataset schema with `schema`.
    /// NOTE(review): presumably a projection (dropping or reordering columns) — confirm.
    Project { schema: Schema },

    /// Applies key updates to the dataset's config and metadata maps.
    UpdateConfig {
        /// Updates to dataset config; `None` presumably means "no change".
        config_updates: Option<UpdateMap>,
        /// Updates to table metadata; `None` presumably means "no change".
        table_metadata_updates: Option<UpdateMap>,
        /// Updates to schema metadata; `None` presumably means "no change".
        schema_metadata_updates: Option<UpdateMap>,
        /// Per-field metadata updates, keyed by an `i32`
        /// (NOTE(review): presumably the field id — confirm).
        field_metadata_updates: HashMap<i32, UpdateMap>,
    },
    /// Records MemWAL generations that have been merged.
    UpdateMemWalState {
        merged_generations: Vec<MergedGeneration>,
    },

    /// Creates this dataset as a clone of another reference.
    /// NOTE(review): field semantics inferred from names — `ref_path` presumably
    /// points at the source dataset, and `is_shallow` presumably means the clone
    /// references rather than copies the source data — confirm.
    Clone {
        is_shallow: bool,
        ref_name: Option<String>,
        ref_version: u64,
        ref_path: String,
        branch_name: Option<String>,
    },

    /// Registers additional base paths.
    UpdateBases {
        /// The base paths to add.
        new_bases: Vec<BasePath>,
    },
}
291
/// How an [`Operation::Update`] rewrote the affected data; stored in its
/// `update_mode` field.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub enum UpdateMode {
    /// NOTE(review): presumably updated rows are removed from their current
    /// fragments and written out again as new rows — confirm.
    RewriteRows,

    /// NOTE(review): presumably only the modified columns are rewritten as new data
    /// files, leaving rows in their existing fragments — confirm.
    RewriteColumns,
}
304
305impl std::fmt::Display for Operation {
306 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307 match self {
308 Self::Append { .. } => write!(f, "Append"),
309 Self::Delete { .. } => write!(f, "Delete"),
310 Self::Overwrite { .. } => write!(f, "Overwrite"),
311 Self::CreateIndex { .. } => write!(f, "CreateIndex"),
312 Self::Rewrite { .. } => write!(f, "Rewrite"),
313 Self::Merge { .. } => write!(f, "Merge"),
314 Self::Restore { .. } => write!(f, "Restore"),
315 Self::ReserveFragments { .. } => write!(f, "ReserveFragments"),
316 Self::Update { .. } => write!(f, "Update"),
317 Self::Project { .. } => write!(f, "Project"),
318 Self::UpdateConfig { .. } => write!(f, "UpdateConfig"),
319 Self::DataReplacement { .. } => write!(f, "DataReplacement"),
320 Self::Clone { .. } => write!(f, "Clone"),
321 Self::UpdateMemWalState { .. } => write!(f, "UpdateMemWalState"),
322 Self::UpdateBases { .. } => write!(f, "UpdateBases"),
323 }
324 }
325}
326
327impl From<&Transaction> for lance_table::format::Transaction {
328 fn from(value: &Transaction) -> Self {
329 let pb_transaction: pb::Transaction = value.into();
330 Self {
331 inner: pb_transaction,
332 }
333 }
334}
335
336impl PartialEq for Operation {
337 fn eq(&self, other: &Self) -> bool {
338 fn compare_vec<T: PartialEq>(a: &[T], b: &[T]) -> bool {
344 a.len() == b.len() && a.iter().all(|f| b.contains(f))
345 }
346 match (self, other) {
347 (Self::Append { fragments: a }, Self::Append { fragments: b }) => compare_vec(a, b),
348 (
349 Self::Clone {
350 is_shallow: a_is_shallow,
351 ref_name: a_ref_name,
352 ref_version: a_ref_version,
353 ref_path: a_source_path,
354 branch_name: a_branch_name,
355 },
356 Self::Clone {
357 is_shallow: b_is_shallow,
358 ref_name: b_ref_name,
359 ref_version: b_ref_version,
360 ref_path: b_source_path,
361 branch_name: b_branch_name,
362 },
363 ) => {
364 a_is_shallow == b_is_shallow
365 && a_ref_name == b_ref_name
366 && a_ref_version == b_ref_version
367 && a_source_path == b_source_path
368 && a_branch_name == b_branch_name
369 }
370 (
371 Self::Delete {
372 updated_fragments: a_updated,
373 deleted_fragment_ids: a_deleted,
374 predicate: a_predicate,
375 },
376 Self::Delete {
377 updated_fragments: b_updated,
378 deleted_fragment_ids: b_deleted,
379 predicate: b_predicate,
380 },
381 ) => {
382 compare_vec(a_updated, b_updated)
383 && compare_vec(a_deleted, b_deleted)
384 && a_predicate == b_predicate
385 }
386 (
387 Self::Overwrite {
388 fragments: a_fragments,
389 schema: a_schema,
390 config_upsert_values: a_config,
391 initial_bases: a_initial,
392 },
393 Self::Overwrite {
394 fragments: b_fragments,
395 schema: b_schema,
396 config_upsert_values: b_config,
397 initial_bases: b_initial,
398 },
399 ) => {
400 compare_vec(a_fragments, b_fragments)
401 && a_schema == b_schema
402 && a_config == b_config
403 && a_initial == b_initial
404 }
405 (
406 Self::CreateIndex {
407 new_indices: a_new,
408 removed_indices: a_removed,
409 },
410 Self::CreateIndex {
411 new_indices: b_new,
412 removed_indices: b_removed,
413 },
414 ) => compare_vec(a_new, b_new) && compare_vec(a_removed, b_removed),
415 (
416 Self::Rewrite {
417 groups: a_groups,
418 rewritten_indices: a_indices,
419 frag_reuse_index: a_frag_reuse_index,
420 },
421 Self::Rewrite {
422 groups: b_groups,
423 rewritten_indices: b_indices,
424 frag_reuse_index: b_frag_reuse_index,
425 },
426 ) => {
427 compare_vec(a_groups, b_groups)
428 && compare_vec(a_indices, b_indices)
429 && a_frag_reuse_index == b_frag_reuse_index
430 }
431 (
432 Self::Merge {
433 fragments: a_fragments,
434 schema: a_schema,
435 },
436 Self::Merge {
437 fragments: b_fragments,
438 schema: b_schema,
439 },
440 ) => compare_vec(a_fragments, b_fragments) && a_schema == b_schema,
441 (Self::Restore { version: a }, Self::Restore { version: b }) => a == b,
442 (
443 Self::ReserveFragments { num_fragments: a },
444 Self::ReserveFragments { num_fragments: b },
445 ) => a == b,
446 (
447 Self::Update {
448 removed_fragment_ids: a_removed,
449 updated_fragments: a_updated,
450 new_fragments: a_new,
451 fields_modified: a_fields,
452 merged_generations: a_merged_generations,
453 fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap,
454 update_mode: a_update_mode,
455 inserted_rows_filter: a_inserted_rows_filter,
456 },
457 Self::Update {
458 removed_fragment_ids: b_removed,
459 updated_fragments: b_updated,
460 new_fragments: b_new,
461 fields_modified: b_fields,
462 merged_generations: b_merged_generations,
463 fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap,
464 update_mode: b_update_mode,
465 inserted_rows_filter: b_inserted_rows_filter,
466 },
467 ) => {
468 compare_vec(a_removed, b_removed)
469 && compare_vec(a_updated, b_updated)
470 && compare_vec(a_new, b_new)
471 && compare_vec(a_fields, b_fields)
472 && compare_vec(a_merged_generations, b_merged_generations)
473 && compare_vec(
474 a_fields_for_preserving_frag_bitmap,
475 b_fields_for_preserving_frag_bitmap,
476 )
477 && a_update_mode == b_update_mode
478 && a_inserted_rows_filter == b_inserted_rows_filter
479 }
480 (Self::Project { schema: a }, Self::Project { schema: b }) => a == b,
481 (
482 Self::UpdateConfig {
483 config_updates: a_config,
484 table_metadata_updates: a_table_metadata,
485 schema_metadata_updates: a_schema,
486 field_metadata_updates: a_field,
487 },
488 Self::UpdateConfig {
489 config_updates: b_config,
490 table_metadata_updates: b_table_metadata,
491 schema_metadata_updates: b_schema,
492 field_metadata_updates: b_field,
493 },
494 ) => {
495 a_config == b_config
496 && a_table_metadata == b_table_metadata
497 && a_schema == b_schema
498 && a_field == b_field
499 }
500 (
501 Self::DataReplacement { replacements: a },
502 Self::DataReplacement { replacements: b },
503 ) => a.len() == b.len() && a.iter().all(|r| b.contains(r)),
504 (Self::Append { .. }, Self::Delete { .. }) => {
508 std::mem::discriminant(self) == std::mem::discriminant(other)
509 }
510 (Self::Append { .. }, Self::Overwrite { .. }) => {
511 std::mem::discriminant(self) == std::mem::discriminant(other)
512 }
513 (Self::Append { .. }, Self::CreateIndex { .. }) => {
514 std::mem::discriminant(self) == std::mem::discriminant(other)
515 }
516 (Self::Append { .. }, Self::Rewrite { .. }) => {
517 std::mem::discriminant(self) == std::mem::discriminant(other)
518 }
519 (Self::Append { .. }, Self::Merge { .. }) => {
520 std::mem::discriminant(self) == std::mem::discriminant(other)
521 }
522 (Self::Append { .. }, Self::Restore { .. }) => {
523 std::mem::discriminant(self) == std::mem::discriminant(other)
524 }
525 (Self::Append { .. }, Self::ReserveFragments { .. }) => {
526 std::mem::discriminant(self) == std::mem::discriminant(other)
527 }
528 (Self::Append { .. }, Self::Update { .. }) => {
529 std::mem::discriminant(self) == std::mem::discriminant(other)
530 }
531 (Self::Append { .. }, Self::Project { .. }) => {
532 std::mem::discriminant(self) == std::mem::discriminant(other)
533 }
534 (Self::Append { .. }, Self::UpdateConfig { .. }) => {
535 std::mem::discriminant(self) == std::mem::discriminant(other)
536 }
537 (Self::Append { .. }, Self::DataReplacement { .. }) => {
538 std::mem::discriminant(self) == std::mem::discriminant(other)
539 }
540 (Self::Append { .. }, Self::UpdateMemWalState { .. }) => {
541 std::mem::discriminant(self) == std::mem::discriminant(other)
542 }
543 (Self::Append { .. }, Self::Clone { .. }) => {
544 std::mem::discriminant(self) == std::mem::discriminant(other)
545 }
546
547 (Self::Delete { .. }, Self::Append { .. }) => {
548 std::mem::discriminant(self) == std::mem::discriminant(other)
549 }
550 (Self::Delete { .. }, Self::Overwrite { .. }) => {
551 std::mem::discriminant(self) == std::mem::discriminant(other)
552 }
553 (Self::Delete { .. }, Self::CreateIndex { .. }) => {
554 std::mem::discriminant(self) == std::mem::discriminant(other)
555 }
556 (Self::Delete { .. }, Self::Rewrite { .. }) => {
557 std::mem::discriminant(self) == std::mem::discriminant(other)
558 }
559 (Self::Delete { .. }, Self::Merge { .. }) => {
560 std::mem::discriminant(self) == std::mem::discriminant(other)
561 }
562 (Self::Delete { .. }, Self::Restore { .. }) => {
563 std::mem::discriminant(self) == std::mem::discriminant(other)
564 }
565 (Self::Delete { .. }, Self::ReserveFragments { .. }) => {
566 std::mem::discriminant(self) == std::mem::discriminant(other)
567 }
568 (Self::Delete { .. }, Self::Update { .. }) => {
569 std::mem::discriminant(self) == std::mem::discriminant(other)
570 }
571 (Self::Delete { .. }, Self::Project { .. }) => {
572 std::mem::discriminant(self) == std::mem::discriminant(other)
573 }
574 (Self::Delete { .. }, Self::UpdateConfig { .. }) => {
575 std::mem::discriminant(self) == std::mem::discriminant(other)
576 }
577 (Self::Delete { .. }, Self::DataReplacement { .. }) => {
578 std::mem::discriminant(self) == std::mem::discriminant(other)
579 }
580 (Self::Delete { .. }, Self::UpdateMemWalState { .. }) => {
581 std::mem::discriminant(self) == std::mem::discriminant(other)
582 }
583 (Self::Delete { .. }, Self::Clone { .. }) => {
584 std::mem::discriminant(self) == std::mem::discriminant(other)
585 }
586
587 (Self::Overwrite { .. }, Self::Append { .. }) => {
588 std::mem::discriminant(self) == std::mem::discriminant(other)
589 }
590 (Self::Overwrite { .. }, Self::Delete { .. }) => {
591 std::mem::discriminant(self) == std::mem::discriminant(other)
592 }
593 (Self::Overwrite { .. }, Self::CreateIndex { .. }) => {
594 std::mem::discriminant(self) == std::mem::discriminant(other)
595 }
596 (Self::Overwrite { .. }, Self::Rewrite { .. }) => {
597 std::mem::discriminant(self) == std::mem::discriminant(other)
598 }
599 (Self::Overwrite { .. }, Self::Merge { .. }) => {
600 std::mem::discriminant(self) == std::mem::discriminant(other)
601 }
602 (Self::Overwrite { .. }, Self::Restore { .. }) => {
603 std::mem::discriminant(self) == std::mem::discriminant(other)
604 }
605 (Self::Overwrite { .. }, Self::ReserveFragments { .. }) => {
606 std::mem::discriminant(self) == std::mem::discriminant(other)
607 }
608 (Self::Overwrite { .. }, Self::Update { .. }) => {
609 std::mem::discriminant(self) == std::mem::discriminant(other)
610 }
611 (Self::Overwrite { .. }, Self::Project { .. }) => {
612 std::mem::discriminant(self) == std::mem::discriminant(other)
613 }
614 (Self::Overwrite { .. }, Self::UpdateConfig { .. }) => {
615 std::mem::discriminant(self) == std::mem::discriminant(other)
616 }
617 (Self::Overwrite { .. }, Self::DataReplacement { .. }) => {
618 std::mem::discriminant(self) == std::mem::discriminant(other)
619 }
620 (Self::Overwrite { .. }, Self::UpdateMemWalState { .. }) => {
621 std::mem::discriminant(self) == std::mem::discriminant(other)
622 }
623 (Self::Overwrite { .. }, Self::Clone { .. }) => {
624 std::mem::discriminant(self) == std::mem::discriminant(other)
625 }
626
627 (Self::CreateIndex { .. }, Self::Append { .. }) => {
628 std::mem::discriminant(self) == std::mem::discriminant(other)
629 }
630 (Self::CreateIndex { .. }, Self::Delete { .. }) => {
631 std::mem::discriminant(self) == std::mem::discriminant(other)
632 }
633 (Self::CreateIndex { .. }, Self::Overwrite { .. }) => {
634 std::mem::discriminant(self) == std::mem::discriminant(other)
635 }
636 (Self::CreateIndex { .. }, Self::Rewrite { .. }) => {
637 std::mem::discriminant(self) == std::mem::discriminant(other)
638 }
639 (Self::CreateIndex { .. }, Self::Merge { .. }) => {
640 std::mem::discriminant(self) == std::mem::discriminant(other)
641 }
642 (Self::CreateIndex { .. }, Self::Restore { .. }) => {
643 std::mem::discriminant(self) == std::mem::discriminant(other)
644 }
645 (Self::CreateIndex { .. }, Self::ReserveFragments { .. }) => {
646 std::mem::discriminant(self) == std::mem::discriminant(other)
647 }
648 (Self::CreateIndex { .. }, Self::Update { .. }) => {
649 std::mem::discriminant(self) == std::mem::discriminant(other)
650 }
651 (Self::CreateIndex { .. }, Self::Project { .. }) => {
652 std::mem::discriminant(self) == std::mem::discriminant(other)
653 }
654 (Self::CreateIndex { .. }, Self::UpdateConfig { .. }) => {
655 std::mem::discriminant(self) == std::mem::discriminant(other)
656 }
657 (Self::CreateIndex { .. }, Self::DataReplacement { .. }) => {
658 std::mem::discriminant(self) == std::mem::discriminant(other)
659 }
660 (Self::CreateIndex { .. }, Self::UpdateMemWalState { .. }) => {
661 std::mem::discriminant(self) == std::mem::discriminant(other)
662 }
663 (Self::CreateIndex { .. }, Self::Clone { .. }) => {
664 std::mem::discriminant(self) == std::mem::discriminant(other)
665 }
666
667 (Self::Rewrite { .. }, Self::Append { .. }) => {
668 std::mem::discriminant(self) == std::mem::discriminant(other)
669 }
670 (Self::Rewrite { .. }, Self::Delete { .. }) => {
671 std::mem::discriminant(self) == std::mem::discriminant(other)
672 }
673 (Self::Rewrite { .. }, Self::Overwrite { .. }) => {
674 std::mem::discriminant(self) == std::mem::discriminant(other)
675 }
676 (Self::Rewrite { .. }, Self::CreateIndex { .. }) => {
677 std::mem::discriminant(self) == std::mem::discriminant(other)
678 }
679 (Self::Rewrite { .. }, Self::Merge { .. }) => {
680 std::mem::discriminant(self) == std::mem::discriminant(other)
681 }
682 (Self::Rewrite { .. }, Self::Restore { .. }) => {
683 std::mem::discriminant(self) == std::mem::discriminant(other)
684 }
685 (Self::Rewrite { .. }, Self::ReserveFragments { .. }) => {
686 std::mem::discriminant(self) == std::mem::discriminant(other)
687 }
688 (Self::Rewrite { .. }, Self::Update { .. }) => {
689 std::mem::discriminant(self) == std::mem::discriminant(other)
690 }
691 (Self::Rewrite { .. }, Self::Project { .. }) => {
692 std::mem::discriminant(self) == std::mem::discriminant(other)
693 }
694 (Self::Rewrite { .. }, Self::UpdateConfig { .. }) => {
695 std::mem::discriminant(self) == std::mem::discriminant(other)
696 }
697 (Self::Rewrite { .. }, Self::DataReplacement { .. }) => {
698 std::mem::discriminant(self) == std::mem::discriminant(other)
699 }
700 (Self::Rewrite { .. }, Self::UpdateMemWalState { .. }) => {
701 std::mem::discriminant(self) == std::mem::discriminant(other)
702 }
703 (Self::Rewrite { .. }, Self::Clone { .. }) => {
704 std::mem::discriminant(self) == std::mem::discriminant(other)
705 }
706
707 (Self::Merge { .. }, Self::Append { .. }) => {
708 std::mem::discriminant(self) == std::mem::discriminant(other)
709 }
710 (Self::Merge { .. }, Self::Delete { .. }) => {
711 std::mem::discriminant(self) == std::mem::discriminant(other)
712 }
713 (Self::Merge { .. }, Self::Overwrite { .. }) => {
714 std::mem::discriminant(self) == std::mem::discriminant(other)
715 }
716 (Self::Merge { .. }, Self::CreateIndex { .. }) => {
717 std::mem::discriminant(self) == std::mem::discriminant(other)
718 }
719 (Self::Merge { .. }, Self::Rewrite { .. }) => {
720 std::mem::discriminant(self) == std::mem::discriminant(other)
721 }
722 (Self::Merge { .. }, Self::Restore { .. }) => {
723 std::mem::discriminant(self) == std::mem::discriminant(other)
724 }
725 (Self::Merge { .. }, Self::ReserveFragments { .. }) => {
726 std::mem::discriminant(self) == std::mem::discriminant(other)
727 }
728 (Self::Merge { .. }, Self::Update { .. }) => {
729 std::mem::discriminant(self) == std::mem::discriminant(other)
730 }
731 (Self::Merge { .. }, Self::Project { .. }) => {
732 std::mem::discriminant(self) == std::mem::discriminant(other)
733 }
734 (Self::Merge { .. }, Self::UpdateConfig { .. }) => {
735 std::mem::discriminant(self) == std::mem::discriminant(other)
736 }
737 (Self::Merge { .. }, Self::DataReplacement { .. }) => {
738 std::mem::discriminant(self) == std::mem::discriminant(other)
739 }
740 (Self::Merge { .. }, Self::UpdateMemWalState { .. }) => {
741 std::mem::discriminant(self) == std::mem::discriminant(other)
742 }
743 (Self::Merge { .. }, Self::Clone { .. }) => {
744 std::mem::discriminant(self) == std::mem::discriminant(other)
745 }
746
747 (Self::Restore { .. }, Self::Append { .. }) => {
748 std::mem::discriminant(self) == std::mem::discriminant(other)
749 }
750 (Self::Restore { .. }, Self::Delete { .. }) => {
751 std::mem::discriminant(self) == std::mem::discriminant(other)
752 }
753 (Self::Restore { .. }, Self::Overwrite { .. }) => {
754 std::mem::discriminant(self) == std::mem::discriminant(other)
755 }
756 (Self::Restore { .. }, Self::CreateIndex { .. }) => {
757 std::mem::discriminant(self) == std::mem::discriminant(other)
758 }
759 (Self::Restore { .. }, Self::Rewrite { .. }) => {
760 std::mem::discriminant(self) == std::mem::discriminant(other)
761 }
762 (Self::Restore { .. }, Self::Merge { .. }) => {
763 std::mem::discriminant(self) == std::mem::discriminant(other)
764 }
765 (Self::Restore { .. }, Self::ReserveFragments { .. }) => {
766 std::mem::discriminant(self) == std::mem::discriminant(other)
767 }
768 (Self::Restore { .. }, Self::Update { .. }) => {
769 std::mem::discriminant(self) == std::mem::discriminant(other)
770 }
771 (Self::Restore { .. }, Self::Project { .. }) => {
772 std::mem::discriminant(self) == std::mem::discriminant(other)
773 }
774 (Self::Restore { .. }, Self::UpdateConfig { .. }) => {
775 std::mem::discriminant(self) == std::mem::discriminant(other)
776 }
777 (Self::Restore { .. }, Self::DataReplacement { .. }) => {
778 std::mem::discriminant(self) == std::mem::discriminant(other)
779 }
780 (Self::Restore { .. }, Self::UpdateMemWalState { .. }) => {
781 std::mem::discriminant(self) == std::mem::discriminant(other)
782 }
783 (Self::Restore { .. }, Self::Clone { .. }) => {
784 std::mem::discriminant(self) == std::mem::discriminant(other)
785 }
786
787 (Self::ReserveFragments { .. }, Self::Append { .. }) => {
788 std::mem::discriminant(self) == std::mem::discriminant(other)
789 }
790 (Self::ReserveFragments { .. }, Self::Delete { .. }) => {
791 std::mem::discriminant(self) == std::mem::discriminant(other)
792 }
793 (Self::ReserveFragments { .. }, Self::Overwrite { .. }) => {
794 std::mem::discriminant(self) == std::mem::discriminant(other)
795 }
796 (Self::ReserveFragments { .. }, Self::CreateIndex { .. }) => {
797 std::mem::discriminant(self) == std::mem::discriminant(other)
798 }
799 (Self::ReserveFragments { .. }, Self::Rewrite { .. }) => {
800 std::mem::discriminant(self) == std::mem::discriminant(other)
801 }
802 (Self::ReserveFragments { .. }, Self::Merge { .. }) => {
803 std::mem::discriminant(self) == std::mem::discriminant(other)
804 }
805 (Self::ReserveFragments { .. }, Self::Restore { .. }) => {
806 std::mem::discriminant(self) == std::mem::discriminant(other)
807 }
808 (Self::ReserveFragments { .. }, Self::Update { .. }) => {
809 std::mem::discriminant(self) == std::mem::discriminant(other)
810 }
811 (Self::ReserveFragments { .. }, Self::Project { .. }) => {
812 std::mem::discriminant(self) == std::mem::discriminant(other)
813 }
814 (Self::ReserveFragments { .. }, Self::UpdateConfig { .. }) => {
815 std::mem::discriminant(self) == std::mem::discriminant(other)
816 }
817 (Self::ReserveFragments { .. }, Self::DataReplacement { .. }) => {
818 std::mem::discriminant(self) == std::mem::discriminant(other)
819 }
820 (Self::ReserveFragments { .. }, Self::UpdateMemWalState { .. }) => {
821 std::mem::discriminant(self) == std::mem::discriminant(other)
822 }
823 (Self::ReserveFragments { .. }, Self::Clone { .. }) => {
824 std::mem::discriminant(self) == std::mem::discriminant(other)
825 }
826
827 (Self::Update { .. }, Self::Append { .. }) => {
828 std::mem::discriminant(self) == std::mem::discriminant(other)
829 }
830 (Self::Update { .. }, Self::Delete { .. }) => {
831 std::mem::discriminant(self) == std::mem::discriminant(other)
832 }
833 (Self::Update { .. }, Self::Overwrite { .. }) => {
834 std::mem::discriminant(self) == std::mem::discriminant(other)
835 }
836 (Self::Update { .. }, Self::CreateIndex { .. }) => {
837 std::mem::discriminant(self) == std::mem::discriminant(other)
838 }
839 (Self::Update { .. }, Self::Rewrite { .. }) => {
840 std::mem::discriminant(self) == std::mem::discriminant(other)
841 }
842 (Self::Update { .. }, Self::Merge { .. }) => {
843 std::mem::discriminant(self) == std::mem::discriminant(other)
844 }
845 (Self::Update { .. }, Self::Restore { .. }) => {
846 std::mem::discriminant(self) == std::mem::discriminant(other)
847 }
848 (Self::Update { .. }, Self::ReserveFragments { .. }) => {
849 std::mem::discriminant(self) == std::mem::discriminant(other)
850 }
851 (Self::Update { .. }, Self::Project { .. }) => {
852 std::mem::discriminant(self) == std::mem::discriminant(other)
853 }
854 (Self::Update { .. }, Self::UpdateConfig { .. }) => {
855 std::mem::discriminant(self) == std::mem::discriminant(other)
856 }
857 (Self::Update { .. }, Self::DataReplacement { .. }) => {
858 std::mem::discriminant(self) == std::mem::discriminant(other)
859 }
860 (Self::Update { .. }, Self::UpdateMemWalState { .. }) => {
861 std::mem::discriminant(self) == std::mem::discriminant(other)
862 }
863 (Self::Update { .. }, Self::Clone { .. }) => {
864 std::mem::discriminant(self) == std::mem::discriminant(other)
865 }
866
867 (Self::Project { .. }, Self::Append { .. }) => {
868 std::mem::discriminant(self) == std::mem::discriminant(other)
869 }
870 (Self::Project { .. }, Self::Delete { .. }) => {
871 std::mem::discriminant(self) == std::mem::discriminant(other)
872 }
873 (Self::Project { .. }, Self::Overwrite { .. }) => {
874 std::mem::discriminant(self) == std::mem::discriminant(other)
875 }
876 (Self::Project { .. }, Self::CreateIndex { .. }) => {
877 std::mem::discriminant(self) == std::mem::discriminant(other)
878 }
879 (Self::Project { .. }, Self::Rewrite { .. }) => {
880 std::mem::discriminant(self) == std::mem::discriminant(other)
881 }
882 (Self::Project { .. }, Self::Merge { .. }) => {
883 std::mem::discriminant(self) == std::mem::discriminant(other)
884 }
885 (Self::Project { .. }, Self::Restore { .. }) => {
886 std::mem::discriminant(self) == std::mem::discriminant(other)
887 }
888 (Self::Project { .. }, Self::ReserveFragments { .. }) => {
889 std::mem::discriminant(self) == std::mem::discriminant(other)
890 }
891 (Self::Project { .. }, Self::Update { .. }) => {
892 std::mem::discriminant(self) == std::mem::discriminant(other)
893 }
894 (Self::Project { .. }, Self::UpdateConfig { .. }) => {
895 std::mem::discriminant(self) == std::mem::discriminant(other)
896 }
897 (Self::Project { .. }, Self::DataReplacement { .. }) => {
898 std::mem::discriminant(self) == std::mem::discriminant(other)
899 }
900 (Self::Project { .. }, Self::UpdateMemWalState { .. }) => {
901 std::mem::discriminant(self) == std::mem::discriminant(other)
902 }
903 (Self::Project { .. }, Self::Clone { .. }) => {
904 std::mem::discriminant(self) == std::mem::discriminant(other)
905 }
906
907 (Self::UpdateConfig { .. }, Self::Append { .. }) => {
908 std::mem::discriminant(self) == std::mem::discriminant(other)
909 }
910 (Self::UpdateConfig { .. }, Self::Delete { .. }) => {
911 std::mem::discriminant(self) == std::mem::discriminant(other)
912 }
913 (Self::UpdateConfig { .. }, Self::Overwrite { .. }) => {
914 std::mem::discriminant(self) == std::mem::discriminant(other)
915 }
916 (Self::UpdateConfig { .. }, Self::CreateIndex { .. }) => {
917 std::mem::discriminant(self) == std::mem::discriminant(other)
918 }
919 (Self::UpdateConfig { .. }, Self::Rewrite { .. }) => {
920 std::mem::discriminant(self) == std::mem::discriminant(other)
921 }
922 (Self::UpdateConfig { .. }, Self::Merge { .. }) => {
923 std::mem::discriminant(self) == std::mem::discriminant(other)
924 }
925 (Self::UpdateConfig { .. }, Self::Restore { .. }) => {
926 std::mem::discriminant(self) == std::mem::discriminant(other)
927 }
928 (Self::UpdateConfig { .. }, Self::ReserveFragments { .. }) => {
929 std::mem::discriminant(self) == std::mem::discriminant(other)
930 }
931 (Self::UpdateConfig { .. }, Self::Update { .. }) => {
932 std::mem::discriminant(self) == std::mem::discriminant(other)
933 }
934 (Self::UpdateConfig { .. }, Self::Project { .. }) => {
935 std::mem::discriminant(self) == std::mem::discriminant(other)
936 }
937 (Self::UpdateConfig { .. }, Self::DataReplacement { .. }) => {
938 std::mem::discriminant(self) == std::mem::discriminant(other)
939 }
940 (Self::UpdateConfig { .. }, Self::UpdateMemWalState { .. }) => {
941 std::mem::discriminant(self) == std::mem::discriminant(other)
942 }
943 (Self::UpdateConfig { .. }, Self::Clone { .. }) => {
944 std::mem::discriminant(self) == std::mem::discriminant(other)
945 }
946
947 (Self::DataReplacement { .. }, Self::Append { .. }) => {
948 std::mem::discriminant(self) == std::mem::discriminant(other)
949 }
950 (Self::DataReplacement { .. }, Self::Delete { .. }) => {
951 std::mem::discriminant(self) == std::mem::discriminant(other)
952 }
953 (Self::DataReplacement { .. }, Self::Overwrite { .. }) => {
954 std::mem::discriminant(self) == std::mem::discriminant(other)
955 }
956 (Self::DataReplacement { .. }, Self::CreateIndex { .. }) => {
957 std::mem::discriminant(self) == std::mem::discriminant(other)
958 }
959 (Self::DataReplacement { .. }, Self::Rewrite { .. }) => {
960 std::mem::discriminant(self) == std::mem::discriminant(other)
961 }
962 (Self::DataReplacement { .. }, Self::Merge { .. }) => {
963 std::mem::discriminant(self) == std::mem::discriminant(other)
964 }
965 (Self::DataReplacement { .. }, Self::Restore { .. }) => {
966 std::mem::discriminant(self) == std::mem::discriminant(other)
967 }
968 (Self::DataReplacement { .. }, Self::ReserveFragments { .. }) => {
969 std::mem::discriminant(self) == std::mem::discriminant(other)
970 }
971 (Self::DataReplacement { .. }, Self::Update { .. }) => {
972 std::mem::discriminant(self) == std::mem::discriminant(other)
973 }
974 (Self::DataReplacement { .. }, Self::Project { .. }) => {
975 std::mem::discriminant(self) == std::mem::discriminant(other)
976 }
977 (Self::DataReplacement { .. }, Self::UpdateConfig { .. }) => {
978 std::mem::discriminant(self) == std::mem::discriminant(other)
979 }
980 (Self::DataReplacement { .. }, Self::UpdateMemWalState { .. }) => {
981 std::mem::discriminant(self) == std::mem::discriminant(other)
982 }
983 (Self::DataReplacement { .. }, Self::Clone { .. }) => {
984 std::mem::discriminant(self) == std::mem::discriminant(other)
985 }
986
987 (Self::UpdateMemWalState { .. }, Self::Append { .. }) => {
988 std::mem::discriminant(self) == std::mem::discriminant(other)
989 }
990 (Self::UpdateMemWalState { .. }, Self::Delete { .. }) => {
991 std::mem::discriminant(self) == std::mem::discriminant(other)
992 }
993 (Self::UpdateMemWalState { .. }, Self::Overwrite { .. }) => {
994 std::mem::discriminant(self) == std::mem::discriminant(other)
995 }
996 (Self::UpdateMemWalState { .. }, Self::CreateIndex { .. }) => {
997 std::mem::discriminant(self) == std::mem::discriminant(other)
998 }
999 (Self::UpdateMemWalState { .. }, Self::Rewrite { .. }) => {
1000 std::mem::discriminant(self) == std::mem::discriminant(other)
1001 }
1002 (Self::UpdateMemWalState { .. }, Self::Merge { .. }) => {
1003 std::mem::discriminant(self) == std::mem::discriminant(other)
1004 }
1005 (Self::UpdateMemWalState { .. }, Self::Restore { .. }) => {
1006 std::mem::discriminant(self) == std::mem::discriminant(other)
1007 }
1008 (Self::UpdateMemWalState { .. }, Self::ReserveFragments { .. }) => {
1009 std::mem::discriminant(self) == std::mem::discriminant(other)
1010 }
1011 (Self::UpdateMemWalState { .. }, Self::Update { .. }) => {
1012 std::mem::discriminant(self) == std::mem::discriminant(other)
1013 }
1014 (Self::UpdateMemWalState { .. }, Self::Project { .. }) => {
1015 std::mem::discriminant(self) == std::mem::discriminant(other)
1016 }
1017 (Self::UpdateMemWalState { .. }, Self::UpdateConfig { .. }) => {
1018 std::mem::discriminant(self) == std::mem::discriminant(other)
1019 }
1020 (Self::UpdateMemWalState { .. }, Self::DataReplacement { .. }) => {
1021 std::mem::discriminant(self) == std::mem::discriminant(other)
1022 }
1023 (Self::UpdateMemWalState { .. }, Self::Clone { .. }) => {
1024 std::mem::discriminant(self) == std::mem::discriminant(other)
1025 }
1026 (
1027 Self::UpdateMemWalState {
1028 merged_generations: a_merged,
1029 },
1030 Self::UpdateMemWalState {
1031 merged_generations: b_merged,
1032 },
1033 ) => compare_vec(a_merged, b_merged),
1034 (Self::Clone { .. }, Self::Append { .. }) => {
1035 std::mem::discriminant(self) == std::mem::discriminant(other)
1036 }
1037 (Self::Clone { .. }, Self::Delete { .. }) => {
1038 std::mem::discriminant(self) == std::mem::discriminant(other)
1039 }
1040 (Self::Clone { .. }, Self::Overwrite { .. }) => {
1041 std::mem::discriminant(self) == std::mem::discriminant(other)
1042 }
1043 (Self::Clone { .. }, Self::CreateIndex { .. }) => {
1044 std::mem::discriminant(self) == std::mem::discriminant(other)
1045 }
1046 (Self::Clone { .. }, Self::Rewrite { .. }) => {
1047 std::mem::discriminant(self) == std::mem::discriminant(other)
1048 }
1049 (Self::Clone { .. }, Self::Merge { .. }) => {
1050 std::mem::discriminant(self) == std::mem::discriminant(other)
1051 }
1052 (Self::Clone { .. }, Self::Restore { .. }) => {
1053 std::mem::discriminant(self) == std::mem::discriminant(other)
1054 }
1055 (Self::Clone { .. }, Self::ReserveFragments { .. }) => {
1056 std::mem::discriminant(self) == std::mem::discriminant(other)
1057 }
1058 (Self::Clone { .. }, Self::Update { .. }) => {
1059 std::mem::discriminant(self) == std::mem::discriminant(other)
1060 }
1061 (Self::Clone { .. }, Self::Project { .. }) => {
1062 std::mem::discriminant(self) == std::mem::discriminant(other)
1063 }
1064 (Self::Clone { .. }, Self::UpdateConfig { .. }) => {
1065 std::mem::discriminant(self) == std::mem::discriminant(other)
1066 }
1067 (Self::Clone { .. }, Self::DataReplacement { .. }) => {
1068 std::mem::discriminant(self) == std::mem::discriminant(other)
1069 }
1070 (Self::Clone { .. }, Self::UpdateMemWalState { .. }) => {
1071 std::mem::discriminant(self) == std::mem::discriminant(other)
1072 }
1073
1074 (Self::UpdateBases { new_bases: a }, Self::UpdateBases { new_bases: b }) => {
1075 compare_vec(a, b)
1076 }
1077
1078 (Self::UpdateBases { .. }, Self::Append { .. }) => {
1079 std::mem::discriminant(self) == std::mem::discriminant(other)
1080 }
1081 (Self::UpdateBases { .. }, Self::Delete { .. }) => {
1082 std::mem::discriminant(self) == std::mem::discriminant(other)
1083 }
1084 (Self::UpdateBases { .. }, Self::Overwrite { .. }) => {
1085 std::mem::discriminant(self) == std::mem::discriminant(other)
1086 }
1087 (Self::UpdateBases { .. }, Self::CreateIndex { .. }) => {
1088 std::mem::discriminant(self) == std::mem::discriminant(other)
1089 }
1090 (Self::UpdateBases { .. }, Self::Rewrite { .. }) => {
1091 std::mem::discriminant(self) == std::mem::discriminant(other)
1092 }
1093 (Self::UpdateBases { .. }, Self::Merge { .. }) => {
1094 std::mem::discriminant(self) == std::mem::discriminant(other)
1095 }
1096 (Self::UpdateBases { .. }, Self::Restore { .. }) => {
1097 std::mem::discriminant(self) == std::mem::discriminant(other)
1098 }
1099 (Self::UpdateBases { .. }, Self::ReserveFragments { .. }) => {
1100 std::mem::discriminant(self) == std::mem::discriminant(other)
1101 }
1102 (Self::UpdateBases { .. }, Self::Update { .. }) => {
1103 std::mem::discriminant(self) == std::mem::discriminant(other)
1104 }
1105 (Self::UpdateBases { .. }, Self::Project { .. }) => {
1106 std::mem::discriminant(self) == std::mem::discriminant(other)
1107 }
1108 (Self::UpdateBases { .. }, Self::UpdateConfig { .. }) => {
1109 std::mem::discriminant(self) == std::mem::discriminant(other)
1110 }
1111 (Self::UpdateBases { .. }, Self::DataReplacement { .. }) => {
1112 std::mem::discriminant(self) == std::mem::discriminant(other)
1113 }
1114 (Self::UpdateBases { .. }, Self::UpdateMemWalState { .. }) => {
1115 std::mem::discriminant(self) == std::mem::discriminant(other)
1116 }
1117 (Self::UpdateBases { .. }, Self::Clone { .. }) => {
1118 std::mem::discriminant(self) == std::mem::discriminant(other)
1119 }
1120
1121 (Self::Append { .. }, Self::UpdateBases { .. }) => {
1122 std::mem::discriminant(self) == std::mem::discriminant(other)
1123 }
1124 (Self::Delete { .. }, Self::UpdateBases { .. }) => {
1125 std::mem::discriminant(self) == std::mem::discriminant(other)
1126 }
1127 (Self::Overwrite { .. }, Self::UpdateBases { .. }) => {
1128 std::mem::discriminant(self) == std::mem::discriminant(other)
1129 }
1130 (Self::CreateIndex { .. }, Self::UpdateBases { .. }) => {
1131 std::mem::discriminant(self) == std::mem::discriminant(other)
1132 }
1133 (Self::Rewrite { .. }, Self::UpdateBases { .. }) => {
1134 std::mem::discriminant(self) == std::mem::discriminant(other)
1135 }
1136 (Self::Merge { .. }, Self::UpdateBases { .. }) => {
1137 std::mem::discriminant(self) == std::mem::discriminant(other)
1138 }
1139 (Self::Restore { .. }, Self::UpdateBases { .. }) => {
1140 std::mem::discriminant(self) == std::mem::discriminant(other)
1141 }
1142 (Self::ReserveFragments { .. }, Self::UpdateBases { .. }) => {
1143 std::mem::discriminant(self) == std::mem::discriminant(other)
1144 }
1145 (Self::Update { .. }, Self::UpdateBases { .. }) => {
1146 std::mem::discriminant(self) == std::mem::discriminant(other)
1147 }
1148 (Self::Project { .. }, Self::UpdateBases { .. }) => {
1149 std::mem::discriminant(self) == std::mem::discriminant(other)
1150 }
1151 (Self::UpdateConfig { .. }, Self::UpdateBases { .. }) => {
1152 std::mem::discriminant(self) == std::mem::discriminant(other)
1153 }
1154 (Self::DataReplacement { .. }, Self::UpdateBases { .. }) => {
1155 std::mem::discriminant(self) == std::mem::discriminant(other)
1156 }
1157 (Self::UpdateMemWalState { .. }, Self::UpdateBases { .. }) => {
1158 std::mem::discriminant(self) == std::mem::discriminant(other)
1159 }
1160 (Self::Clone { .. }, Self::UpdateBases { .. }) => {
1161 std::mem::discriminant(self) == std::mem::discriminant(other)
1162 }
1163 }
1164 }
1165}
1166
1167#[derive(Debug, Clone, PartialEq)]
1168pub struct RewrittenIndex {
1169 pub old_id: Uuid,
1170 pub new_id: Uuid,
1171 pub new_index_details: prost_types::Any,
1172 pub new_index_version: u32,
1173}
1174
1175impl DeepSizeOf for RewrittenIndex {
1176 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
1177 self.new_index_details
1178 .type_url
1179 .deep_size_of_children(context)
1180 + self.new_index_details.value.deep_size_of_children(context)
1181 }
1182}
1183
1184#[derive(Debug, Clone, DeepSizeOf)]
1185pub struct RewriteGroup {
1186 pub old_fragments: Vec<Fragment>,
1187 pub new_fragments: Vec<Fragment>,
1188}
1189
1190impl PartialEq for RewriteGroup {
1191 fn eq(&self, other: &Self) -> bool {
1192 fn compare_vec<T: PartialEq>(a: &[T], b: &[T]) -> bool {
1193 a.len() == b.len() && a.iter().all(|f| b.contains(f))
1194 }
1195 compare_vec(&self.old_fragments, &other.old_fragments)
1196 && compare_vec(&self.new_fragments, &other.new_fragments)
1197 }
1198}
1199
1200impl Operation {
1201 fn get_upsert_config_keys(&self) -> Vec<String> {
1203 match self {
1204 Self::Overwrite {
1205 config_upsert_values: Some(upsert_values),
1206 ..
1207 } => {
1208 let vec: Vec<String> = upsert_values.keys().cloned().collect();
1209 vec
1210 }
1211 Self::UpdateConfig {
1212 config_updates: Some(config_updates),
1213 ..
1214 } => config_updates
1215 .update_entries
1216 .iter()
1217 .filter_map(|entry| {
1218 if entry.value.is_some() {
1219 Some(entry.key.clone())
1220 } else {
1221 None
1222 }
1223 })
1224 .collect(),
1225 _ => Vec::<String>::new(),
1226 }
1227 }
1228
1229 fn get_delete_config_keys(&self) -> Vec<String> {
1231 match self {
1232 Self::UpdateConfig {
1233 config_updates: Some(config_updates),
1234 ..
1235 } => config_updates
1236 .update_entries
1237 .iter()
1238 .filter_map(|entry| {
1239 if entry.value.is_none() {
1240 Some(entry.key.clone())
1241 } else {
1242 None
1243 }
1244 })
1245 .collect(),
1246 _ => Vec::<String>::new(),
1247 }
1248 }
1249
1250 pub(crate) fn modifies_same_metadata(&self, other: &Self) -> bool {
1251 match (self, other) {
1252 (
1253 Self::UpdateConfig {
1254 schema_metadata_updates,
1255 field_metadata_updates,
1256 ..
1257 },
1258 Self::UpdateConfig {
1259 schema_metadata_updates: other_schema_metadata,
1260 field_metadata_updates: other_field_metadata,
1261 ..
1262 },
1263 ) => {
1264 if schema_metadata_updates.is_some() && other_schema_metadata.is_some() {
1265 return true;
1266 }
1267 if !field_metadata_updates.is_empty() && !other_field_metadata.is_empty() {
1268 for field in field_metadata_updates.keys() {
1269 if other_field_metadata.contains_key(field) {
1270 return true;
1271 }
1272 }
1273 }
1274 false
1275 }
1276 _ => false,
1277 }
1278 }
1279
1280 pub(crate) fn upsert_key_conflict(&self, other: &Self) -> bool {
1282 let self_upsert_keys = self.get_upsert_config_keys();
1283 let other_upsert_keys = other.get_upsert_config_keys();
1284
1285 let self_delete_keys = self.get_delete_config_keys();
1286 let other_delete_keys = other.get_delete_config_keys();
1287
1288 self_upsert_keys
1289 .iter()
1290 .any(|x| other_upsert_keys.contains(x) || other_delete_keys.contains(x))
1291 || other_upsert_keys
1292 .iter()
1293 .any(|x| self_upsert_keys.contains(x) || self_delete_keys.contains(x))
1294 }
1295
1296 pub fn name(&self) -> &str {
1297 match self {
1298 Self::Append { .. } => "Append",
1299 Self::Delete { .. } => "Delete",
1300 Self::Overwrite { .. } => "Overwrite",
1301 Self::CreateIndex { .. } => "CreateIndex",
1302 Self::Rewrite { .. } => "Rewrite",
1303 Self::Merge { .. } => "Merge",
1304 Self::ReserveFragments { .. } => "ReserveFragments",
1305 Self::Restore { .. } => "Restore",
1306 Self::Update { .. } => "Update",
1307 Self::Project { .. } => "Project",
1308 Self::UpdateConfig { .. } => "UpdateConfig",
1309 Self::DataReplacement { .. } => "DataReplacement",
1310 Self::UpdateMemWalState { .. } => "UpdateMemWalState",
1311 Self::Clone { .. } => "Clone",
1312 Self::UpdateBases { .. } => "UpdateBases",
1313 }
1314 }
1315}
1316
1317fn apply_update_map(
1319 target: &mut std::collections::HashMap<String, String>,
1320 update_map: &UpdateMap,
1321) {
1322 if update_map.replace {
1323 target.clear();
1325 for entry in &update_map.update_entries {
1326 if let Some(value) = &entry.value {
1327 target.insert(entry.key.clone(), value.clone());
1328 }
1329 }
1330 } else {
1331 for entry in &update_map.update_entries {
1333 if let Some(value) = &entry.value {
1334 target.insert(entry.key.clone(), value.clone());
1335 } else {
1336 target.remove(&entry.key);
1337 }
1338 }
1339 }
1340}
1341
1342pub fn translate_config_updates(
1344 upsert_values: &std::collections::HashMap<String, String>,
1345 delete_keys: &[String],
1346) -> UpdateMap {
1347 let mut update_entries = Vec::new();
1348
1349 for (key, value) in upsert_values {
1351 update_entries.push(UpdateMapEntry {
1352 key: key.clone(),
1353 value: Some(value.clone()),
1354 });
1355 }
1356
1357 for key in delete_keys {
1359 update_entries.push(UpdateMapEntry {
1360 key: key.clone(),
1361 value: None,
1362 });
1363 }
1364
1365 UpdateMap {
1366 update_entries,
1367 replace: false, }
1369}
1370
1371pub fn translate_schema_metadata_updates(
1373 schema_metadata: &std::collections::HashMap<String, String>,
1374) -> UpdateMap {
1375 let update_entries = schema_metadata
1376 .iter()
1377 .map(|(key, value)| UpdateMapEntry {
1378 key: key.clone(),
1379 value: Some(value.clone()),
1380 })
1381 .collect();
1382
1383 UpdateMap {
1384 update_entries,
1385 replace: true, }
1387}
1388
1389impl From<&UpdateMap> for pb::transaction::UpdateMap {
1390 fn from(update_map: &UpdateMap) -> Self {
1391 Self {
1392 update_entries: update_map
1393 .update_entries
1394 .iter()
1395 .map(|entry| pb::transaction::UpdateMapEntry {
1396 key: entry.key.clone(),
1397 value: entry.value.clone(),
1398 })
1399 .collect(),
1400 replace: update_map.replace,
1401 }
1402 }
1403}
1404
1405impl From<&pb::transaction::UpdateMap> for UpdateMap {
1406 fn from(pb_update_map: &pb::transaction::UpdateMap) -> Self {
1407 Self {
1408 update_entries: pb_update_map
1409 .update_entries
1410 .iter()
1411 .map(|entry| UpdateMapEntry {
1412 key: entry.key.clone(),
1413 value: entry.value.clone(),
1414 })
1415 .collect(),
1416 replace: pb_update_map.replace,
1417 }
1418 }
1419}
1420
1421pub struct TransactionBuilder {
1423 read_version: u64,
1424 uuid: Option<String>,
1426 operation: Operation,
1427 tag: Option<String>,
1428 transaction_properties: Option<Arc<HashMap<String, String>>>,
1429}
1430
1431impl TransactionBuilder {
1432 pub fn new(read_version: u64, operation: Operation) -> Self {
1433 Self {
1434 read_version,
1435 uuid: None,
1436 operation,
1437 tag: None,
1438 transaction_properties: None,
1439 }
1440 }
1441
1442 pub fn uuid(mut self, uuid: String) -> Self {
1443 self.uuid = Some(uuid);
1444 self
1445 }
1446
1447 pub fn tag(mut self, tag: Option<String>) -> Self {
1448 self.tag = tag;
1449 self
1450 }
1451
1452 pub fn transaction_properties(
1453 mut self,
1454 transaction_properties: Option<Arc<HashMap<String, String>>>,
1455 ) -> Self {
1456 self.transaction_properties = transaction_properties;
1457 self
1458 }
1459
1460 pub fn build(self) -> Transaction {
1461 let uuid = self
1462 .uuid
1463 .unwrap_or_else(|| Uuid::new_v4().hyphenated().to_string());
1464 Transaction {
1465 read_version: self.read_version,
1466 uuid,
1467 operation: self.operation,
1468 tag: self.tag,
1469 transaction_properties: self.transaction_properties,
1470 }
1471 }
1472}
1473
1474impl Transaction {
1475 pub fn new_from_version(read_version: u64, operation: Operation) -> Self {
1476 TransactionBuilder::new(read_version, operation).build()
1477 }
1478
1479 pub fn new(read_version: u64, operation: Operation, tag: Option<String>) -> Self {
1480 TransactionBuilder::new(read_version, operation)
1481 .tag(tag)
1482 .build()
1483 }
1484
1485 fn fragments_with_ids<'a, T>(
1486 new_fragments: T,
1487 fragment_id: &'a mut u64,
1488 ) -> impl Iterator<Item = Fragment> + 'a
1489 where
1490 T: IntoIterator<Item = Fragment> + 'a,
1491 {
1492 new_fragments.into_iter().map(move |mut f| {
1493 if f.id == 0 {
1494 f.id = *fragment_id;
1495 *fragment_id += 1;
1496 }
1497 f
1498 })
1499 }
1500
1501 fn data_storage_format_from_files(
1502 fragments: &[Fragment],
1503 user_requested: Option<LanceFileVersion>,
1504 ) -> Result<DataStorageFormat> {
1505 if let Some(file_version) = Fragment::try_infer_version(fragments)? {
1506 if let Some(user_requested) = user_requested
1508 && user_requested != file_version
1509 {
1510 return Err(Error::invalid_input(format!(
1511 "User requested data storage version ({}) does not match version in data files ({})",
1512 user_requested, file_version
1513 )));
1514 }
1515 Ok(DataStorageFormat::new(file_version))
1516 } else {
1517 Ok(user_requested
1519 .map(DataStorageFormat::new)
1520 .unwrap_or_default())
1521 }
1522 }
1523
1524 pub(crate) async fn restore_old_manifest(
1525 object_store: &ObjectStore,
1526 commit_handler: &dyn CommitHandler,
1527 base_path: &Path,
1528 version: u64,
1529 config: &ManifestWriteConfig,
1530 tx_path: &str,
1531 current_manifest: &Manifest,
1532 ) -> Result<(Manifest, Vec<IndexMetadata>)> {
1533 let location = commit_handler
1534 .resolve_version_location(base_path, version, &object_store.inner)
1535 .await?;
1536 let mut manifest = read_manifest(object_store, &location.path, location.size).await?;
1537 manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
1538 manifest.transaction_file = Some(tx_path.to_string());
1539 let indices = read_manifest_indexes(object_store, &location, &manifest).await?;
1540 manifest.max_fragment_id = manifest
1541 .max_fragment_id
1542 .max(current_manifest.max_fragment_id);
1543 Ok((manifest, indices))
1544 }
1545
1546 pub(crate) fn build_manifest(
1550 &self,
1551 current_manifest: Option<&Manifest>,
1552 current_indices: Vec<IndexMetadata>,
1553 transaction_file_path: &str,
1554 config: &ManifestWriteConfig,
1555 ) -> Result<(Manifest, Vec<IndexMetadata>)> {
1556 if config.use_stable_row_ids
1557 && current_manifest
1558 .map(|m| !m.uses_stable_row_ids())
1559 .unwrap_or_default()
1560 {
1561 return Err(Error::not_supported_source(
1562 "Cannot enable stable row ids on existing dataset".into(),
1563 ));
1564 }
1565 let mut reference_paths = match current_manifest {
1566 Some(m) => m.base_paths.clone(),
1567 None => HashMap::new(),
1568 };
1569
1570 if let Operation::Overwrite {
1571 initial_bases: Some(initial_bases),
1572 ..
1573 } = &self.operation
1574 {
1575 if current_manifest.is_none() {
1576 for base_path in initial_bases.iter() {
1580 if reference_paths.contains_key(&base_path.id) {
1581 return Err(Error::invalid_input(format!(
1582 "Duplicate base path ID {} detected. Base path IDs must be unique.",
1583 base_path.id
1584 )));
1585 }
1586 reference_paths.insert(base_path.id, base_path.clone());
1587 }
1588 } else {
1589 return Err(Error::invalid_input(
1592 "OVERWRITE mode cannot register new bases. This should have been caught by validation.",
1593 ));
1594 }
1595 }
1596
1597 let schema = match self.operation {
1599 Operation::Overwrite { ref schema, .. } => schema.clone(),
1600 Operation::Merge { ref schema, .. } => schema.clone(),
1601 Operation::Project { ref schema, .. } => schema.clone(),
1602 _ => {
1603 if let Some(current_manifest) = current_manifest {
1604 current_manifest.schema.clone()
1605 } else {
1606 return Err(Error::internal(
1607 "Cannot create a new dataset without a schema".to_string(),
1608 ));
1609 }
1610 }
1611 };
1612
1613 let mut fragment_id = if matches!(self.operation, Operation::Overwrite { .. }) {
1614 0
1615 } else {
1616 current_manifest
1617 .and_then(|m| m.max_fragment_id())
1618 .map(|id| id + 1)
1619 .unwrap_or(0)
1620 };
1621 let mut final_fragments = Vec::new();
1622 let mut final_indices = current_indices;
1623
1624 let mut next_row_id = {
1625 match (current_manifest, config.use_stable_row_ids) {
1627 (Some(manifest), _) if manifest.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0 => {
1628 Some(manifest.next_row_id)
1629 }
1630 (None, true) => Some(0),
1631 (_, false) => None,
1632 (Some(_), true) => {
1633 return Err(Error::not_supported_source(
1634 "Cannot enable stable row ids on existing dataset".into(),
1635 ));
1636 }
1637 }
1638 };
1639
1640 let maybe_existing_fragments =
1641 current_manifest
1642 .map(|m| m.fragments.as_ref())
1643 .ok_or_else(|| {
1644 Error::internal(format!(
1645 "No current manifest was provided while building manifest for operation {}",
1646 self.operation.name()
1647 ))
1648 });
1649
1650 match &self.operation {
1651 Operation::Clone { .. } => {
1652 return Err(Error::internal(
1653 "Clone operation should not enter build_manifest.".to_string(),
1654 ));
1655 }
1656 Operation::Append { fragments } => {
1657 final_fragments.extend(maybe_existing_fragments?.clone());
1658 let mut new_fragments =
1659 Self::fragments_with_ids(fragments.clone(), &mut fragment_id)
1660 .collect::<Vec<_>>();
1661 if let Some(next_row_id) = &mut next_row_id {
1662 Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1663 let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
1665 for fragment in new_fragments.iter_mut() {
1666 let version_meta =
1667 lance_table::rowids::version::build_version_meta(fragment, new_version);
1668 fragment.last_updated_at_version_meta = version_meta.clone();
1669 fragment.created_at_version_meta = version_meta;
1670 }
1671 }
1672 final_fragments.extend(new_fragments);
1673 }
1674 Operation::Delete {
1675 updated_fragments,
1676 deleted_fragment_ids,
1677 ..
1678 } => {
1679 final_fragments.extend(maybe_existing_fragments?.clone());
1681 final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id));
1682 final_fragments.iter_mut().for_each(|f| {
1683 for updated in updated_fragments {
1684 if updated.id == f.id {
1685 *f = updated.clone();
1686 }
1687 }
1688 });
1689 Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments)
1690 }
1691 Operation::Update {
1692 removed_fragment_ids,
1693 updated_fragments,
1694 new_fragments,
1695 fields_modified,
1696 merged_generations,
1697 fields_for_preserving_frag_bitmap,
1698 update_mode,
1699 ..
1700 } => {
1701 let existing_fragments = maybe_existing_fragments?;
1703
1704 let updated_frags: Vec<Fragment> = existing_fragments
1706 .iter()
1707 .filter_map(|f| {
1708 if removed_fragment_ids.contains(&f.id) {
1709 return None;
1710 }
1711 if let Some(updated) = updated_fragments.iter().find(|uf| uf.id == f.id) {
1712 Some(updated.clone())
1713 } else {
1714 Some(f.clone())
1715 }
1716 })
1717 .collect();
1718
1719 if next_row_id.is_some() {
1724 }
1726
1727 final_fragments.extend(updated_frags);
1728
1729 Self::prune_updated_fields_from_indices(
1731 &mut final_indices,
1732 updated_fragments,
1733 fields_modified,
1734 );
1735
1736 let mut new_fragments =
1737 Self::fragments_with_ids(new_fragments.clone(), &mut fragment_id)
1738 .collect::<Vec<_>>();
1739
1740 if let Some(next_row_id) = &mut next_row_id {
1743 Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1744 }
1745
1746 if next_row_id.is_some() {
1749 let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
1750
1751 let original_frags_map: std::collections::HashMap<u64, &Fragment> =
1753 existing_fragments.iter().map(|f| (f.id, f)).collect();
1754
1755 for fragment in new_fragments.iter_mut() {
1756 let row_ids = if let Some(row_id_meta) = &fragment.row_id_meta {
1763 match row_id_meta {
1764 lance_table::format::RowIdMeta::Inline(data) => {
1765 lance_table::rowids::read_row_ids(data).ok()
1766 }
1767 lance_table::format::RowIdMeta::External(_) => None,
1768 }
1769 } else {
1770 None
1771 };
1772
1773 if let Some(row_ids) = row_ids {
1774 let physical_rows = fragment.physical_rows.unwrap_or(0);
1776 let mut created_at_versions = Vec::with_capacity(physical_rows);
1777
1778 for row_id in row_ids.iter() {
1779 let orig_frag_id = row_id >> 32;
1781 let row_offset = (row_id & 0xFFFFFFFF) as usize;
1782
1783 if let Some(orig_frag) = original_frags_map.get(&orig_frag_id) {
1785 let created_version = if let Some(created_meta) =
1787 &orig_frag.created_at_version_meta
1788 {
1789 match created_meta.load_sequence() {
1791 Ok(seq) => {
1792 let versions: Vec<u64> = seq.versions().collect();
1793 versions.get(row_offset).copied().unwrap_or(1)
1794 }
1795 Err(_e) => {
1796 1 }
1798 }
1799 } else {
1800 1
1802 };
1803 created_at_versions.push(created_version);
1804 } else {
1805 created_at_versions.push(1);
1807 }
1808 }
1809
1810 let mut runs = Vec::new();
1813 if !created_at_versions.is_empty() {
1814 let mut current_version = created_at_versions[0];
1815 let mut run_start = 0u64;
1816
1817 for (i, &version) in created_at_versions.iter().enumerate().skip(1)
1818 {
1819 if version != current_version {
1820 runs.push(lance_table::format::RowDatasetVersionRun {
1822 span: lance_table::rowids::segment::U64Segment::Range(
1823 run_start..i as u64,
1824 ),
1825 version: current_version,
1826 });
1827 current_version = version;
1828 run_start = i as u64;
1829 }
1830 }
1831 runs.push(lance_table::format::RowDatasetVersionRun {
1833 span: lance_table::rowids::segment::U64Segment::Range(
1834 run_start..created_at_versions.len() as u64,
1835 ),
1836 version: current_version,
1837 });
1838 }
1839
1840 let created_at_seq =
1841 lance_table::format::RowDatasetVersionSequence { runs };
1842 fragment.created_at_version_meta = Some(
1843 lance_table::format::RowDatasetVersionMeta::from_sequence(
1844 &created_at_seq,
1845 )
1846 .map_err(|e| {
1847 Error::internal(format!(
1848 "Failed to create created_at version metadata: {}",
1849 e
1850 ))
1851 })?,
1852 );
1853
1854 let last_updated_meta =
1856 lance_table::rowids::version::build_version_meta(
1857 fragment,
1858 new_version,
1859 );
1860 fragment.last_updated_at_version_meta = last_updated_meta;
1861 } else {
1862 let version_meta = lance_table::rowids::version::build_version_meta(
1864 fragment,
1865 new_version,
1866 );
1867 fragment.last_updated_at_version_meta = version_meta.clone();
1868 fragment.created_at_version_meta = version_meta;
1869 }
1870 }
1871 }
1872
1873 if config.use_stable_row_ids
1874 && update_mode.is_some()
1875 && *update_mode == Some(RewriteRows)
1876 {
1877 let pure_updated_frag_ids =
1878 Self::collect_pure_rewrite_row_update_frags_ids(&new_fragments)?;
1879
1880 let original_fragment_ids: Vec<u64> = removed_fragment_ids
1882 .iter()
1883 .chain(updated_fragments.iter().map(|f| &f.id))
1884 .copied()
1885 .collect();
1886
1887 Self::register_pure_rewrite_rows_update_frags_in_indices(
1888 &mut final_indices,
1889 &pure_updated_frag_ids,
1890 &original_fragment_ids,
1891 fields_for_preserving_frag_bitmap,
1892 );
1893 }
1894
1895 if let Some(next_row_id) = &mut next_row_id {
1896 Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1897 }
1901 let mut target_ids: HashSet<u64> = HashSet::new();
1903 target_ids.extend(new_fragments.iter().map(|f| f.id));
1904 final_fragments.extend(new_fragments);
1905 Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments);
1906
1907 if !merged_generations.is_empty() {
1908 update_mem_wal_index_merged_generations(
1909 &mut final_indices,
1910 current_manifest.map_or(1, |m| m.version + 1),
1911 merged_generations.clone(),
1912 )?;
1913 }
1914 }
1915 Operation::Overwrite { fragments, .. } => {
1916 let mut new_fragments =
1917 Self::fragments_with_ids(fragments.clone(), &mut fragment_id)
1918 .collect::<Vec<_>>();
1919 if let Some(next_row_id) = &mut next_row_id {
1920 Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1921 let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
1923 for fragment in new_fragments.iter_mut() {
1924 let version_meta =
1925 lance_table::rowids::version::build_version_meta(fragment, new_version);
1926 fragment.last_updated_at_version_meta = version_meta.clone();
1927 fragment.created_at_version_meta = version_meta;
1928 }
1929 }
1930 final_fragments.extend(new_fragments);
1931 final_indices = Vec::new();
1932 }
1933 Operation::Rewrite {
1934 groups,
1935 rewritten_indices,
1936 frag_reuse_index,
1937 } => {
1938 final_fragments.extend(maybe_existing_fragments?.clone());
1939 let current_version = current_manifest.map(|m| m.version).unwrap_or_default();
1940 Self::handle_rewrite_fragments(
1941 &mut final_fragments,
1942 groups,
1943 &mut fragment_id,
1944 current_version,
1945 next_row_id.as_ref(),
1946 )?;
1947
1948 if next_row_id.is_some() {
1949 debug_assert!(rewritten_indices.is_empty());
1951 for index in final_indices.iter_mut() {
1952 if let Some(fragment_bitmap) = &mut index.fragment_bitmap {
1953 *fragment_bitmap =
1954 Self::recalculate_fragment_bitmap(fragment_bitmap, groups)?;
1955 }
1956 }
1957 } else {
1958 Self::handle_rewrite_indices(&mut final_indices, rewritten_indices, groups)?;
1959 }
1960
1961 if let Some(frag_reuse_index) = frag_reuse_index {
1962 final_indices.retain(|idx| idx.name != frag_reuse_index.name);
1963 final_indices.push(frag_reuse_index.clone());
1964 }
1965 }
1966 Operation::CreateIndex {
1967 new_indices,
1968 removed_indices,
1969 } => {
1970 final_fragments.extend(maybe_existing_fragments?.clone());
1971 final_indices.retain(|existing_index| {
1972 !new_indices
1973 .iter()
1974 .any(|new_index| new_index.name == existing_index.name)
1975 && !removed_indices
1976 .iter()
1977 .any(|old_index| old_index.uuid == existing_index.uuid)
1978 });
1979 final_indices.extend(new_indices.clone());
1980 }
1981 Operation::ReserveFragments { .. } | Operation::UpdateConfig { .. } => {
1982 final_fragments.extend(maybe_existing_fragments?.clone());
1983 }
1984 Operation::Merge { fragments, .. } => {
1985 final_fragments.extend(fragments.clone());
1986
1987 Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments)
1990 }
1991 Operation::Project { .. } => {
1992 final_fragments.extend(maybe_existing_fragments?.clone());
1993
1994 let remaining_field_ids = schema
1997 .fields_pre_order()
1998 .map(|f| f.id)
1999 .collect::<HashSet<_>>();
2000 for fragment in final_fragments.iter_mut() {
2001 fragment.files.retain(|file| {
2002 file.fields
2003 .iter()
2004 .any(|field_id| remaining_field_ids.contains(field_id))
2005 });
2006 }
2007
2008 Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments)
2011 }
2012 Operation::Restore { .. } => {
2013 unreachable!()
2014 }
2015 Operation::DataReplacement { replacements } => {
2016 log::warn!(
2017 "Building manifest with DataReplacement operation. This operation is not stable yet, please use with caution."
2018 );
2019
2020 let (old_fragment_ids, new_datafiles): (Vec<&u64>, Vec<&DataFile>) = replacements
2021 .iter()
2022 .map(|DataReplacementGroup(fragment_id, new_file)| (fragment_id, new_file))
2023 .unzip();
2024
2025 if new_datafiles
2029 .iter()
2030 .map(|f| f.fields.clone())
2031 .collect::<HashSet<_>>()
2032 .len()
2033 > 1
2034 {
2035 let field_info = new_datafiles
2036 .iter()
2037 .enumerate()
2038 .map(|(id, f)| (id, f.fields.clone()))
2039 .fold("".to_string(), |acc, (id, fields)| {
2040 format!("{}File {}: {:?}\n", acc, id, fields)
2041 });
2042
2043 return Err(Error::invalid_input(format!(
2044 "All new data files must have the same fields, but found different fields:\n{field_info}"
2045 )));
2046 }
2047
2048 let existing_fragments = maybe_existing_fragments?;
2049
2050 let replaced_fields: Vec<u32> = new_datafiles
2052 .first()
2053 .map(|f| {
2054 f.fields
2055 .iter()
2056 .filter(|&&id| id >= 0)
2057 .map(|&id| id as u32)
2058 .collect()
2059 })
2060 .unwrap_or_default();
2061
2062 for (frag_id, new_file) in old_fragment_ids.iter().zip(new_datafiles) {
2065 let frag = existing_fragments
2066 .iter()
2067 .find(|f| f.id == **frag_id)
2068 .ok_or_else(|| {
2069 Error::invalid_input(
2070 "Fragment being replaced not found in existing fragments",
2071 )
2072 })?;
2073 let mut new_frag = frag.clone();
2074
2075 let mut columns_covered = HashSet::new();
2078 for file in &mut new_frag.files {
2079 if file.fields == new_file.fields
2080 && file.file_major_version == new_file.file_major_version
2081 && file.file_minor_version == new_file.file_minor_version
2082 {
2083 file.path = new_file.path.clone();
2085 file.file_size_bytes = new_file.file_size_bytes.clone();
2086 }
2087 columns_covered.extend(file.fields.iter());
2088 }
2089 if columns_covered.is_disjoint(&new_file.fields.iter().collect()) {
2093 new_frag.add_file(
2094 new_file.path.clone(),
2095 new_file.fields.clone(),
2096 new_file.column_indices.clone(),
2097 &LanceFileVersion::try_from_major_minor(
2098 new_file.file_major_version,
2099 new_file.file_minor_version,
2100 )
2101 .expect("Expected valid file version"),
2102 new_file.file_size_bytes.get(),
2103 );
2104 }
2105
2106 if &new_frag == frag {
2108 return Err(Error::invalid_input(
2109 "Expected to modify the fragment but no changes were made. This means the new data files does not align with any exiting datafiles. Please check if the schema of the new data files matches the schema of the old data files including the file major and minor versions",
2110 ));
2111 }
2112 final_fragments.push(new_frag);
2113 }
2114
2115 let fragments_changed = old_fragment_ids
2116 .iter()
2117 .cloned()
2118 .cloned()
2119 .collect::<HashSet<_>>();
2120
2121 let unmodified_fragments = existing_fragments
2123 .iter()
2124 .filter(|f| !fragments_changed.contains(&f.id))
2125 .cloned()
2126 .collect::<Vec<_>>();
2127
2128 final_fragments.extend(unmodified_fragments);
2129
2130 let modified_fragments: Vec<Fragment> = final_fragments
2132 .iter()
2133 .filter(|f| fragments_changed.contains(&f.id))
2134 .cloned()
2135 .collect();
2136
2137 Self::prune_updated_fields_from_indices(
2138 &mut final_indices,
2139 &modified_fragments,
2140 &replaced_fields,
2141 );
2142 }
2143 Operation::UpdateMemWalState { merged_generations } => {
2144 update_mem_wal_index_merged_generations(
2145 &mut final_indices,
2146 current_manifest.map_or(1, |m| m.version + 1),
2147 merged_generations.clone(),
2148 )?;
2149 }
2150 Operation::UpdateBases { .. } => {
2151 final_fragments.extend(maybe_existing_fragments?.clone());
2154 }
2155 };
2156
2157 final_fragments.sort_by_key(|frag| frag.id);
2159
2160 Self::remove_tombstoned_data_files(&mut final_fragments);
2162
2163 let user_requested_version = match (&config.storage_format, config.use_legacy_format) {
2164 (Some(storage_format), _) => Some(storage_format.lance_file_version()?),
2165 (None, Some(true)) => Some(LanceFileVersion::Legacy),
2166 (None, Some(false)) => Some(LanceFileVersion::V2_0),
2167 (None, None) => None,
2168 };
2169
2170 let mut manifest = if let Some(current_manifest) = current_manifest {
2171 let mut prev_manifest =
2174 Manifest::new_from_previous(current_manifest, schema, Arc::new(final_fragments));
2175
2176 if let (Some(user_requested_version), Operation::Overwrite { .. }) =
2177 (user_requested_version, &self.operation)
2178 {
2179 prev_manifest.data_storage_format = DataStorageFormat::new(user_requested_version);
2183 }
2184
2185 prev_manifest
2186 } else {
2187 let data_storage_format =
2188 Self::data_storage_format_from_files(&final_fragments, user_requested_version)?;
2189 Manifest::new(
2190 schema,
2191 Arc::new(final_fragments),
2192 data_storage_format,
2193 reference_paths,
2194 )
2195 };
2196
2197 manifest.tag.clone_from(&self.tag);
2198
2199 if config.auto_set_feature_flags {
2200 apply_feature_flags(
2201 &mut manifest,
2202 config.use_stable_row_ids,
2203 config.disable_transaction_file,
2204 )?;
2205 }
2206 manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
2207
2208 manifest.update_max_fragment_id();
2209
2210 match &self.operation {
2211 Operation::Overwrite {
2212 config_upsert_values: Some(tm),
2213 ..
2214 } => {
2215 manifest.config_mut().extend(tm.clone());
2216 }
2217 Operation::UpdateConfig {
2218 config_updates,
2219 table_metadata_updates,
2220 schema_metadata_updates,
2221 field_metadata_updates,
2222 } => {
2223 if let Some(config_updates) = config_updates {
2224 let mut config = manifest.config.clone();
2225 apply_update_map(&mut config, config_updates);
2226 manifest.config = config;
2227 }
2228 if let Some(table_metadata_updates) = table_metadata_updates {
2229 let mut table_metadata = manifest.table_metadata.clone();
2230 apply_update_map(&mut table_metadata, table_metadata_updates);
2231 manifest.table_metadata = table_metadata;
2232 }
2233 if let Some(schema_metadata_updates) = schema_metadata_updates {
2234 let mut schema_metadata = manifest.schema.metadata.clone();
2235 apply_update_map(&mut schema_metadata, schema_metadata_updates);
2236 manifest.schema.metadata = schema_metadata;
2237 }
2238 for (field_id, field_metadata_update) in field_metadata_updates {
2239 if let Some(field) = manifest.schema.field_by_id_mut(*field_id) {
2240 apply_update_map(&mut field.metadata, field_metadata_update);
2241 } else {
2242 return Err(Error::invalid_input_source(
2243 format!("Field with id {} does not exist", field_id).into(),
2244 ));
2245 }
2246 }
2247 }
2248 _ => {}
2249 }
2250
2251 if let Operation::UpdateBases { new_bases } = &self.operation {
2253 for new_base in new_bases {
2255 if let Some(existing_base) = manifest
2257 .base_paths
2258 .values()
2259 .find(|bp| bp.name == new_base.name || bp.path == new_base.path)
2260 {
2261 return Err(Error::invalid_input(format!(
2262 "Conflict detected: Base path with name '{:?}' or path '{}' already exists. Existing: name='{:?}', path='{}'",
2263 new_base.name, new_base.path, existing_base.name, existing_base.path
2264 )));
2265 }
2266
2267 let mut base_to_add = new_base.clone();
2269 if base_to_add.id == 0 {
2270 let next_id = manifest
2271 .base_paths
2272 .keys()
2273 .max()
2274 .map(|&id| id + 1)
2275 .unwrap_or(1);
2276 base_to_add.id = next_id;
2277 }
2278
2279 manifest.base_paths.insert(base_to_add.id, base_to_add);
2280 }
2281 }
2282
2283 if let Operation::ReserveFragments { num_fragments } = self.operation {
2284 manifest.max_fragment_id = Some(manifest.max_fragment_id.unwrap_or(0) + num_fragments);
2285 }
2286
2287 manifest.transaction_file = Some(transaction_file_path.to_string());
2288
2289 if let Some(next_row_id) = next_row_id {
2290 manifest.next_row_id = next_row_id;
2291 }
2292
2293 Ok((manifest, final_indices))
2294 }
2295
2296 fn register_pure_rewrite_rows_update_frags_in_indices(
2297 indices: &mut [IndexMetadata],
2298 pure_update_frag_ids: &[u64],
2299 original_fragment_ids: &[u64],
2300 fields_for_preserving_frag_bitmap: &[u32],
2301 ) {
2302 if pure_update_frag_ids.is_empty() {
2303 return;
2304 }
2305
2306 let value_updated_field_set = fields_for_preserving_frag_bitmap
2307 .iter()
2308 .collect::<HashSet<_>>();
2309
2310 for index in indices.iter_mut() {
2311 let index_covers_modified_field = index.fields.iter().any(|field_id| {
2312 value_updated_field_set.contains(&u32::try_from(*field_id).unwrap())
2313 });
2314
2315 if !index_covers_modified_field
2316 && let Some(fragment_bitmap) = &mut index.fragment_bitmap
2317 {
2318 let index_covers_all_original_fragments = original_fragment_ids
2323 .iter()
2324 .all(|&fragment_id| fragment_bitmap.contains(fragment_id as u32));
2325
2326 if index_covers_all_original_fragments {
2327 for fragment_id in pure_update_frag_ids.iter().map(|f| *f as u32) {
2328 fragment_bitmap.insert(fragment_id);
2329 }
2330 }
2331 }
2332 }
2333 }
2334
2335 fn prune_updated_fields_from_indices(
2338 indices: &mut [IndexMetadata],
2339 updated_fragments: &[Fragment],
2340 fields_modified: &[u32],
2341 ) {
2342 if fields_modified.is_empty() {
2343 return;
2344 }
2345
2346 let fields_modified_set = fields_modified.iter().collect::<HashSet<_>>();
2349 for index in indices.iter_mut() {
2350 if index
2351 .fields
2352 .iter()
2353 .any(|field_id| fields_modified_set.contains(&u32::try_from(*field_id).unwrap()))
2354 && let Some(fragment_bitmap) = &mut index.fragment_bitmap
2355 {
2356 for fragment_id in updated_fragments.iter().map(|f| f.id as u32) {
2357 fragment_bitmap.remove(fragment_id);
2358 }
2359 }
2360 }
2361 }
2362
2363 fn is_vector_index(index: &IndexMetadata) -> bool {
2364 if let Some(details) = &index.index_details {
2365 details.type_url.ends_with("VectorIndexDetails")
2366 } else {
2367 false
2368 }
2369 }
2370
2371 fn remove_tombstoned_data_files(fragments: &mut [Fragment]) {
2374 for fragment in fragments {
2375 fragment.files.retain(|file| {
2376 file.fields.iter().any(|&field_id| field_id != -2)
2378 });
2379 }
2380 }
2381
2382 fn retain_relevant_indices(
2383 indices: &mut Vec<IndexMetadata>,
2384 schema: &Schema,
2385 fragments: &[Fragment],
2386 ) {
2387 let field_ids = schema
2388 .fields_pre_order()
2389 .map(|f| f.id)
2390 .collect::<HashSet<_>>();
2391
2392 indices.retain(|existing_index| {
2394 existing_index
2395 .fields
2396 .iter()
2397 .all(|field_id| field_ids.contains(field_id))
2398 || is_system_index(existing_index)
2399 });
2400
2401 let mut indices_by_name: std::collections::HashMap<String, Vec<&IndexMetadata>> =
2408 std::collections::HashMap::new();
2409
2410 for index in indices.iter() {
2412 if index.name != FRAG_REUSE_INDEX_NAME {
2413 indices_by_name
2414 .entry(index.name.clone())
2415 .or_default()
2416 .push(index);
2417 }
2418 }
2419
2420 let mut uuids_to_keep = std::collections::HashSet::new();
2422
2423 let existing_fragments = fragments
2424 .iter()
2425 .map(|f| f.id as u32)
2426 .collect::<RoaringBitmap>();
2427
2428 for (_, same_name_indices) in indices_by_name {
2430 if same_name_indices.len() > 1 {
2431 let (empty_indices, non_empty_indices): (Vec<_>, Vec<_>) =
2433 same_name_indices.iter().partition(|index| {
2434 index
2435 .effective_fragment_bitmap(&existing_fragments)
2436 .as_ref()
2437 .is_none_or(|bitmap| bitmap.is_empty())
2438 });
2439
2440 if non_empty_indices.is_empty() {
2441 let mut sorted_indices = empty_indices;
2444 sorted_indices.sort_by_key(|index: &&IndexMetadata| index.dataset_version); if let Some(oldest) = sorted_indices.first()
2448 && !Self::is_vector_index(oldest)
2449 {
2450 uuids_to_keep.insert(oldest.uuid);
2451 }
2452 } else {
2453 for index in non_empty_indices {
2455 uuids_to_keep.insert(index.uuid);
2456 }
2457 }
2458 } else {
2459 if let Some(index) = same_name_indices.first() {
2461 let is_empty = index
2462 .effective_fragment_bitmap(&existing_fragments)
2463 .as_ref()
2464 .is_none_or(|bitmap| bitmap.is_empty());
2465 let is_vector = Self::is_vector_index(index);
2466
2467 if !is_empty || !is_vector {
2469 uuids_to_keep.insert(index.uuid);
2470 }
2471 }
2472 }
2473 }
2474
2475 indices.retain(|index| {
2477 index.name == FRAG_REUSE_INDEX_NAME || uuids_to_keep.contains(&index.uuid)
2478 });
2479 }
2480
2481 fn recalculate_fragment_bitmap(
2482 old: &RoaringBitmap,
2483 groups: &[RewriteGroup],
2484 ) -> Result<RoaringBitmap> {
2485 let mut new_bitmap = old.clone();
2486 for group in groups {
2487 let any_in_index = group
2488 .old_fragments
2489 .iter()
2490 .any(|frag| old.contains(frag.id as u32));
2491 let all_in_index = group
2492 .old_fragments
2493 .iter()
2494 .all(|frag| old.contains(frag.id as u32));
2495 if any_in_index {
2500 if all_in_index {
2501 for frag_id in group.old_fragments.iter().map(|frag| frag.id as u32) {
2502 new_bitmap.remove(frag_id);
2503 }
2504 new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32));
2505 } else {
2506 return Err(Error::invalid_input(
2507 "The compaction plan included a rewrite group that was a split of indexed and non-indexed data",
2508 ));
2509 }
2510 }
2511 }
2512 Ok(new_bitmap)
2513 }
2514
2515 fn handle_rewrite_indices(
2516 indices: &mut [IndexMetadata],
2517 rewritten_indices: &[RewrittenIndex],
2518 groups: &[RewriteGroup],
2519 ) -> Result<()> {
2520 let mut modified_indices = HashSet::new();
2521
2522 for rewritten_index in rewritten_indices {
2523 if !modified_indices.insert(rewritten_index.old_id) {
2524 return Err(Error::invalid_input(format!(
2525 "An invalid compaction plan must have been generated because multiple tasks modified the same index: {}",
2526 rewritten_index.old_id
2527 )));
2528 }
2529
2530 let Some(index) = indices
2532 .iter_mut()
2533 .find(|idx| idx.uuid == rewritten_index.old_id)
2534 else {
2535 continue;
2536 };
2537
2538 index.fragment_bitmap = Some(Self::recalculate_fragment_bitmap(
2539 index.fragment_bitmap.as_ref().ok_or_else(|| {
2540 Error::invalid_input(format!(
2541 "Cannot rewrite index {} which did not store fragment bitmap",
2542 index.uuid
2543 ))
2544 })?,
2545 groups,
2546 )?);
2547 index.uuid = rewritten_index.new_id;
2548 }
2549 Ok(())
2550 }
2551
2552 fn handle_rewrite_fragments(
2553 final_fragments: &mut Vec<Fragment>,
2554 groups: &[RewriteGroup],
2555 fragment_id: &mut u64,
2556 version: u64,
2557 _next_row_id: Option<&u64>,
2558 ) -> Result<()> {
2559 for group in groups {
2560 let replace_range = {
2562 let start = final_fragments
2563 .iter()
2564 .enumerate()
2565 .find(|(_, f)| f.id == group.old_fragments[0].id)
2566 .ok_or_else(|| {
2567 Error::commit_conflict_source(
2568 version,
2569 format!(
2570 "dataset does not contain a fragment a rewrite operation wants to replace: id={}",
2571 group.old_fragments[0].id
2572 )
2573 .into(),
2574 )
2575 })?
2576 .0;
2577
2578 let mut i = 1;
2580 loop {
2581 if i == group.old_fragments.len() {
2582 break Some(start..start + i);
2583 }
2584 if final_fragments[start + i].id != group.old_fragments[i].id {
2585 break None;
2586 }
2587 i += 1;
2588 }
2589 };
2590
2591 let new_fragments = Self::fragments_with_ids(group.new_fragments.clone(), fragment_id)
2592 .collect::<Vec<_>>();
2593
2594 if let Some(replace_range) = replace_range {
2599 final_fragments.splice(replace_range, new_fragments);
2601 } else {
2602 for fragment in group.old_fragments.iter() {
2604 final_fragments.retain(|f| f.id != fragment.id);
2605 }
2606 final_fragments.extend(new_fragments);
2607 }
2608 }
2609 Ok(())
2610 }
2611
2612 fn collect_pure_rewrite_row_update_frags_ids(fragments: &[Fragment]) -> Result<Vec<u64>> {
2614 let mut pure_update_frag_ids = Vec::new();
2615
2616 for fragment in fragments {
2617 let physical_rows = fragment
2618 .physical_rows
2619 .ok_or_else(|| Error::internal("Fragment does not have physical rows"))?
2620 as u64;
2621
2622 if let Some(row_id_meta) = &fragment.row_id_meta {
2623 let existing_row_count = match row_id_meta {
2624 RowIdMeta::Inline(data) => {
2625 let sequence = read_row_ids(data)?;
2626 sequence.len() as u64
2627 }
2628 _ => 0,
2629 };
2630
2631 if existing_row_count == physical_rows {
2634 pure_update_frag_ids.push(fragment.id);
2635 }
2636 }
2637 }
2638
2639 Ok(pure_update_frag_ids)
2640 }
2641
2642 fn assign_row_ids(next_row_id: &mut u64, fragments: &mut [Fragment]) -> Result<()> {
2643 for fragment in fragments {
2644 let physical_rows = fragment
2645 .physical_rows
2646 .ok_or_else(|| Error::internal("Fragment does not have physical rows"))?
2647 as u64;
2648
2649 if fragment.row_id_meta.is_some() {
2650 let existing_row_count = match &fragment.row_id_meta {
2657 Some(RowIdMeta::Inline(data)) => {
2658 let sequence = read_row_ids(data)?;
2660 sequence.len() as u64
2661 }
2662 _ => 0,
2663 };
2664
2665 match existing_row_count.cmp(&physical_rows) {
2666 Ordering::Equal => {
2667 continue;
2669 }
2670 Ordering::Less => {
2671 let remaining_rows = physical_rows - existing_row_count;
2673 let new_row_ids = *next_row_id..(*next_row_id + remaining_rows);
2674
2675 let combined_sequence = match &fragment.row_id_meta {
2677 Some(RowIdMeta::Inline(data)) => read_row_ids(data)?,
2678 _ => {
2679 return Err(Error::internal(
2680 "Failed to deserialize existing row ID sequence",
2681 ));
2682 }
2683 };
2684
2685 let mut row_ids: Vec<u64> = combined_sequence.iter().collect();
2686 for row_id in new_row_ids {
2687 row_ids.push(row_id);
2688 }
2689 let combined_sequence = RowIdSequence::from(row_ids.as_slice());
2690
2691 let serialized = write_row_ids(&combined_sequence);
2692 fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
2693 *next_row_id += remaining_rows;
2694 }
2695 Ordering::Greater => {
2696 return Err(Error::internal(format!(
2698 "Fragment has more row IDs ({}) than physical rows ({})",
2699 existing_row_count, physical_rows
2700 )));
2701 }
2702 }
2703 } else {
2704 let row_ids = *next_row_id..(*next_row_id + physical_rows);
2705 let sequence = RowIdSequence::from(row_ids);
2706 let serialized = write_row_ids(&sequence);
2708 fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
2709 *next_row_id += physical_rows;
2710 }
2711 }
2712 Ok(())
2713 }
2714}
2715
2716impl From<&DataReplacementGroup> for pb::transaction::DataReplacementGroup {
2717 fn from(DataReplacementGroup(fragment_id, new_file): &DataReplacementGroup) -> Self {
2718 Self {
2719 fragment_id: *fragment_id,
2720 new_file: Some(new_file.into()),
2721 }
2722 }
2723}
2724
2725impl TryFrom<pb::transaction::DataReplacementGroup> for DataReplacementGroup {
2728 type Error = Error;
2729
2730 fn try_from(message: pb::transaction::DataReplacementGroup) -> Result<Self> {
2731 Ok(Self(
2732 message.fragment_id,
2733 message
2734 .new_file
2735 .ok_or(Error::invalid_input(
2736 "DataReplacementGroup must have a new_file",
2737 ))?
2738 .try_into()?,
2739 ))
2740 }
2741}
2742
2743impl TryFrom<pb::Transaction> for Transaction {
2744 type Error = Error;
2745
2746 fn try_from(message: pb::Transaction) -> Result<Self> {
2747 let operation = match message.operation {
2748 Some(pb::transaction::Operation::Append(pb::transaction::Append { fragments })) => {
2749 Operation::Append {
2750 fragments: fragments
2751 .into_iter()
2752 .map(Fragment::try_from)
2753 .collect::<Result<Vec<_>>>()?,
2754 }
2755 }
2756 Some(pb::transaction::Operation::Clone(pb::transaction::Clone {
2757 is_shallow,
2758 ref_name,
2759 ref_version,
2760 ref_path,
2761 branch_name,
2762 })) => Operation::Clone {
2763 is_shallow,
2764 ref_name,
2765 ref_version,
2766 ref_path,
2767 branch_name,
2768 },
2769 Some(pb::transaction::Operation::Delete(pb::transaction::Delete {
2770 updated_fragments,
2771 deleted_fragment_ids,
2772 predicate,
2773 })) => Operation::Delete {
2774 updated_fragments: updated_fragments
2775 .into_iter()
2776 .map(Fragment::try_from)
2777 .collect::<Result<Vec<_>>>()?,
2778 deleted_fragment_ids,
2779 predicate,
2780 },
2781 Some(pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
2782 fragments,
2783 schema,
2784 schema_metadata: _schema_metadata, config_upsert_values,
2786 initial_bases,
2787 })) => {
2788 let config_upsert_option = if config_upsert_values.is_empty() {
2789 None
2790 } else {
2791 Some(config_upsert_values)
2792 };
2793
2794 Operation::Overwrite {
2795 fragments: fragments
2796 .into_iter()
2797 .map(Fragment::try_from)
2798 .collect::<Result<Vec<_>>>()?,
2799 schema: Schema::from(&Fields(schema)),
2800 config_upsert_values: config_upsert_option,
2801 initial_bases: if initial_bases.is_empty() {
2802 None
2803 } else {
2804 Some(initial_bases.into_iter().map(BasePath::from).collect())
2805 },
2806 }
2807 }
2808 Some(pb::transaction::Operation::ReserveFragments(
2809 pb::transaction::ReserveFragments { num_fragments },
2810 )) => Operation::ReserveFragments { num_fragments },
2811 Some(pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
2812 old_fragments,
2813 new_fragments,
2814 groups,
2815 rewritten_indices,
2816 })) => {
2817 let groups = if !groups.is_empty() {
2818 groups
2819 .into_iter()
2820 .map(RewriteGroup::try_from)
2821 .collect::<Result<_>>()?
2822 } else {
2823 vec![RewriteGroup {
2824 old_fragments: old_fragments
2825 .into_iter()
2826 .map(Fragment::try_from)
2827 .collect::<Result<Vec<_>>>()?,
2828 new_fragments: new_fragments
2829 .into_iter()
2830 .map(Fragment::try_from)
2831 .collect::<Result<Vec<_>>>()?,
2832 }]
2833 };
2834 let rewritten_indices = rewritten_indices
2835 .iter()
2836 .map(RewrittenIndex::try_from)
2837 .collect::<Result<_>>()?;
2838
2839 Operation::Rewrite {
2840 groups,
2841 rewritten_indices,
2842 frag_reuse_index: None,
2843 }
2844 }
2845 Some(pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
2846 new_indices,
2847 removed_indices,
2848 })) => Operation::CreateIndex {
2849 new_indices: new_indices
2850 .into_iter()
2851 .map(IndexMetadata::try_from)
2852 .collect::<Result<_>>()?,
2853 removed_indices: removed_indices
2854 .into_iter()
2855 .map(IndexMetadata::try_from)
2856 .collect::<Result<_>>()?,
2857 },
2858 Some(pb::transaction::Operation::Merge(pb::transaction::Merge {
2859 fragments,
2860 schema,
2861 schema_metadata: _schema_metadata, })) => Operation::Merge {
2863 fragments: fragments
2864 .into_iter()
2865 .map(Fragment::try_from)
2866 .collect::<Result<Vec<_>>>()?,
2867 schema: Schema::from(&Fields(schema)),
2868 },
2869 Some(pb::transaction::Operation::Restore(pb::transaction::Restore { version })) => {
2870 Operation::Restore { version }
2871 }
2872 Some(pb::transaction::Operation::Update(pb::transaction::Update {
2873 removed_fragment_ids,
2874 updated_fragments,
2875 new_fragments,
2876 fields_modified,
2877 merged_generations,
2878 fields_for_preserving_frag_bitmap,
2879 update_mode,
2880 inserted_rows,
2881 })) => Operation::Update {
2882 removed_fragment_ids,
2883 updated_fragments: updated_fragments
2884 .into_iter()
2885 .map(Fragment::try_from)
2886 .collect::<Result<Vec<_>>>()?,
2887 new_fragments: new_fragments
2888 .into_iter()
2889 .map(Fragment::try_from)
2890 .collect::<Result<Vec<_>>>()?,
2891 fields_modified,
2892 merged_generations: merged_generations
2893 .into_iter()
2894 .map(|m| MergedGeneration::try_from(m).unwrap())
2895 .collect(),
2896 fields_for_preserving_frag_bitmap,
2897 update_mode: match update_mode {
2898 0 => Some(UpdateMode::RewriteRows),
2899 1 => Some(UpdateMode::RewriteColumns),
2900 _ => Some(UpdateMode::RewriteRows),
2901 },
2902 inserted_rows_filter: inserted_rows
2903 .map(|ik| KeyExistenceFilter::try_from(&ik))
2904 .transpose()?,
2905 },
2906 Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => {
2907 Operation::Project {
2908 schema: Schema::from(&Fields(schema)),
2909 }
2910 }
2911 Some(pb::transaction::Operation::UpdateConfig(update_config)) => {
2912 let has_new_fields = update_config.config_updates.is_some()
2914 || update_config.table_metadata_updates.is_some()
2915 || update_config.schema_metadata_updates.is_some()
2916 || !update_config.field_metadata_updates.is_empty();
2917
2918 let has_old_fields = !update_config.upsert_values.is_empty()
2920 || !update_config.delete_keys.is_empty()
2921 || !update_config.schema_metadata.is_empty()
2922 || !update_config.field_metadata.is_empty();
2923
2924 if has_new_fields && has_old_fields {
2926 return Err(Error::invalid_input_source(
2927 "Cannot mix old and new style UpdateConfig fields".into(),
2928 ));
2929 }
2930
2931 if has_old_fields {
2932 let config_updates = if !update_config.upsert_values.is_empty()
2934 || !update_config.delete_keys.is_empty()
2935 {
2936 Some(translate_config_updates(
2937 &update_config.upsert_values,
2938 &update_config.delete_keys,
2939 ))
2940 } else {
2941 None
2942 };
2943
2944 let schema_metadata_updates = if !update_config.schema_metadata.is_empty() {
2945 Some(translate_schema_metadata_updates(
2946 &update_config.schema_metadata,
2947 ))
2948 } else {
2949 None
2950 };
2951
2952 let field_metadata_updates = update_config
2953 .field_metadata
2954 .into_iter()
2955 .map(|(field_id, field_meta_update)| {
2956 (
2957 field_id as i32,
2958 translate_schema_metadata_updates(&field_meta_update.metadata),
2959 )
2960 })
2961 .collect();
2962
2963 Operation::UpdateConfig {
2964 config_updates,
2965 table_metadata_updates: None,
2966 schema_metadata_updates,
2967 field_metadata_updates,
2968 }
2969 } else {
2970 Operation::UpdateConfig {
2972 config_updates: update_config.config_updates.as_ref().map(UpdateMap::from),
2973 table_metadata_updates: update_config
2974 .table_metadata_updates
2975 .as_ref()
2976 .map(UpdateMap::from),
2977 schema_metadata_updates: update_config
2978 .schema_metadata_updates
2979 .as_ref()
2980 .map(UpdateMap::from),
2981 field_metadata_updates: update_config
2982 .field_metadata_updates
2983 .iter()
2984 .map(|(field_id, pb_update_map)| {
2985 (*field_id, UpdateMap::from(pb_update_map))
2986 })
2987 .collect(),
2988 }
2989 }
2990 }
2991 Some(pb::transaction::Operation::DataReplacement(
2992 pb::transaction::DataReplacement { replacements },
2993 )) => Operation::DataReplacement {
2994 replacements: replacements
2995 .into_iter()
2996 .map(DataReplacementGroup::try_from)
2997 .collect::<Result<Vec<_>>>()?,
2998 },
2999 Some(pb::transaction::Operation::UpdateMemWalState(
3000 pb::transaction::UpdateMemWalState { merged_generations },
3001 )) => Operation::UpdateMemWalState {
3002 merged_generations: merged_generations
3003 .into_iter()
3004 .map(|m| MergedGeneration::try_from(m).unwrap())
3005 .collect(),
3006 },
3007 Some(pb::transaction::Operation::UpdateBases(pb::transaction::UpdateBases {
3008 new_bases,
3009 })) => Operation::UpdateBases {
3010 new_bases: new_bases.into_iter().map(BasePath::from).collect(),
3011 },
3012 None => {
3013 return Err(Error::internal(
3014 "Transaction message did not contain an operation".to_string(),
3015 ));
3016 }
3017 };
3018 Ok(Self {
3019 read_version: message.read_version,
3020 uuid: message.uuid.clone(),
3021 operation,
3022 tag: if message.tag.is_empty() {
3023 None
3024 } else {
3025 Some(message.tag.clone())
3026 },
3027 transaction_properties: if message.transaction_properties.is_empty() {
3028 None
3029 } else {
3030 Some(Arc::new(message.transaction_properties))
3031 },
3032 })
3033 }
3034}
3035
3036impl TryFrom<&pb::transaction::rewrite::RewrittenIndex> for RewrittenIndex {
3037 type Error = Error;
3038
3039 fn try_from(message: &pb::transaction::rewrite::RewrittenIndex) -> Result<Self> {
3040 Ok(Self {
3041 old_id: message
3042 .old_id
3043 .as_ref()
3044 .map(Uuid::try_from)
3045 .ok_or_else(|| {
3046 Error::invalid_input("required field (old_id) missing from message".to_string())
3047 })??,
3048 new_id: message
3049 .new_id
3050 .as_ref()
3051 .map(Uuid::try_from)
3052 .ok_or_else(|| {
3053 Error::invalid_input("required field (new_id) missing from message".to_string())
3054 })??,
3055 new_index_details: message
3056 .new_index_details
3057 .as_ref()
3058 .ok_or_else(|| {
3059 Error::invalid_input("new_index_details is a required field".to_string())
3060 })?
3061 .clone(),
3062 new_index_version: message.new_index_version,
3063 })
3064 }
3065}
3066
3067impl TryFrom<pb::transaction::rewrite::RewriteGroup> for RewriteGroup {
3068 type Error = Error;
3069
3070 fn try_from(message: pb::transaction::rewrite::RewriteGroup) -> Result<Self> {
3071 Ok(Self {
3072 old_fragments: message
3073 .old_fragments
3074 .into_iter()
3075 .map(Fragment::try_from)
3076 .collect::<Result<Vec<_>>>()?,
3077 new_fragments: message
3078 .new_fragments
3079 .into_iter()
3080 .map(Fragment::try_from)
3081 .collect::<Result<Vec<_>>>()?,
3082 })
3083 }
3084}
3085
3086impl From<&Transaction> for pb::Transaction {
3087 fn from(value: &Transaction) -> Self {
3088 let operation = match &value.operation {
3089 Operation::Append { fragments } => {
3090 pb::transaction::Operation::Append(pb::transaction::Append {
3091 fragments: fragments.iter().map(pb::DataFragment::from).collect(),
3092 })
3093 }
3094 Operation::Clone {
3095 is_shallow,
3096 ref_name,
3097 ref_version,
3098 ref_path,
3099 branch_name,
3100 } => pb::transaction::Operation::Clone(pb::transaction::Clone {
3101 is_shallow: *is_shallow,
3102 ref_name: ref_name.clone(),
3103 ref_version: *ref_version,
3104 ref_path: ref_path.clone(),
3105 branch_name: branch_name.clone(),
3106 }),
3107 Operation::Delete {
3108 updated_fragments,
3109 deleted_fragment_ids,
3110 predicate,
3111 } => pb::transaction::Operation::Delete(pb::transaction::Delete {
3112 updated_fragments: updated_fragments
3113 .iter()
3114 .map(pb::DataFragment::from)
3115 .collect(),
3116 deleted_fragment_ids: deleted_fragment_ids.clone(),
3117 predicate: predicate.clone(),
3118 }),
3119 Operation::Overwrite {
3120 fragments,
3121 schema,
3122 config_upsert_values,
3123 initial_bases,
3124 } => {
3125 pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
3126 fragments: fragments.iter().map(pb::DataFragment::from).collect(),
3127 schema: Fields::from(schema).0,
3128 schema_metadata: Default::default(), config_upsert_values: config_upsert_values
3130 .clone()
3131 .unwrap_or(Default::default()),
3132 initial_bases: initial_bases
3133 .as_ref()
3134 .map(|paths| {
3135 paths
3136 .iter()
3137 .cloned()
3138 .map(|bp: BasePath| -> pb::BasePath { bp.into() })
3139 .collect::<Vec<pb::BasePath>>()
3140 })
3141 .unwrap_or_default(),
3142 })
3143 }
3144 Operation::ReserveFragments { num_fragments } => {
3145 pb::transaction::Operation::ReserveFragments(pb::transaction::ReserveFragments {
3146 num_fragments: *num_fragments,
3147 })
3148 }
3149 Operation::Rewrite {
3150 groups,
3151 rewritten_indices,
3152 frag_reuse_index: _,
3153 } => pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
3154 groups: groups
3155 .iter()
3156 .map(pb::transaction::rewrite::RewriteGroup::from)
3157 .collect(),
3158 rewritten_indices: rewritten_indices
3159 .iter()
3160 .map(|rewritten| rewritten.into())
3161 .collect(),
3162 ..Default::default()
3163 }),
3164 Operation::CreateIndex {
3165 new_indices,
3166 removed_indices,
3167 } => pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
3168 new_indices: new_indices.iter().map(pb::IndexMetadata::from).collect(),
3169 removed_indices: removed_indices
3170 .iter()
3171 .map(pb::IndexMetadata::from)
3172 .collect(),
3173 }),
3174 Operation::Merge { fragments, schema } => {
3175 pb::transaction::Operation::Merge(pb::transaction::Merge {
3176 fragments: fragments.iter().map(pb::DataFragment::from).collect(),
3177 schema: Fields::from(schema).0,
3178 schema_metadata: Default::default(), })
3180 }
3181 Operation::Restore { version } => {
3182 pb::transaction::Operation::Restore(pb::transaction::Restore { version: *version })
3183 }
3184 Operation::Update {
3185 removed_fragment_ids,
3186 updated_fragments,
3187 new_fragments,
3188 fields_modified,
3189 merged_generations,
3190 fields_for_preserving_frag_bitmap,
3191 update_mode,
3192 inserted_rows_filter,
3193 } => pb::transaction::Operation::Update(pb::transaction::Update {
3194 removed_fragment_ids: removed_fragment_ids.clone(),
3195 updated_fragments: updated_fragments
3196 .iter()
3197 .map(pb::DataFragment::from)
3198 .collect(),
3199 new_fragments: new_fragments.iter().map(pb::DataFragment::from).collect(),
3200 fields_modified: fields_modified.clone(),
3201 merged_generations: merged_generations
3202 .iter()
3203 .map(pb::MergedGeneration::from)
3204 .collect(),
3205 fields_for_preserving_frag_bitmap: fields_for_preserving_frag_bitmap.clone(),
3206 update_mode: update_mode
3207 .as_ref()
3208 .map(|mode| match mode {
3209 UpdateMode::RewriteRows => 0,
3210 UpdateMode::RewriteColumns => 1,
3211 })
3212 .unwrap_or(0),
3213 inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.into()),
3214 }),
3215 Operation::Project { schema } => {
3216 pb::transaction::Operation::Project(pb::transaction::Project {
3217 schema: Fields::from(schema).0,
3218 })
3219 }
3220 Operation::UpdateConfig {
3221 config_updates,
3222 table_metadata_updates,
3223 schema_metadata_updates,
3224 field_metadata_updates,
3225 } => pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig {
3226 config_updates: config_updates
3227 .as_ref()
3228 .map(pb::transaction::UpdateMap::from),
3229 table_metadata_updates: table_metadata_updates
3230 .as_ref()
3231 .map(pb::transaction::UpdateMap::from),
3232 schema_metadata_updates: schema_metadata_updates
3233 .as_ref()
3234 .map(pb::transaction::UpdateMap::from),
3235 field_metadata_updates: field_metadata_updates
3236 .iter()
3237 .map(|(field_id, update_map)| {
3238 (*field_id, pb::transaction::UpdateMap::from(update_map))
3239 })
3240 .collect(),
3241 upsert_values: Default::default(),
3243 delete_keys: Default::default(),
3244 schema_metadata: Default::default(),
3245 field_metadata: Default::default(),
3246 }),
3247 Operation::DataReplacement { replacements } => {
3248 pb::transaction::Operation::DataReplacement(pb::transaction::DataReplacement {
3249 replacements: replacements
3250 .iter()
3251 .map(pb::transaction::DataReplacementGroup::from)
3252 .collect(),
3253 })
3254 }
3255 Operation::UpdateMemWalState { merged_generations } => {
3256 pb::transaction::Operation::UpdateMemWalState(pb::transaction::UpdateMemWalState {
3257 merged_generations: merged_generations
3258 .iter()
3259 .map(pb::MergedGeneration::from)
3260 .collect::<Vec<_>>(),
3261 })
3262 }
3263 Operation::UpdateBases { new_bases } => {
3264 pb::transaction::Operation::UpdateBases(pb::transaction::UpdateBases {
3265 new_bases: new_bases
3266 .iter()
3267 .cloned()
3268 .map(|bp: BasePath| -> pb::BasePath { bp.into() })
3269 .collect::<Vec<pb::BasePath>>(),
3270 })
3271 }
3272 };
3273
3274 let transaction_properties = value
3275 .transaction_properties
3276 .as_ref()
3277 .map(|arc| arc.as_ref().clone())
3278 .unwrap_or_default();
3279 Self {
3280 read_version: value.read_version,
3281 uuid: value.uuid.clone(),
3282 operation: Some(operation),
3283 tag: value.tag.clone().unwrap_or("".to_string()),
3284 transaction_properties,
3285 }
3286 }
3287}
3288
3289impl From<&RewrittenIndex> for pb::transaction::rewrite::RewrittenIndex {
3290 fn from(value: &RewrittenIndex) -> Self {
3291 Self {
3292 old_id: Some((&value.old_id).into()),
3293 new_id: Some((&value.new_id).into()),
3294 new_index_details: Some(value.new_index_details.clone()),
3295 new_index_version: value.new_index_version,
3296 }
3297 }
3298}
3299
3300impl From<&RewriteGroup> for pb::transaction::rewrite::RewriteGroup {
3301 fn from(value: &RewriteGroup) -> Self {
3302 Self {
3303 old_fragments: value
3304 .old_fragments
3305 .iter()
3306 .map(pb::DataFragment::from)
3307 .collect(),
3308 new_fragments: value
3309 .new_fragments
3310 .iter()
3311 .map(pb::DataFragment::from)
3312 .collect(),
3313 }
3314 }
3315}
3316
3317pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> Result<()> {
3319 let manifest = match (manifest, operation) {
3320 (
3321 None,
3322 Operation::Overwrite {
3323 fragments, schema, ..
3324 },
3325 ) => {
3326 schema_fragments_valid(None, schema, fragments)?;
3328
3329 return Ok(());
3330 }
3331 (None, Operation::Clone { .. }) => return Ok(()),
3332 (Some(manifest), _) => manifest,
3333 (None, _) => {
3334 return Err(Error::invalid_input(format!(
3335 "Cannot apply operation {} to non-existent dataset",
3336 operation.name()
3337 )));
3338 }
3339 };
3340
3341 match operation {
3342 Operation::Append { fragments } => {
3343 schema_fragments_valid(Some(manifest), &manifest.schema, fragments)
3345 }
3346 Operation::Project { schema } => {
3347 schema_fragments_valid(Some(manifest), schema, manifest.fragments.as_ref())
3348 }
3349 Operation::Merge { fragments, schema } => {
3350 merge_fragments_valid(manifest, fragments)?;
3351 schema_fragments_valid(Some(manifest), schema, fragments)
3352 }
3353 Operation::Overwrite {
3354 fragments,
3355 schema,
3356 config_upsert_values: None,
3357 initial_bases: _,
3358 } => schema_fragments_valid(Some(manifest), schema, fragments),
3359 Operation::Update {
3360 updated_fragments,
3361 new_fragments,
3362 ..
3363 } => {
3364 schema_fragments_valid(Some(manifest), &manifest.schema, updated_fragments)?;
3365 schema_fragments_valid(Some(manifest), &manifest.schema, new_fragments)
3366 }
3367 _ => Ok(()),
3368 }
3369}
3370
3371fn schema_fragments_valid(
3372 manifest: Option<&Manifest>,
3373 schema: &Schema,
3374 fragments: &[Fragment],
3375) -> Result<()> {
3376 if let Some(manifest) = manifest
3377 && manifest.data_storage_format.lance_file_version()? == LanceFileVersion::Legacy
3378 {
3379 return schema_fragments_legacy_valid(schema, fragments);
3380 }
3381 for fragment in fragments {
3383 for data_file in &fragment.files {
3384 if data_file.fields.iter().len() == 0 {
3385 return Err(Error::invalid_input(format!(
3386 "Datafile {} does not contain any fields",
3387 data_file.path
3388 )));
3389 }
3390 }
3391 }
3392 Ok(())
3393}
3394
3395fn schema_fragments_legacy_valid(schema: &Schema, fragments: &[Fragment]) -> Result<()> {
3399 for fragment in fragments {
3402 for field in schema.fields_pre_order() {
3403 if !fragment
3404 .files
3405 .iter()
3406 .flat_map(|f| f.fields.iter())
3407 .any(|f_id| f_id == &field.id)
3408 {
3409 return Err(Error::invalid_input(format!(
3410 "Fragment {} does not contain field {:?}",
3411 fragment.id, field
3412 )));
3413 }
3414 }
3415 }
3416 Ok(())
3417}
3418
3419fn merge_fragments_valid(manifest: &Manifest, new_fragments: &[Fragment]) -> Result<()> {
3423 let original_fragments = manifest.fragments.as_ref();
3424
3425 if new_fragments.len() < original_fragments.len() {
3427 return Err(Error::invalid_input(format!(
3428 "Merge operation reduced fragment count from {} to {}. \
3429 Merge operations should only add columns, not reduce fragments.",
3430 original_fragments.len(),
3431 new_fragments.len()
3432 )));
3433 }
3434
3435 let new_fragment_map: HashMap<u64, &Fragment> =
3437 new_fragments.iter().map(|f| (f.id, f)).collect();
3438
3439 let mut missing_fragments: Vec<u64> = Vec::new();
3442 for original_fragment in original_fragments {
3443 if let Some(new_fragment) = new_fragment_map.get(&original_fragment.id) {
3444 if original_fragment.physical_rows != new_fragment.physical_rows {
3446 return Err(Error::invalid_input(format!(
3447 "Merge operation changed row count for fragment {}. \
3448 Original: {:?}, New: {:?}. \
3449 Merge operations should preserve fragment row counts and only add new columns.",
3450 original_fragment.id,
3451 original_fragment.physical_rows,
3452 new_fragment.physical_rows
3453 )));
3454 }
3455 } else {
3456 missing_fragments.push(original_fragment.id);
3457 }
3458 }
3459
3460 if !missing_fragments.is_empty() {
3461 return Err(Error::invalid_input(format!(
3462 "Merge operation is missing original fragments: {:?}. \
3463 Merge operations should preserve all original fragments and only add new columns. \
3464 Expected fragments: {:?}, but got: {:?}",
3465 missing_fragments,
3466 original_fragments.iter().map(|f| f.id).collect::<Vec<_>>(),
3467 new_fragment_map.keys().copied().collect::<Vec<_>>()
3468 )));
3469 }
3470
3471 Ok(())
3472}
3473
#[cfg(test)]
mod tests {
    use super::*;
    use lance_io::utils::CachedFileSize;

    #[test]
    fn test_rewrite_fragments() {
        let existing_fragments: Vec<Fragment> = (0..10).map(Fragment::new).collect();

        let mut final_fragments = existing_fragments;
        let rewrite_groups = vec![
            // Fragments 1 and 2 are replaced by fragments 15 and 16.
            RewriteGroup {
                old_fragments: vec![Fragment::new(1), Fragment::new(2)],
                new_fragments: vec![Fragment::new(15), Fragment::new(16)],
            },
            // Fragments 5 and 8 are replaced by a single new fragment created with
            // id 0; the expectations below show it ends up with the next id handed
            // out from `fragment_id` (20).
            RewriteGroup {
                old_fragments: vec![Fragment::new(5), Fragment::new(8)],
                new_fragments: vec![Fragment::new(0)],
            },
        ];

        let mut fragment_id = 20;
        let version = 0;

        Transaction::handle_rewrite_fragments(
            &mut final_fragments,
            &rewrite_groups,
            &mut fragment_id,
            version,
            None,
        )
        .unwrap();

        assert_eq!(fragment_id, 21);

        let expected_fragments: Vec<Fragment> = vec![
            Fragment::new(0),
            Fragment::new(15),
            Fragment::new(16),
            Fragment::new(3),
            Fragment::new(4),
            Fragment::new(6),
            Fragment::new(7),
            Fragment::new(9),
            Fragment::new(20),
        ];

        assert_eq!(final_fragments, expected_fragments);
    }

    #[test]
    fn test_merge_fragments_valid() {
        use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
        use lance_core::datatypes::Schema as LanceSchema;
        use lance_table::format::Manifest;
        use std::sync::Arc;

        let schema = ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new("name", DataType::Utf8, false),
        ]);

        // The dataset starts with fragments 1, 2 and 3.
        let original_fragments = vec![Fragment::new(1), Fragment::new(2), Fragment::new(3)];

        let manifest = Manifest::new(
            LanceSchema::try_from(&schema).unwrap(),
            Arc::new(original_fragments),
            DataStorageFormat::new(LanceFileVersion::V2_0),
            HashMap::new(),
        );

        // Dropping every fragment shrinks the fragment count.
        let empty_fragments = vec![];
        let result = merge_fragments_valid(&manifest, &empty_fragments);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("reduced fragment count")
        );

        // Same count, but fragment 3 has been swapped for fragment 4.
        let missing_fragments = vec![
            Fragment::new(1),
            Fragment::new(2),
            Fragment::new(4), // fragment 3 is missing
        ];
        let result = merge_fragments_valid(&manifest, &missing_fragments);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("missing original fragments")
        );

        // Fewer fragments than the original dataset.
        let reduced_fragments = vec![
            Fragment::new(1),
            Fragment::new(2),
            // fragment 3 dropped
        ];
        let result = merge_fragments_valid(&manifest, &reduced_fragments);
        assert!(result.is_err());
        assert!(
            result
                .unwrap_err()
                .to_string()
                .contains("reduced fragment count")
        );

        // All originals kept plus extra fragments: accepted.
        let valid_fragments = vec![
            Fragment::new(1),
            Fragment::new(2),
            Fragment::new(3),
            Fragment::new(4), // new fragment
            Fragment::new(5), // new fragment
        ];
        let result = merge_fragments_valid(&manifest, &valid_fragments);
        assert!(result.is_ok());

        // Exactly the original fragments: accepted.
        let same_fragments = vec![Fragment::new(1), Fragment::new(2), Fragment::new(3)];
        let result = merge_fragments_valid(&manifest, &same_fragments);
        assert!(result.is_ok());
    }

    #[test]
    fn test_remove_tombstoned_data_files() {
        let mut fragment = Fragment::new(1);

        // A file with only live field ids: kept.
        fragment.files.push(DataFile {
            path: "normal.lance".to_string(),
            fields: vec![1, 2, 3],
            column_indices: vec![],
            file_major_version: 2,
            file_minor_version: 0,
            file_size_bytes: CachedFileSize::new(1000),
            base_id: None,
        });

        // A file whose field ids are all -2: removed.
        fragment.files.push(DataFile {
            path: "all_tombstoned.lance".to_string(),
            fields: vec![-2, -2, -2],
            column_indices: vec![],
            file_major_version: 2,
            file_minor_version: 0,
            file_size_bytes: CachedFileSize::new(500),
            base_id: None,
        });

        // A file mixing live ids and -2: kept.
        fragment.files.push(DataFile {
            path: "mixed.lance".to_string(),
            fields: vec![4, -2, 5],
            column_indices: vec![],
            file_major_version: 2,
            file_minor_version: 0,
            file_size_bytes: CachedFileSize::new(750),
            base_id: None,
        });

        // A single-field file whose only id is -2: removed.
        fragment.files.push(DataFile {
            path: "another_tombstoned.lance".to_string(),
            fields: vec![-2],
            column_indices: vec![],
            file_major_version: 2,
            file_minor_version: 0,
            file_size_bytes: CachedFileSize::new(250),
            base_id: None,
        });

        let mut fragments = vec![fragment];

        Transaction::remove_tombstoned_data_files(&mut fragments);

        // Only the two files with at least one live field remain, in original order.
        assert_eq!(fragments[0].files.len(), 2);
        assert_eq!(fragments[0].files[0].path, "normal.lance");
        assert_eq!(fragments[0].files[1].path, "mixed.lance");
    }

    #[test]
    fn test_assign_row_ids_new_fragment() {
        // A fragment with no row id metadata gets a fresh contiguous range.
        let mut fragments = vec![Fragment {
            id: 1,
            physical_rows: Some(100),
            row_id_meta: None,
            files: vec![],
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }];
        let mut next_row_id = 0;

        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();

        assert_eq!(next_row_id, 100);
        assert!(fragments[0].row_id_meta.is_some());

        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
            let sequence = read_row_ids(data).unwrap();
            assert_eq!(sequence.len(), 100);
            let row_ids: Vec<u64> = sequence.iter().collect();
            assert_eq!(row_ids, (0..100).collect::<Vec<u64>>());
        } else {
            panic!("Expected inline row ID metadata");
        }
    }

    #[test]
    fn test_assign_row_ids_existing_complete() {
        // Row ids already cover every physical row.
        let existing_sequence = RowIdSequence::from(0..50);
        let serialized = write_row_ids(&existing_sequence);

        let mut fragments = vec![Fragment {
            id: 1,
            physical_rows: Some(50),
            row_id_meta: Some(RowIdMeta::Inline(serialized)),
            files: vec![],
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }];
        let mut next_row_id = 100;

        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();

        // No new row ids were consumed.
        assert_eq!(next_row_id, 100);

        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
            let sequence = read_row_ids(data).unwrap();
            assert_eq!(sequence.len(), 50);
            let row_ids: Vec<u64> = sequence.iter().collect();
            assert_eq!(row_ids, (0..50).collect::<Vec<u64>>());
        } else {
            panic!("Expected inline row ID metadata");
        }
    }

    #[test]
    fn test_assign_row_ids_partial_existing() {
        // Only 30 of the 50 physical rows already have row ids.
        let existing_sequence = RowIdSequence::from(0..30);
        let serialized = write_row_ids(&existing_sequence);

        let mut fragments = vec![Fragment {
            id: 1,
            physical_rows: Some(50), // 20 more rows than existing row ids
            row_id_meta: Some(RowIdMeta::Inline(serialized)),
            files: vec![],
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }];
        let mut next_row_id = 100;

        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();

        // 20 new ids were allocated for the uncovered rows.
        assert_eq!(next_row_id, 120);

        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
            let sequence = read_row_ids(data).unwrap();
            assert_eq!(sequence.len(), 50);
            let row_ids: Vec<u64> = sequence.iter().collect();
            // Existing ids first, followed by the newly allocated ones.
            let mut expected = (0..30).collect::<Vec<u64>>();
            expected.extend(100..120);
            assert_eq!(row_ids, expected);
        } else {
            panic!("Expected inline row ID metadata");
        }
    }

    #[test]
    fn test_assign_row_ids_excess_row_ids() {
        let existing_sequence = RowIdSequence::from(0..60);
        let serialized = write_row_ids(&existing_sequence);

        let mut fragments = vec![Fragment {
            id: 1,
            physical_rows: Some(50), // fewer rows than existing row ids
            row_id_meta: Some(RowIdMeta::Inline(serialized)),
            files: vec![],
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }];
        let mut next_row_id = 100;

        let result = Transaction::assign_row_ids(&mut next_row_id, &mut fragments);

        assert!(result.is_err());
        if let Err(Error::Internal { message, .. }) = result {
            assert!(message.contains("more row IDs (60) than physical rows (50)"));
        } else {
            panic!("Expected Internal error about excess row IDs");
        }
    }

    #[test]
    fn test_assign_row_ids_multiple_fragments() {
        let existing_sequence = RowIdSequence::from(500..520);
        let serialized = write_row_ids(&existing_sequence);

        let mut fragments = vec![
            Fragment {
                id: 1,
                physical_rows: Some(30), // no row ids yet
                row_id_meta: None,
                files: vec![],
                deletion_file: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
            },
            Fragment {
                id: 2,
                physical_rows: Some(25), // 20 existing row ids, 5 missing
                row_id_meta: Some(RowIdMeta::Inline(serialized)),
                files: vec![],
                deletion_file: None,
                last_updated_at_version_meta: None,
                created_at_version_meta: None,
            },
        ];
        let mut next_row_id = 1000;

        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();

        // 30 ids for the first fragment plus 5 for the second.
        assert_eq!(next_row_id, 1035);

        // First fragment: a fresh contiguous range.
        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
            let sequence = read_row_ids(data).unwrap();
            assert_eq!(sequence.len(), 30);
            let row_ids: Vec<u64> = sequence.iter().collect();
            assert_eq!(row_ids, (1000..1030).collect::<Vec<u64>>());
        } else {
            panic!("Expected inline row ID metadata for first fragment");
        }

        // Second fragment: existing ids extended with the next allocated ones.
        if let Some(RowIdMeta::Inline(data)) = &fragments[1].row_id_meta {
            let sequence = read_row_ids(data).unwrap();
            assert_eq!(sequence.len(), 25);
            let row_ids: Vec<u64> = sequence.iter().collect();
            let mut expected = (500..520).collect::<Vec<u64>>();
            expected.extend(1030..1035);
            assert_eq!(row_ids, expected);
        } else {
            panic!("Expected inline row ID metadata for second fragment");
        }
    }

    #[test]
    fn test_assign_row_ids_missing_physical_rows() {
        let mut fragments = vec![Fragment {
            id: 1,
            physical_rows: None,
            row_id_meta: None,
            files: vec![],
            deletion_file: None,
            last_updated_at_version_meta: None,
            created_at_version_meta: None,
        }];
        let mut next_row_id = 0;

        let result = Transaction::assign_row_ids(&mut next_row_id, &mut fragments);

        assert!(result.is_err());
        if let Err(Error::Internal { message, .. }) = result {
            assert!(message.contains("Fragment does not have physical rows"));
        } else {
            panic!("Expected Internal error about missing physical rows");
        }
    }

    /// Build index metadata over a single field. `is_vector` selects between a
    /// `VectorIndexDetails` and a `ScalarIndexDetails` type URL.
    fn create_test_index(
        name: &str,
        field_id: i32,
        dataset_version: u64,
        fragment_bitmap: Option<RoaringBitmap>,
        is_vector: bool,
    ) -> IndexMetadata {
        use prost_types::Any;
        use std::sync::Arc;
        use uuid::Uuid;

        let index_details = if is_vector {
            Some(Arc::new(Any {
                type_url: "type.googleapis.com/lance.index.VectorIndexDetails".to_string(),
                value: vec![],
            }))
        } else {
            Some(Arc::new(Any {
                type_url: "type.googleapis.com/lance.index.ScalarIndexDetails".to_string(),
                value: vec![],
            }))
        };

        IndexMetadata {
            uuid: Uuid::new_v4(),
            fields: vec![field_id],
            name: name.to_string(),
            dataset_version,
            fragment_bitmap,
            index_details,
            index_version: 1,
            created_at: None,
            base_id: None,
        }
    }

    /// Build index metadata with the given (system index) name, covering
    /// fragments 1 and 2.
    fn create_system_index(name: &str, field_id: i32) -> IndexMetadata {
        use prost_types::Any;
        use std::sync::Arc;
        use uuid::Uuid;

        IndexMetadata {
            uuid: Uuid::new_v4(),
            fields: vec![field_id],
            name: name.to_string(),
            dataset_version: 1,
            fragment_bitmap: Some(RoaringBitmap::from_iter([1, 2])),
            index_details: Some(Arc::new(Any {
                type_url: "type.googleapis.com/lance.index.SystemIndexDetails".to_string(),
                value: vec![],
            })),
            index_version: 1,
            created_at: None,
            base_id: None,
        }
    }

    /// Build a schema of non-nullable `Int32` top-level fields named
    /// `field_{id}`, whose field ids are exactly `field_ids`, in order.
    fn create_test_schema(field_ids: &[i32]) -> Schema {
        use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
        use lance_core::datatypes::Schema as LanceSchema;

        let fields: Vec<ArrowField> = field_ids
            .iter()
            .map(|id| ArrowField::new(format!("field_{}", id), DataType::Int32, false))
            .collect();

        let arrow_schema = ArrowSchema::new(fields);
        let mut lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();

        // Assign ids by position. Looking fields up by id while renumbering them
        // can find a field that an earlier iteration already renumbered (e.g. for
        // `[1, 0]`), producing the wrong ids.
        for (field, &field_id) in lance_schema.fields.iter_mut().zip(field_ids) {
            field.id = field_id;
        }

        lance_schema
    }

    #[test]
    fn test_retain_indices_removes_missing_fields() {
        let schema = create_test_schema(&[1, 2]);
        let fragments = vec![Fragment::new(1), Fragment::new(2)];

        let mut indices = vec![
            create_test_index("idx1", 1, 1, Some(RoaringBitmap::from_iter([1])), false),
            create_test_index("idx2", 2, 1, Some(RoaringBitmap::from_iter([1])), false),
            // Field 99 is not in the schema.
            create_test_index("idx3", 99, 1, Some(RoaringBitmap::from_iter([1])), false),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 2);
        assert!(indices.iter().all(|idx| idx.fields[0] != 99));
    }

    #[test]
    fn test_retain_indices_keeps_system_indices() {
        use lance_index::mem_wal::MEM_WAL_INDEX_NAME;

        let schema = create_test_schema(&[1, 2]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            // System indices on a field missing from the schema: kept.
            create_system_index(FRAG_REUSE_INDEX_NAME, 99),
            create_system_index(MEM_WAL_INDEX_NAME, 99),
            // Regular index on the same missing field: removed.
            create_test_index("regular_idx", 99, 1, Some(RoaringBitmap::new()), false),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 2);
        assert!(indices.iter().any(|idx| idx.name == FRAG_REUSE_INDEX_NAME));
        assert!(indices.iter().any(|idx| idx.name == MEM_WAL_INDEX_NAME));
    }

    #[test]
    fn test_retain_indices_keeps_fragment_reuse_index() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            create_system_index(FRAG_REUSE_INDEX_NAME, 1),
            create_test_index("other_idx", 1, 1, Some(RoaringBitmap::new()), false),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert!(indices.iter().any(|idx| idx.name == FRAG_REUSE_INDEX_NAME));
    }

    #[test]
    fn test_retain_single_empty_scalar_index() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![create_test_index(
            "scalar_idx",
            1,
            1,
            Some(RoaringBitmap::new()), // empty bitmap
            false,
        )];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        // An empty scalar index is kept.
        assert_eq!(indices.len(), 1);
    }

    #[test]
    fn test_retain_single_empty_vector_index() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![create_test_index(
            "vector_idx",
            1,
            1,
            Some(RoaringBitmap::new()), // empty bitmap
            true,
        )];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        // An empty vector index is removed.
        assert_eq!(indices.len(), 0);
    }

    #[test]
    fn test_retain_single_nonempty_index() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut scalar_indices = vec![create_test_index(
            "scalar_idx",
            1,
            1,
            Some(RoaringBitmap::from_iter([1])),
            false,
        )];

        let mut vector_indices = vec![create_test_index(
            "vector_idx",
            1,
            1,
            Some(RoaringBitmap::from_iter([1])),
            true,
        )];

        Transaction::retain_relevant_indices(&mut scalar_indices, &schema, &fragments);
        Transaction::retain_relevant_indices(&mut vector_indices, &schema, &fragments);

        // Non-empty indices of either kind are kept.
        assert_eq!(scalar_indices.len(), 1);
        assert_eq!(vector_indices.len(), 1);
    }

    #[test]
    fn test_retain_single_index_with_none_bitmap() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut scalar_indices = vec![create_test_index("scalar_idx", 1, 1, None, false)];
        let mut vector_indices = vec![create_test_index("vector_idx", 1, 1, None, true)];

        Transaction::retain_relevant_indices(&mut scalar_indices, &schema, &fragments);
        Transaction::retain_relevant_indices(&mut vector_indices, &schema, &fragments);

        // A missing bitmap behaves like an empty one: scalar kept, vector removed.
        assert_eq!(scalar_indices.len(), 1);
        assert_eq!(vector_indices.len(), 0);
    }

    #[test]
    fn test_retain_multiple_empty_scalar_indices_keeps_oldest() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            create_test_index("idx", 1, 3, Some(RoaringBitmap::new()), false),
            create_test_index("idx", 1, 1, Some(RoaringBitmap::new()), false), // oldest
            create_test_index("idx", 1, 2, Some(RoaringBitmap::new()), false),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        // Only the empty scalar index with the lowest dataset version survives.
        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].dataset_version, 1);
    }

    #[test]
    fn test_retain_multiple_empty_vector_indices_removes_all() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            create_test_index("vec_idx", 1, 1, Some(RoaringBitmap::new()), true),
            create_test_index("vec_idx", 1, 2, Some(RoaringBitmap::new()), true),
            create_test_index("vec_idx", 1, 3, Some(RoaringBitmap::new()), true),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 0);
    }

    #[test]
    fn test_retain_mixed_empty_nonempty_keeps_nonempty() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            create_test_index("idx", 1, 1, Some(RoaringBitmap::new()), false), // empty
            create_test_index("idx", 1, 2, Some(RoaringBitmap::from_iter([1])), false), // non-empty
            create_test_index("idx", 1, 3, Some(RoaringBitmap::new()), false), // empty
            create_test_index("idx", 1, 4, Some(RoaringBitmap::from_iter([1])), false), // non-empty
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        // Once non-empty segments exist, the empty ones are dropped.
        assert_eq!(indices.len(), 2);
        assert!(
            indices
                .iter()
                .all(|idx| idx.dataset_version == 2 || idx.dataset_version == 4)
        );
    }

    #[test]
    fn test_retain_mixed_empty_nonempty_vector_keeps_nonempty() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            create_test_index("vec_idx", 1, 1, Some(RoaringBitmap::new()), true), // empty
            create_test_index("vec_idx", 1, 2, Some(RoaringBitmap::from_iter([1])), true), // non-empty
            create_test_index("vec_idx", 1, 3, Some(RoaringBitmap::new()), true), // empty
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 1);
        assert_eq!(indices[0].dataset_version, 2);
    }

    #[test]
    fn test_retain_fragment_bitmap_with_nonexistent_fragments() {
        let schema = create_test_schema(&[1]);
        // Only fragments 1 and 2 exist; the bitmap also references 3 and 4.
        let fragments = vec![Fragment::new(1), Fragment::new(2)];

        let mut indices = vec![create_test_index(
            "idx",
            1,
            1,
            Some(RoaringBitmap::from_iter([1, 2, 3, 4])),
            false,
        )];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 1);
        // The stored bitmap is left untouched.
        assert_eq!(
            indices[0].fragment_bitmap.as_ref().unwrap(),
            &RoaringBitmap::from_iter([1, 2, 3, 4])
        );
    }

    #[test]
    fn test_retain_effective_empty_bitmap_single_index() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(5), Fragment::new(6)];

        // The bitmaps cover only fragments that are not in the dataset.
        let mut scalar_indices = vec![create_test_index(
            "scalar_idx",
            1,
            1,
            Some(RoaringBitmap::from_iter([1, 2, 3])),
            false,
        )];

        let mut vector_indices = vec![create_test_index(
            "vector_idx",
            1,
            1,
            Some(RoaringBitmap::from_iter([1, 2, 3])),
            true,
        )];

        Transaction::retain_relevant_indices(&mut scalar_indices, &schema, &fragments);
        Transaction::retain_relevant_indices(&mut vector_indices, &schema, &fragments);

        // Effectively empty: scalar kept, vector removed.
        assert_eq!(scalar_indices.len(), 1);
        assert_eq!(vector_indices.len(), 0);
    }

    #[test]
    fn test_retain_different_index_names() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            create_test_index("idx_a", 1, 1, Some(RoaringBitmap::new()), false),
            create_test_index("idx_b", 1, 1, Some(RoaringBitmap::new()), true),
            create_test_index("idx_c", 1, 1, Some(RoaringBitmap::from_iter([1])), false),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        // Each name is evaluated on its own: the empty vector index is removed.
        assert_eq!(indices.len(), 2);
        assert!(indices.iter().any(|idx| idx.name == "idx_a"));
        assert!(indices.iter().any(|idx| idx.name == "idx_c"));
        assert!(!indices.iter().any(|idx| idx.name == "idx_b"));
    }

    #[test]
    fn test_retain_empty_indices_vec() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices: Vec<IndexMetadata> = vec![];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 0);
    }

    #[test]
    fn test_retain_all_indices_removed() {
        let schema = create_test_schema(&[1]);
        let fragments = vec![Fragment::new(1)];

        let mut indices = vec![
            create_test_index("vec1", 1, 1, Some(RoaringBitmap::new()), true),
            create_test_index("vec2", 1, 1, Some(RoaringBitmap::new()), true),
            // Field 99 is not in the schema.
            create_test_index("idx3", 99, 1, Some(RoaringBitmap::from_iter([1])), false),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 0);
    }

    #[test]
    fn test_retain_complex_scenario() {
        let schema = create_test_schema(&[1, 2]);
        let fragments = vec![Fragment::new(1), Fragment::new(2)];

        let mut indices = vec![
            // System index: always kept.
            create_system_index(FRAG_REUSE_INDEX_NAME, 1),
            // Empty scalar segments: only the oldest (version 1) is kept.
            create_test_index("idx_a", 1, 3, Some(RoaringBitmap::new()), false),
            create_test_index("idx_a", 1, 1, Some(RoaringBitmap::new()), false),
            create_test_index("idx_a", 1, 2, Some(RoaringBitmap::new()), false),
            // Empty vector segments: all removed.
            create_test_index("vec_b", 1, 1, Some(RoaringBitmap::new()), true),
            create_test_index("vec_b", 1, 2, Some(RoaringBitmap::new()), true),
            // Mixed scalar segments: only the non-empty ones (versions 2 and 3) are kept.
            create_test_index("idx_c", 2, 1, Some(RoaringBitmap::new()), false),
            create_test_index("idx_c", 2, 2, Some(RoaringBitmap::from_iter([1])), false),
            create_test_index("idx_c", 2, 3, Some(RoaringBitmap::from_iter([2])), false),
            // Non-empty scalar index: kept.
            create_test_index("idx_d", 1, 1, Some(RoaringBitmap::from_iter([1, 2])), false),
            // Field 99 is not in the schema: removed.
            create_test_index("idx_e", 99, 1, Some(RoaringBitmap::from_iter([1])), false),
        ];

        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);

        assert_eq!(indices.len(), 5);

        assert!(indices.iter().any(|idx| idx.name == FRAG_REUSE_INDEX_NAME));

        let idx_a_indices: Vec<_> = indices.iter().filter(|idx| idx.name == "idx_a").collect();
        assert_eq!(idx_a_indices.len(), 1);
        assert_eq!(idx_a_indices[0].dataset_version, 1);

        assert!(!indices.iter().any(|idx| idx.name == "vec_b"));

        let idx_c_indices: Vec<_> = indices.iter().filter(|idx| idx.name == "idx_c").collect();
        assert_eq!(idx_c_indices.len(), 2);
        assert!(
            idx_c_indices
                .iter()
                .all(|idx| idx.dataset_version == 2 || idx.dataset_version == 3)
        );

        assert!(indices.iter().any(|idx| idx.name == "idx_d"));

        assert!(!indices.iter().any(|idx| idx.name == "idx_e"));
    }

    #[test]
    fn test_handle_rewrite_indices_skips_missing_index() {
        use uuid::Uuid;

        // No indices exist, so the rewrite references an unknown index.
        let mut indices = vec![];

        let rewritten_indices = vec![RewrittenIndex {
            old_id: Uuid::new_v4(),
            new_id: Uuid::new_v4(),
            new_index_details: prost_types::Any {
                type_url: String::new(),
                value: vec![],
            },
            new_index_version: 1,
        }];

        let result = Transaction::handle_rewrite_indices(&mut indices, &rewritten_indices, &[]);
        // The unknown index is skipped rather than treated as an error.
        assert!(result.is_ok());
        assert!(indices.is_empty());
    }
}