use std::collections::{HashMap, HashSet};
use std::num::NonZero;
use std::sync::Arc;
use std::time::Instant;

use conflict_resolver::TransactionRebase;
use lance_core::utils::backoff::{Backoff, SlotBackoff};
use lance_core::utils::mask::RowIdTreeMap;
use lance_file::version::LanceFileVersion;
use lance_index::metrics::NoOpMetricsCollector;
use lance_io::utils::CachedFileSize;
use lance_table::format::{
    is_detached_version, pb, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest,
    WriterVersion, DETACHED_VERSION_MASK,
};
use lance_table::io::commit::{
    CommitConfig, CommitError, CommitHandler, ManifestLocation, ManifestNamingScheme,
};
use rand::{rng, Rng};
use snafu::location;

use futures::future::Either;
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use lance_core::{Error, Result};
use lance_index::{is_system_index, DatasetIndexExt};
use log;
use object_store::path::Path;
use prost::Message;

use super::ObjectStore;
use crate::dataset::cleanup::auto_cleanup_hook;
use crate::dataset::fragment::FileFragment;
use crate::dataset::transaction::{Operation, Transaction};
use crate::dataset::{
    load_new_transactions, write_manifest_file, ManifestWriteConfig, NewTransactionResult, BLOB_DIR,
};
use crate::index::DatasetIndexInternalExt;
use crate::io::deletion::read_dataset_deletion_file;
use crate::session::caches::DSMetadataCache;
use crate::session::index_caches::IndexMetadataKey;
use crate::session::Session;
use crate::Dataset;

mod conflict_resolver;
#[cfg(all(feature = "dynamodb_tests", test))]
mod dynamodb;
#[cfg(test)]
mod external_manifest;
#[cfg(all(feature = "dynamodb_tests", test))]
mod s3_test;

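/// Read a transaction file from the dataset's `_transactions` directory and
/// decode it from its protobuf representation.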
pub(crate) async fn read_transaction_file(
    object_store: &ObjectStore,
    base_path: &Path,
    transaction_file: &str,
) -> Result<Transaction> {
    let path = base_path.child("_transactions").child(transaction_file);
    let result = object_store.inner.get(&path).await?;
    let data = result.bytes().await?;
    let transaction = pb::Transaction::decode(data)?;
    transaction.try_into()
}

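/// Serialize a transaction to protobuf and write it into the `_transactions`
/// directory as `{read_version}-{uuid}.txn`, returning the file name.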
async fn write_transaction_file(
    object_store: &ObjectStore,
    base_path: &Path,
    transaction: &Transaction,
) -> Result<String> {
    let file_name = format!("{}-{}.txn", transaction.read_version, transaction.uuid);
    let path = base_path.child("_transactions").child(file_name.as_str());

    let message = pb::Transaction::from(transaction);
    let buf = message.encode_to_vec();
    object_store.inner.put(&path, buf.into()).await?;

    Ok(file_name)
}

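/// Write the transaction file and the very first manifest of a new dataset.
/// `Clone` operations derive the manifest (and index metadata) from the source
/// dataset's manifest; all other operations build it from the transaction.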
#[allow(clippy::too_many_arguments)]
async fn do_commit_new_dataset(
    object_store: &ObjectStore,
    commit_handler: &dyn CommitHandler,
    base_path: &Path,
    transaction: &Transaction,
    write_config: &ManifestWriteConfig,
    manifest_naming_scheme: ManifestNamingScheme,
    blob_version: Option<u64>,
    metadata_cache: &DSMetadataCache,
) -> Result<(Manifest, ManifestLocation)> {
    let transaction_file = write_transaction_file(object_store, base_path, transaction).await?;

    let (mut manifest, indices) = if let Operation::Clone {
        ref_name,
        ref_version,
        ref_path,
        branch_name,
        ..
    } = &transaction.operation
    {
        let source_manifest_location = commit_handler
            .resolve_version_location(
                &Path::parse(ref_path.as_str())?,
                *ref_version,
                &object_store.inner,
            )
            .await?;
        let source_manifest = Dataset::load_manifest(
            object_store,
            &source_manifest_location,
            base_path.to_string().as_str(),
            &Session::default(),
        )
        .await?;

        let new_base_id = source_manifest
            .base_paths
            .keys()
            .max()
            .map(|id| *id + 1)
            .unwrap_or(0);
        let new_manifest = source_manifest.shallow_clone(
            ref_name.clone(),
            ref_path.clone(),
            new_base_id,
            branch_name.clone(),
            transaction_file,
        );

        let updated_indices = if let Some(index_section_pos) = source_manifest.index_section {
            let reader = object_store.open(&source_manifest_location.path).await?;
            let section: pb::IndexSection =
                lance_io::utils::read_message(reader.as_ref(), index_section_pos).await?;
            section
                .indices
                .into_iter()
                .map(|index_pb| {
                    let mut index = IndexMetadata::try_from(index_pb)?;
                    index.base_id = Some(new_base_id);
                    Ok(index)
                })
                .collect::<Result<Vec<_>>>()?
        } else {
            vec![]
        };
        (new_manifest, updated_indices)
    } else {
        let (manifest, indices) = transaction.build_manifest(
            None,
            vec![],
            &transaction_file,
            write_config,
            blob_version,
        )?;
        (manifest, indices)
    };

    manifest.blob_dataset_version = blob_version;

    let result = write_manifest_file(
        object_store,
        commit_handler,
        base_path,
        &mut manifest,
        if indices.is_empty() {
            None
        } else {
            Some(indices.clone())
        },
        write_config,
        manifest_naming_scheme,
    )
    .await;

    match result {
        Ok(manifest_location) => {
            let tx_key = crate::session::caches::TransactionKey {
                version: manifest.version,
            };
            metadata_cache
                .insert_with_key(&tx_key, Arc::new(transaction.clone()))
                .await;

            let manifest_key = crate::session::caches::ManifestKey {
                version: manifest_location.version,
                e_tag: manifest_location.e_tag.as_deref(),
            };
            metadata_cache
                .insert_with_key(&manifest_key, Arc::new(manifest.clone()))
                .await;
            Ok((manifest, manifest_location))
        }
        Err(CommitError::CommitConflict) => Err(crate::Error::DatasetAlreadyExists {
            uri: base_path.to_string(),
            location: location!(),
        }),
        Err(CommitError::OtherError(err)) => Err(err),
    }
}

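/// Commit the first version of a new dataset. If the transaction carries a
/// blob operation, the blob sub-dataset (under `BLOB_DIR`) is committed first
/// and its version is recorded in the main manifest.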
pub(crate) async fn commit_new_dataset(
    object_store: &ObjectStore,
    commit_handler: &dyn CommitHandler,
    base_path: &Path,
    transaction: &Transaction,
    write_config: &ManifestWriteConfig,
    manifest_naming_scheme: ManifestNamingScheme,
    metadata_cache: &crate::session::caches::DSMetadataCache,
) -> Result<(Manifest, ManifestLocation)> {
    let blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
        let blob_path = base_path.child(BLOB_DIR);
        let blob_tx = Transaction::new(0, blob_op.clone(), None, None);
        let (blob_manifest, _) = do_commit_new_dataset(
            object_store,
            commit_handler,
            &blob_path,
            &blob_tx,
            write_config,
            manifest_naming_scheme,
            None,
            metadata_cache,
        )
        .await?;
        Some(blob_manifest.version)
    } else {
        None
    };

    do_commit_new_dataset(
        object_store,
        commit_handler,
        base_path,
        transaction,
        write_config,
        manifest_naming_scheme,
        blob_version,
        metadata_cache,
    )
    .await
}

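/// Whether a manifest written by an older writer is missing statistics
/// (physical row counts, deleted-row counts) or index fragment bitmaps that
/// current readers expect, in which case a migration pass is needed on commit.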
pub fn manifest_needs_migration(manifest: &Manifest, indices: &[IndexMetadata]) -> bool {
    manifest.writer_version.is_none()
        || manifest.fragments.iter().any(|f| {
            f.physical_rows.is_none()
                || (f
                    .deletion_file
                    .as_ref()
                    .map(|d| d.num_deleted_rows.is_none())
                    .unwrap_or(false))
        })
        || indices
            .iter()
            .any(|i| must_recalculate_fragment_bitmap(i, manifest.writer_version.as_ref()))
}

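/// Fill in any missing fragment statistics or cached file sizes before a new
/// manifest is written; when `recompute_stats` is set, the statistics are
/// recomputed from the data files even if already present.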
async fn migrate_manifest(
    dataset: &Dataset,
    manifest: &mut Manifest,
    recompute_stats: bool,
) -> Result<()> {
    if !recompute_stats
        && manifest.fragments.iter().all(|f| {
            f.num_rows().map(|n| n > 0).unwrap_or(false)
                && f.files.iter().all(|f| f.file_size_bytes.get().is_some())
        })
    {
        return Ok(());
    }

    manifest.fragments =
        Arc::new(migrate_fragments(dataset, &manifest.fragments, recompute_stats).await?);

    Ok(())
}

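/// Verify that the file version of the fragments matches the manifest's data
/// storage format: a `Legacy` storage version is upgraded in place when newer
/// files are present, while any other mismatch is rejected as an error.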
fn check_storage_version(manifest: &mut Manifest) -> Result<()> {
    let data_storage_version = manifest.data_storage_format.lance_file_version()?;
    if manifest.data_storage_format.lance_file_version()? == LanceFileVersion::Legacy {
        if let Some(actual_file_version) =
            Fragment::try_infer_version(&manifest.fragments).map_err(|e| Error::Internal {
                message: format!(
                    "The dataset contains a mixture of file versions. You will need to roll back to an earlier version: {}",
                    e
                ),
                location: location!(),
            })?
        {
            if actual_file_version > data_storage_version {
                log::warn!(
                    "Data storage version {} is less than the actual file version {}. This has been automatically updated.",
                    data_storage_version,
                    actual_file_version
                );
                manifest.data_storage_format = DataStorageFormat::new(actual_file_version);
            }
        }
    } else if let Some(actual_file_version) = Fragment::try_infer_version(&manifest.fragments)? {
        if actual_file_version != data_storage_version {
            return Err(Error::Internal {
                message: format!(
                    "The operation added files with version {}. However, the data storage version is {}.",
                    actual_file_version,
                    data_storage_version
                ),
                location: location!(),
            });
        }
    }
    Ok(())
}

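/// Repair manifests where the same field id appears in more than one data file
/// of a fragment: the occurrence in the newest data file is remapped to a
/// fresh id (and the schema follows it), older occurrences keep the retired
/// id, and data files that no longer cover any live field are dropped.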
fn fix_schema(manifest: &mut Manifest) -> Result<()> {
    if manifest.fragments.iter().all(|f| f.files.len() <= 1) {
        return Ok(());
    }

    let mut fields_with_duplicate_ids = HashSet::new();
    let mut seen_fields = HashSet::new();
    for fragment in manifest.fragments.iter() {
        for file in fragment.files.iter() {
            for field_id in file.fields.iter() {
                if *field_id >= 0 && !seen_fields.insert(*field_id) {
                    fields_with_duplicate_ids.insert(*field_id);
                }
            }
        }
        seen_fields.clear();
    }
    if fields_with_duplicate_ids.is_empty() {
        return Ok(());
    }

    let mut field_id_seed = manifest.max_field_id() + 1;
    let mut old_field_id_mapping: HashMap<i32, i32> = HashMap::new();
    let mut fields_with_duplicate_ids = fields_with_duplicate_ids.into_iter().collect::<Vec<_>>();
    fields_with_duplicate_ids.sort_unstable();
    for field_id in fields_with_duplicate_ids {
        old_field_id_mapping.insert(field_id, field_id_seed);
        field_id_seed += 1;
    }

    let mut fragments = manifest.fragments.as_ref().clone();

    seen_fields.clear();
    for fragment in fragments.iter_mut() {
        for field_id in fragment
            .files
            .iter_mut()
            .rev()
            .flat_map(|file| file.fields.iter_mut())
        {
            if let Some(new_field_id) = old_field_id_mapping.get(field_id) {
                if seen_fields.insert(*field_id) {
                    *field_id = *new_field_id;
                }
            }
        }
        seen_fields.clear();
    }

    for (old_field_id, new_field_id) in &old_field_id_mapping {
        let field = manifest.schema.mut_field_by_id(*old_field_id).unwrap();
        field.id = *new_field_id;

        if let Some(local_field) = manifest.local_schema.mut_field_by_id(*old_field_id) {
            local_field.id = *new_field_id;
        }
    }

    let remaining_field_ids = manifest
        .schema
        .fields_pre_order()
        .map(|f| f.id)
        .collect::<HashSet<_>>();
    for fragment in fragments.iter_mut() {
        fragment.files.retain(|file| {
            file.fields
                .iter()
                .any(|field_id| remaining_field_ids.contains(field_id))
        });
    }

    manifest.fragments = Arc::new(fragments);

    Ok(())
}

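/// Recompute per-fragment statistics (physical row count, deleted-row count,
/// cached file sizes) for the given fragments, reading data and deletion files
/// as needed. Fragments that end up with zero live rows are filtered out.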
pub(crate) async fn migrate_fragments(
    dataset: &Dataset,
    fragments: &[Fragment],
    recompute_stats: bool,
) -> Result<Vec<Fragment>> {
    let dataset = Arc::new(dataset.clone());
    let new_fragments = futures::stream::iter(fragments)
        .map(|fragment| async {
            let physical_rows = if recompute_stats {
                None
            } else {
                fragment.physical_rows
            };
            let physical_rows = if let Some(physical_rows) = physical_rows {
                Either::Right(futures::future::ready(Ok(physical_rows)))
            } else {
                let file_fragment = FileFragment::new(dataset.clone(), fragment.clone());
                Either::Left(async move { file_fragment.physical_rows().await })
            };
            let num_deleted_rows = match &fragment.deletion_file {
                None => Either::Left(futures::future::ready(Ok(None))),
                Some(DeletionFile {
                    num_deleted_rows: Some(deleted_rows),
                    ..
                }) if !recompute_stats => {
                    Either::Left(futures::future::ready(Ok(Some(*deleted_rows))))
                }
                Some(deletion_file) => Either::Right(async {
                    let deletion_vector =
                        read_dataset_deletion_file(dataset.as_ref(), fragment.id, deletion_file)
                            .await?;
                    Ok(Some(deletion_vector.len()))
                }),
            };

            let (physical_rows, num_deleted_rows) =
                futures::future::try_join(physical_rows, num_deleted_rows).await?;

            let mut data_files = fragment.files.clone();

            let object_store = dataset.object_store();
            let get_sizes = data_files
                .iter()
                .map(|file| {
                    if let Some(size) = file.file_size_bytes.get() {
                        Either::Left(futures::future::ready(Ok(size)))
                    } else {
                        Either::Right(async {
                            object_store
                                .size(&dataset.base.child("data").child(file.path.clone()))
                                .map_ok(|size| {
                                    NonZero::new(size).ok_or_else(|| Error::Internal {
                                        message: format!("File {} has size 0", file.path),
                                        location: location!(),
                                    })
                                })
                                .await?
                        })
                    }
                })
                .collect::<Vec<_>>();
            let sizes = futures::future::try_join_all(get_sizes).await?;
            data_files.iter_mut().zip(sizes).for_each(|(file, size)| {
                file.file_size_bytes = CachedFileSize::new(size.into());
            });

            let deletion_file = fragment
                .deletion_file
                .as_ref()
                .map(|deletion_file| DeletionFile {
                    num_deleted_rows,
                    ..deletion_file.clone()
                });

            Ok::<_, Error>(Fragment {
                physical_rows: Some(physical_rows),
                deletion_file,
                files: data_files,
                ..fragment.clone()
            })
        })
        .buffered(dataset.object_store.io_parallelism())
        .try_filter(|frag| futures::future::ready(frag.num_rows().map(|n| n > 0).unwrap_or(false)))
        .boxed();

    new_fragments.try_collect().await
}

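/// An index fragment bitmap is recalculated if it is missing or if the
/// manifest was written by a pre-0.8.15 writer (or the writer version is
/// unknown).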
fn must_recalculate_fragment_bitmap(
    index: &IndexMetadata,
    version: Option<&WriterVersion>,
) -> bool {
    index.fragment_bitmap.is_none() || version.map(|v| v.older_than(0, 8, 15)).unwrap_or(true)
}

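/// Recalculate fragment bitmaps for non-system indices whose bitmaps are
/// missing, written by an old writer, or overlap with another delta of the
/// same index, by reopening each index and asking which fragments it covers.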
async fn migrate_indices(dataset: &Dataset, indices: &mut [IndexMetadata]) -> Result<()> {
    let needs_recalculating = match detect_overlapping_fragments(indices) {
        Ok(()) => vec![],
        Err(BadFragmentBitmapError { bad_indices }) => {
            bad_indices.into_iter().map(|(name, _)| name).collect()
        }
    };
    for index in indices {
        if needs_recalculating.contains(&index.name)
            || (must_recalculate_fragment_bitmap(index, dataset.manifest.writer_version.as_ref())
                && !is_system_index(index))
        {
            debug_assert_eq!(index.fields.len(), 1);
            let idx_field = dataset.schema().field_by_id(index.fields[0]).ok_or_else(|| {
                Error::Internal {
                    message: format!(
                        "Index with uuid {} referred to field with id {} which did not exist in dataset",
                        index.uuid, index.fields[0]
                    ),
                    location: location!(),
                }
            })?;
            let idx = dataset
                .open_generic_index(
                    &idx_field.name,
                    &index.uuid.to_string(),
                    &NoOpMetricsCollector,
                )
                .await?;
            index.fragment_bitmap = Some(idx.calculate_included_frags().await?);
        }
        if index.index_details.is_none() {
            log::debug!(
                "the index with uuid {} is missing index metadata. This probably means it was written with Lance version <= 0.19.2. This is not a problem.",
                index.uuid
            );
        }
    }

    Ok(())
}

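/// Returned by [`detect_overlapping_fragments`] when indices sharing a name
/// have fragment bitmaps that claim the same fragment more than once.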
pub(crate) struct BadFragmentBitmapError {
    pub bad_indices: Vec<(String, Vec<u32>)>,
}

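/// Check that, for each index name, no fragment id appears more than once
/// across that index's fragment bitmaps; offending index names and the
/// overlapping fragment ids are reported in the error.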
pub(crate) fn detect_overlapping_fragments(
    indices: &[IndexMetadata],
) -> std::result::Result<(), BadFragmentBitmapError> {
    let index_names: HashSet<&str> = indices.iter().map(|i| i.name.as_str()).collect();
    let mut bad_indices = Vec::new();
    for name in index_names {
        let mut seen_fragment_ids = HashSet::new();
        let mut overlap = Vec::new();
        for index in indices.iter().filter(|i| i.name == name) {
            if let Some(fragment_bitmap) = index.fragment_bitmap.as_ref() {
                for fragment in fragment_bitmap {
                    if !seen_fragment_ids.insert(fragment) {
                        overlap.push(fragment);
                    }
                }
            }
        }
        if !overlap.is_empty() {
            bad_indices.push((name.to_string(), overlap));
        }
    }
    if bad_indices.is_empty() {
        Ok(())
    } else {
        Err(BadFragmentBitmapError { bad_indices })
    }
}

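/// Commit a transaction as a detached version: the manifest receives a random
/// version number with the detached bit set, so it never joins the dataset's
/// main version history. Retries with backoff if the randomly chosen version
/// collides with an existing one.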
pub(crate) async fn do_commit_detached_transaction(
    dataset: &Dataset,
    object_store: &ObjectStore,
    commit_handler: &dyn CommitHandler,
    transaction: &Transaction,
    write_config: &ManifestWriteConfig,
    commit_config: &CommitConfig,
    new_blob_version: Option<u64>,
) -> Result<(Manifest, ManifestLocation)> {
    let transaction_file = write_transaction_file(object_store, &dataset.base, transaction).await?;

    let mut backoff = Backoff::default();
    while backoff.attempt() < commit_config.num_retries {
        let random_version = rng().random::<u64>() | DETACHED_VERSION_MASK;

        let (mut manifest, mut indices) = match transaction.operation {
            Operation::Restore { version } => {
                Transaction::restore_old_manifest(
                    object_store,
                    commit_handler,
                    &dataset.base,
                    version,
                    write_config,
                    &transaction_file,
                )
                .await?
            }
            _ => transaction.build_manifest(
                Some(dataset.manifest.as_ref()),
                dataset.load_indices().await?.as_ref().clone(),
                &transaction_file,
                write_config,
                new_blob_version,
            )?,
        };

        manifest.version = random_version;

        migrate_manifest(dataset, &mut manifest, false).await?;
        fix_schema(&mut manifest)?;
        check_storage_version(&mut manifest)?;
        migrate_indices(dataset, &mut indices).await?;

        let result = write_manifest_file(
            object_store,
            commit_handler,
            &dataset.base,
            &mut manifest,
            if indices.is_empty() {
                None
            } else {
                Some(indices.clone())
            },
            write_config,
            ManifestNamingScheme::V2,
        )
        .await;

        match result {
            Ok(location) => {
                return Ok((manifest, location));
            }
            Err(CommitError::CommitConflict) => {
                tokio::time::sleep(backoff.next_backoff()).await;
            }
            Err(CommitError::OtherError(err)) => {
                return Err(err);
            }
        }
    }

    Err(crate::Error::CommitConflict {
        version: 0,
        source: format!(
            "Failed to find an unused random u64 after {} retries.",
            commit_config.num_retries
        )
        .into(),
        location: location!(),
    })
}

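/// Commit a transaction as a detached version, committing any blob-dataset
/// operation first (also detached) and threading its new version into the
/// manifest that is built for the main dataset.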
pub(crate) async fn commit_detached_transaction(
    dataset: &Dataset,
    object_store: &ObjectStore,
    commit_handler: &dyn CommitHandler,
    transaction: &Transaction,
    write_config: &ManifestWriteConfig,
    commit_config: &CommitConfig,
) -> Result<(Manifest, ManifestLocation)> {
    let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
        let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
        let blobs_tx =
            Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
        let (blobs_manifest, _) = do_commit_detached_transaction(
            blobs_dataset.as_ref(),
            object_store,
            commit_handler,
            &blobs_tx,
            write_config,
            commit_config,
            None,
        )
        .await?;
        Some(blobs_manifest.version)
    } else {
        None
    };

    do_commit_detached_transaction(
        dataset,
        object_store,
        commit_handler,
        transaction,
        write_config,
        commit_config,
        new_blob_version,
    )
    .await
}

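/// Load every transaction committed after this dataset's checked-out version
/// and return them, together with the refreshed dataset, sorted by version so
/// conflict resolution can replay them in order.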
async fn load_and_sort_new_transactions(
    dataset: &Dataset,
) -> Result<(Dataset, Vec<(u64, Arc<Transaction>)>)> {
    let NewTransactionResult {
        dataset: new_ds,
        new_transactions,
    } = load_new_transactions(dataset);
    let new_transactions = new_transactions.try_collect::<Vec<_>>();
    let (new_ds, mut txns) = futures::future::try_join(new_ds, new_transactions).await?;
    txns.sort_by_key(|(version, _)| *version);
    Ok((new_ds, txns))
}

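/// Attempt to commit a transaction against the latest version of the dataset,
/// rebasing it over any concurrent commits and retrying with backoff on
/// conflicts, up to `commit_config.num_retries` attempts.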
#[allow(clippy::too_many_arguments)]
pub(crate) async fn commit_transaction(
    dataset: &Dataset,
    object_store: &ObjectStore,
    commit_handler: &dyn CommitHandler,
    transaction: &Transaction,
    write_config: &ManifestWriteConfig,
    commit_config: &CommitConfig,
    manifest_naming_scheme: ManifestNamingScheme,
    affected_rows: Option<&RowIdTreeMap>,
) -> Result<(Manifest, ManifestLocation)> {
    let new_blob_version = if let Some(blob_op) = transaction.blobs_op.as_ref() {
        let blobs_dataset = dataset.blobs_dataset().await?.unwrap();
        let blobs_tx =
            Transaction::new(blobs_dataset.version().version, blob_op.clone(), None, None);
        let (blobs_manifest, _) = do_commit_detached_transaction(
            blobs_dataset.as_ref(),
            object_store,
            commit_handler,
            &blobs_tx,
            write_config,
            commit_config,
            None,
        )
        .await?;
        Some(blobs_manifest.version)
    } else {
        None
    };

    let read_version = transaction.read_version;
    let mut target_version = read_version + 1;
    let original_dataset = dataset.clone();

    // An overwrite with retries disabled is pinned to its read version: the
    // rebase over concurrent transactions below is skipped, and the commit
    // fails on conflict instead of retrying.
    let strict_overwrite = matches!(transaction.operation, Operation::Overwrite { .. })
        && commit_config.num_retries == 0;
    let mut dataset =
        if dataset.manifest.version != read_version && (read_version != 0 || strict_overwrite) {
            dataset.checkout_version(read_version).await?
        } else {
            dataset.clone()
        };

    let mut transaction = transaction.clone();

    let num_attempts = std::cmp::max(commit_config.num_retries, 1);
    let mut backoff = SlotBackoff::default();
    let start = Instant::now();

    let mut other_transactions: Vec<(u64, Arc<Transaction>)>;

    while backoff.attempt() < num_attempts {
        if !strict_overwrite {
            (dataset, other_transactions) = load_and_sort_new_transactions(&dataset).await?;

            let mut rebase =
                TransactionRebase::try_new(&original_dataset, transaction, affected_rows).await?;

            for (other_version, other_transaction) in other_transactions.iter() {
                rebase.check_txn(other_transaction, *other_version)?;
            }

            transaction = rebase.finish(&dataset).await?;
        }

        let transaction_file =
            write_transaction_file(object_store, &dataset.base, &transaction).await?;

        target_version = dataset.manifest.version + 1;
        if is_detached_version(target_version) {
            return Err(Error::Internal {
                message: "more than 2^65 versions have been created and so regular version numbers are appearing as 'detached' versions.".into(),
                location: location!(),
            });
        }
        let (mut manifest, mut indices) = match transaction.operation {
            Operation::Restore { version } => {
                Transaction::restore_old_manifest(
                    object_store,
                    commit_handler,
                    &dataset.base,
                    version,
                    write_config,
                    &transaction_file,
                )
                .await?
            }
            _ => transaction.build_manifest(
                Some(dataset.manifest.as_ref()),
                dataset.load_indices().await?.as_ref().clone(),
                &transaction_file,
                write_config,
                new_blob_version,
            )?,
        };

        manifest.version = target_version;

        let previous_writer_version = &dataset.manifest.writer_version;
        let recompute_stats = previous_writer_version.is_none();

        migrate_manifest(&dataset, &mut manifest, recompute_stats).await?;

        fix_schema(&mut manifest)?;

        check_storage_version(&mut manifest)?;

        migrate_indices(&dataset, &mut indices).await?;

        let result = write_manifest_file(
            object_store,
            commit_handler,
            &dataset.base,
            &mut manifest,
            if indices.is_empty() {
                None
            } else {
                Some(indices.clone())
            },
            write_config,
            manifest_naming_scheme,
        )
        .await;

        match result {
            Ok(manifest_location) => {
                let tx_key = crate::session::caches::TransactionKey {
                    version: target_version,
                };
                dataset
                    .metadata_cache
                    .insert_with_key(&tx_key, Arc::new(transaction.clone()))
                    .await;

                let manifest_key = crate::session::caches::ManifestKey {
                    version: manifest_location.version,
                    e_tag: manifest_location.e_tag.as_deref(),
                };
                dataset
                    .metadata_cache
                    .insert_with_key(&manifest_key, Arc::new(manifest.clone()))
                    .await;
                if !indices.is_empty() {
                    let key = IndexMetadataKey {
                        version: target_version,
                    };
                    dataset
                        .index_cache
                        .insert_with_key(&key, Arc::new(indices))
                        .await;
                }

                if !commit_config.skip_auto_cleanup {
                    match auto_cleanup_hook(&dataset, &manifest).await {
                        Ok(Some(stats)) => log::info!("Auto cleanup triggered: {:?}", stats),
                        Err(e) => log::error!("Error encountered during auto_cleanup_hook: {}", e),
                        _ => {}
                    };
                }
                return Ok((manifest, manifest_location));
            }
            Err(CommitError::CommitConflict) => {
                let next_attempt_i = backoff.attempt() + 1;

                if backoff.attempt() == 0 {
                    // Seed the backoff slot size with the duration of the
                    // first attempt, plus 10%.
                    backoff = backoff.with_unit((start.elapsed().as_millis() * 11 / 10) as u32);
                }

                if next_attempt_i < num_attempts {
                    tokio::time::sleep(backoff.next_backoff()).await;
                    continue;
                } else {
                    break;
                }
            }
            Err(CommitError::OtherError(err)) => {
                return Err(err);
            }
        }
    }

    Err(crate::Error::CommitConflict {
        version: target_version,
        source: format!(
            "Failed to commit the transaction after {} retries.",
            commit_config.num_retries
        )
        .into(),
        location: location!(),
    })
}

#[cfg(test)]
mod tests {
    use std::sync::Mutex;

    use arrow_array::types::Int32Type;
    use arrow_array::{Int32Array, Int64Array, RecordBatch, RecordBatchIterator};
    use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
    use futures::future::join_all;
    use lance_arrow::FixedSizeListArrayExt;
    use lance_core::datatypes::{Field, Schema};
    use lance_core::utils::tempfile::TempStrDir;
    use lance_index::IndexType;
    use lance_linalg::distance::MetricType;
    use lance_table::format::{DataFile, DataStorageFormat};
    use lance_table::io::commit::{
        CommitLease, CommitLock, RenameCommitHandler, UnsafeCommitHandler,
    };
    use lance_testing::datagen::generate_random_array;

    use super::*;

    use crate::dataset::{WriteMode, WriteParams};
    use crate::index::vector::VectorIndexParams;
    use crate::utils::test::{DatagenExt, FragmentCount, FragmentRowCount};
    use crate::Dataset;

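    // Spawn many concurrent deletes against the same dataset and check that a
    // correct commit handler never lets two writers claim the same version.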
    async fn test_commit_handler(handler: Arc<dyn CommitHandler>, should_succeed: bool) {
        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "x",
            DataType::Int64,
            false,
        )]));
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int64Array::from(vec![1, 2, 3]))],
        )
        .unwrap();
        let reader = RecordBatchIterator::new(vec![Ok(data)], schema);

        let options = WriteParams {
            commit_handler: Some(handler),
            ..Default::default()
        };
        let dataset = Dataset::write(reader, "memory://test", Some(options))
            .await
            .unwrap();

        let tasks = (0..10).map(|_| {
            let mut dataset = dataset.clone();
            tokio::task::spawn(async move {
                dataset
                    .delete("x = 2")
                    .await
                    .map(|_| dataset.manifest.version)
            })
        });

        let task_results: Vec<Option<u64>> = join_all(tasks)
            .await
            .iter()
            .map(|res| match res {
                Ok(Ok(version)) => Some(*version),
                _ => None,
            })
            .collect();

        let num_successes = task_results.iter().filter(|x| x.is_some()).count();
        let distinct_results: HashSet<_> = task_results.iter().filter_map(|x| x.as_ref()).collect();

        if should_succeed {
            assert_eq!(
                num_successes,
                distinct_results.len(),
                "Expected no two tasks to succeed for the same version. Got {:?}",
                task_results
            );
        } else {
            assert!(num_successes >= distinct_results.len());
        }
    }

    #[tokio::test]
    async fn test_rename_commit_handler() {
        let handler = Arc::new(RenameCommitHandler);
        test_commit_handler(handler, true).await;
    }

    #[tokio::test]
    async fn test_custom_commit() {
        #[derive(Debug)]
        struct CustomCommitHandler {
            locked_version: Arc<Mutex<Option<u64>>>,
        }

        struct CustomCommitLease {
            version: u64,
            locked_version: Arc<Mutex<Option<u64>>>,
        }

        #[async_trait::async_trait]
        impl CommitLock for CustomCommitHandler {
            type Lease = CustomCommitLease;

            async fn lock(&self, version: u64) -> std::result::Result<Self::Lease, CommitError> {
                let mut locked_version = self.locked_version.lock().unwrap();
                if locked_version.is_some() {
                    return Err(CommitError::CommitConflict);
                }

                *locked_version = Some(version);

                Ok(CustomCommitLease {
                    version,
                    locked_version: self.locked_version.clone(),
                })
            }
        }

        #[async_trait::async_trait]
        impl CommitLease for CustomCommitLease {
            async fn release(&self, _success: bool) -> std::result::Result<(), CommitError> {
                let mut locked_version = self.locked_version.lock().unwrap();
                if *locked_version != Some(self.version) {
                    return Err(CommitError::CommitConflict);
                }

                *locked_version = None;

                Ok(())
            }
        }

        let locked_version = Arc::new(Mutex::new(None));
        let handler = Arc::new(CustomCommitHandler { locked_version });
        test_commit_handler(handler, true).await;
    }

    #[tokio::test]
    async fn test_unsafe_commit_handler() {
        let handler = Arc::new(UnsafeCommitHandler);
        test_commit_handler(handler, false).await;
    }

    #[tokio::test]
    async fn test_roundtrip_transaction_file() {
        let object_store = ObjectStore::memory();
        let base_path = Path::from("test");
        let transaction = Transaction::new(
            42,
            Operation::Append { fragments: vec![] },
            None,
            Some("hello world".to_string()),
        );

        let file_name = write_transaction_file(&object_store, &base_path, &transaction)
            .await
            .unwrap();
        let read_transaction = read_transaction_file(&object_store, &base_path, &file_name)
            .await
            .unwrap();

        assert_eq!(transaction.read_version, read_transaction.read_version);
        assert_eq!(transaction.uuid, read_transaction.uuid);
        assert!(matches!(
            read_transaction.operation,
            Operation::Append { .. }
        ));
        assert_eq!(transaction.tag, read_transaction.tag);
    }

    #[tokio::test]
    async fn test_concurrent_create_index() {
        let test_dir = TempStrDir::default();
        let test_uri = test_dir.as_str();

        let dimension = 16;
        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new(
                "vector1",
                DataType::FixedSizeList(
                    Arc::new(ArrowField::new("item", DataType::Float32, true)),
                    dimension,
                ),
                false,
            ),
            ArrowField::new(
                "vector2",
                DataType::FixedSizeList(
                    Arc::new(ArrowField::new("item", DataType::Float32, true)),
                    dimension,
                ),
                false,
            ),
        ]));
        let float_arr = generate_random_array(512 * dimension as usize);
        let vectors = Arc::new(
            <arrow_array::FixedSizeListArray as FixedSizeListArrayExt>::try_new_from_values(
                float_arr, dimension,
            )
            .unwrap(),
        );
        let batches =
            vec![
                RecordBatch::try_new(schema.clone(), vec![vectors.clone(), vectors.clone()])
                    .unwrap(),
            ];

        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
        let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
        dataset.validate().await.unwrap();

        let params = VectorIndexParams::ivf_pq(10, 8, 2, MetricType::L2, 50);
        let futures: Vec<_> = ["vector1", "vector1", "vector2"]
            .iter()
            .map(|col_name| {
                let mut dataset = dataset.clone();
                let params = params.clone();
                tokio::spawn(async move {
                    dataset
                        .create_index(&[col_name], IndexType::Vector, None, &params, true)
                        .await
                })
            })
            .collect();

        let results = join_all(futures).await;
        for result in results {
            assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
        }

        let dataset = dataset.checkout_version(1).await.unwrap();
        assert!(dataset.load_indices().await.unwrap().is_empty());

        let dataset = dataset.checkout_version(2).await.unwrap();
        assert_eq!(dataset.load_indices().await.unwrap().len(), 1);

        let dataset = dataset.checkout_version(3).await.unwrap();
        let indices = dataset.load_indices().await.unwrap();
        assert!(!indices.is_empty() && indices.len() <= 2);

        if indices.len() == 2 {
            let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
            fields.sort();
            assert_eq!(fields, vec![0, 1]);
        } else {
            assert_eq!(indices[0].fields, vec![0]);
        }

        let dataset = dataset.checkout_version(4).await.unwrap();
        let indices = dataset.load_indices().await.unwrap();
        assert_eq!(indices.len(), 2);
        let mut fields: Vec<i32> = indices.iter().flat_map(|i| i.fields.clone()).collect();
        fields.sort();
        assert_eq!(fields, vec![0, 1]);
    }

    #[tokio::test]
    async fn test_load_and_sort_new_transactions() {
        let mut dataset = lance_datagen::gen_batch()
            .col("i", lance_datagen::array::step::<Int32Type>())
            .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(10))
            .await
            .unwrap();

        for i in 0..100 {
            dataset
                .update_config(vec![(format!("key_{}", i), format!("value_{}", i))])
                .await
                .unwrap();
        }

        let dataset_v1 = dataset.checkout_version(1).await.unwrap();
        let (_, transactions) = load_and_sort_new_transactions(&dataset_v1).await.unwrap();

        let versions: Vec<u64> = transactions.iter().map(|(v, _)| *v).collect();
        for i in 1..versions.len() {
            assert!(
                versions[i] > versions[i - 1],
                "Transactions not in order: version {} came after version {}",
                versions[i],
                versions[i - 1]
            );
        }

        assert_eq!(transactions.len(), 100);
        assert_eq!(versions.first(), Some(&2));
        assert_eq!(versions.last(), Some(&101));
    }

    #[tokio::test]
    async fn test_concurrent_writes() {
        for write_mode in [WriteMode::Append, WriteMode::Overwrite] {
            let test_dir = TempStrDir::default();
            let test_uri = test_dir.as_str();

            let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
                "i",
                DataType::Int32,
                false,
            )]));

            let dataset = Dataset::write(
                RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
                test_uri,
                None,
            )
            .await
            .unwrap();

            let batch = RecordBatch::try_new(
                schema.clone(),
                vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
            )
            .unwrap();

            let futures: Vec<_> = (0..5)
                .map(|_| {
                    let batch = batch.clone();
                    let schema = schema.clone();
                    let uri = test_uri.to_string();
                    tokio::spawn(async move {
                        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
                        Dataset::write(
                            reader,
                            &uri,
                            Some(WriteParams {
                                mode: write_mode,
                                ..Default::default()
                            }),
                        )
                        .await
                    })
                })
                .collect();
            let results = join_all(futures).await;

            for result in results {
                assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
            }

            let dataset = dataset.checkout_version(6).await.unwrap();

            match write_mode {
                WriteMode::Append => {
                    assert_eq!(dataset.get_fragments().len(), 5);
                }
                WriteMode::Overwrite => {
                    assert_eq!(dataset.get_fragments().len(), 1);
                }
                _ => unreachable!(),
            }

            dataset.validate().await.unwrap()
        }
    }

    async fn get_empty_dataset() -> (TempStrDir, Dataset) {
        let test_dir = TempStrDir::default();
        let test_uri = test_dir.as_str();

        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "i",
            DataType::Int32,
            false,
        )]));

        let ds = Dataset::write(
            RecordBatchIterator::new(vec![].into_iter().map(Ok), schema.clone()),
            test_uri,
            None,
        )
        .await
        .unwrap();
        (test_dir, ds)
    }

    #[tokio::test]
    async fn test_good_concurrent_config_writes() {
        let (_tmpdir, dataset) = get_empty_dataset().await;
        let original_num_config_keys = dataset.manifest.config.len();

        let futures: Vec<_> = ["key1", "key2", "key3", "key4", "key5"]
            .iter()
            .map(|key| {
                let mut dataset = dataset.clone();
                tokio::spawn(async move {
                    dataset
                        .update_config(HashMap::from([(
                            key.to_string(),
                            Some("value".to_string()),
                        )]))
                        .await
                })
            })
            .collect();
        let results = join_all(futures).await;

        for result in results {
            assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
        }

        let dataset = dataset.checkout_version(6).await.unwrap();
        assert_eq!(dataset.manifest.config.len(), 5 + original_num_config_keys);

        dataset.validate().await.unwrap();

        let futures: Vec<_> = ["key1", "key1", "key1", "key2", "key2"]
            .iter()
            .map(|key| {
                let mut dataset = dataset.clone();
                tokio::spawn(async move {
                    dataset
                        .update_config(HashMap::from([(key.to_string(), None)]))
                        .await
                })
            })
            .collect();
        let results = join_all(futures).await;

        for result in results {
            assert!(matches!(result, Ok(Ok(_))), "{:?}", result);
        }

        let dataset = dataset.checkout_version(11).await.unwrap();

        assert_eq!(dataset.manifest.config.len(), 3 + original_num_config_keys);

        dataset.validate().await.unwrap()
    }

    #[tokio::test]
    async fn test_bad_concurrent_config_writes() {
        let (_tmpdir, dataset) = get_empty_dataset().await;

        let futures: Vec<_> = ["key1", "key1", "key2", "key3", "key4"]
            .iter()
            .map(|key| {
                let mut dataset = dataset.clone();
                tokio::spawn(async move {
                    dataset
                        .update_config(HashMap::from([(
                            key.to_string(),
                            Some("value".to_string()),
                        )]))
                        .await
                })
            })
            .collect();

        let results = join_all(futures).await;

        let mut first_operation_failed = false;
        for (i, result) in results.into_iter().enumerate() {
            let result = result.unwrap();
            match i {
                0 => {
                    if result.is_err() {
                        first_operation_failed = true;
                        assert!(
                            matches!(&result, &Err(Error::CommitConflict { .. })),
                            "{:?}",
                            result,
                        );
                    }
                }
                1 => match first_operation_failed {
                    true => assert!(result.is_ok(), "{:?}", result),
                    false => {
                        assert!(
                            matches!(&result, &Err(Error::CommitConflict { .. })),
                            "{:?}",
                            result,
                        );
                    }
                },
                _ => assert!(result.is_ok(), "{:?}", result),
            }
        }
    }

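    // Fragment 1 stores field id 2 in two data files; fix_schema should remap
    // the duplicate to the next unused id (10), update every file that carries
    // the live copy, and drop the "unused" data file whose only field is gone.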
    #[test]
    fn test_fix_schema() {
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0.clone(), field2.clone()],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![
                    DataFile::new_legacy_from_fields("path1", vec![0, 1, 2]),
                    DataFile::new_legacy_from_fields("unused", vec![9]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 2]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        fix_schema(&mut manifest).unwrap();

        field2.id = 10;
        let expected_schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        assert_eq!(manifest.schema, expected_schema);

        let expected_fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 10])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 2]),
                    DataFile::new_legacy_from_fields("path3", vec![10]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];
        assert_eq!(manifest.fragments.as_ref(), &expected_fragments);
    }
}