1use std::collections::HashMap;
77use std::sync::Arc;
78
79use bytes::Bytes;
80use chrono::Utc;
81use conflict_checker::ConflictChecker;
82use delta_kernel::table_properties::TableProperties;
83use futures::future::BoxFuture;
84use object_store::Error as ObjectStoreError;
85use object_store::path::Path;
86use serde_json::Value;
87use tracing::*;
88use uuid::Uuid;
89
90use delta_kernel::table_features::TableFeature;
91use serde::{Deserialize, Serialize};
92
93use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
94use crate::errors::DeltaTableError;
95use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction};
96use crate::logstore::ObjectStoreRef;
97use crate::logstore::{CommitOrBytes, LogStoreRef};
98use crate::operations::CustomExecuteHandler;
99use crate::protocol::DeltaOperation;
100use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
101use crate::table::config::TablePropertiesExt as _;
102use crate::table::state::DeltaTableState;
103use crate::{DeltaResult, crate_version};
104
105pub use self::conflict_checker::CommitConflictError;
106pub use self::protocol::INSTANCE as PROTOCOL;
107
// Compiled only for tests.
#[cfg(test)]
pub(crate) mod application;
// Conflict detection between this transaction and commits that were written concurrently.
mod conflict_checker;
// Protocol checks applied before a commit is written (exported as `PROTOCOL`).
mod protocol;
#[cfg(feature = "datafusion")]
mod state;

/// Name of the transaction-log directory; temporary commit files are staged inside it.
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Default `max_retries` used by `CommitProperties` and `CommitBuilder`.
pub(crate) const DEFAULT_RETRIES: usize = 15;
117
118#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
119#[serde(rename_all = "camelCase")]
120pub struct CommitMetrics {
121 pub num_retries: u64,
123}
124
125#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "camelCase")]
127pub struct PostCommitMetrics {
128 pub new_checkpoint_created: bool,
130
131 pub num_log_files_cleaned_up: u64,
133}
134
135#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct Metrics {
138 pub num_retries: u64,
140
141 pub new_checkpoint_created: bool,
143
144 pub num_log_files_cleaned_up: u64,
146}
147
148#[derive(thiserror::Error, Debug)]
150pub enum TransactionError {
151 #[error("Tried committing existing table version: {0}")]
153 VersionAlreadyExists(i64),
154
155 #[error("Error serializing commit log to json: {json_err}")]
157 SerializeLogJson {
158 json_err: serde_json::error::Error,
160 },
161
162 #[error("Log storage error: {}", .source)]
164 ObjectStore {
165 #[from]
167 source: ObjectStoreError,
168 },
169
170 #[error("Failed to commit transaction: {0}")]
172 CommitConflict(#[from] CommitConflictError),
173
174 #[error("Failed to commit transaction: {0}")]
176 MaxCommitAttempts(i32),
177
178 #[error(
180 "The transaction includes Remove action with data change but Delta table is append-only"
181 )]
182 DeltaTableAppendOnly,
183
184 #[error("Unsupported table features required: {0:?}")]
186 UnsupportedTableFeatures(Vec<TableFeature>),
187
188 #[error("Table features must be specified, please specify: {0:?}")]
190 TableFeaturesRequired(TableFeature),
191
192 #[error("Transaction failed: {msg}")]
195 LogStoreError {
196 msg: String,
198 source: Box<dyn std::error::Error + Send + Sync + 'static>,
200 },
201}
202
203impl From<TransactionError> for DeltaTableError {
204 fn from(err: TransactionError) -> Self {
205 match err {
206 TransactionError::VersionAlreadyExists(version) => {
207 DeltaTableError::VersionAlreadyExists(version)
208 }
209 TransactionError::SerializeLogJson { json_err } => {
210 DeltaTableError::SerializeLogJson { json_err }
211 }
212 TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
213 other => DeltaTableError::Transaction { source: other },
214 }
215 }
216}
217
218#[derive(thiserror::Error, Debug)]
220pub enum CommitBuilderError {}
221
222impl From<CommitBuilderError> for DeltaTableError {
223 fn from(err: CommitBuilderError) -> Self {
224 DeltaTableError::CommitValidation { source: err }
225 }
226}
227
228pub trait TableReference: Send + Sync {
230 fn config(&self) -> &TableProperties;
232
233 fn protocol(&self) -> &Protocol;
235
236 fn metadata(&self) -> &Metadata;
238
239 fn eager_snapshot(&self) -> &EagerSnapshot;
241}
242
243impl TableReference for EagerSnapshot {
244 fn protocol(&self) -> &Protocol {
245 EagerSnapshot::protocol(self)
246 }
247
248 fn metadata(&self) -> &Metadata {
249 EagerSnapshot::metadata(self)
250 }
251
252 fn config(&self) -> &TableProperties {
253 self.table_properties()
254 }
255
256 fn eager_snapshot(&self) -> &EagerSnapshot {
257 self
258 }
259}
260
261impl TableReference for DeltaTableState {
262 fn config(&self) -> &TableProperties {
263 self.snapshot.config()
264 }
265
266 fn protocol(&self) -> &Protocol {
267 self.snapshot.protocol()
268 }
269
270 fn metadata(&self) -> &Metadata {
271 self.snapshot.metadata()
272 }
273
274 fn eager_snapshot(&self) -> &EagerSnapshot {
275 &self.snapshot
276 }
277}
278
279#[derive(Debug)]
281pub struct CommitData {
282 pub actions: Vec<Action>,
284 pub operation: DeltaOperation,
286 pub app_metadata: HashMap<String, Value>,
288 pub app_transactions: Vec<Transaction>,
290}
291
292impl CommitData {
293 pub fn new(
295 mut actions: Vec<Action>,
296 operation: DeltaOperation,
297 mut app_metadata: HashMap<String, Value>,
298 app_transactions: Vec<Transaction>,
299 ) -> Self {
300 if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
301 let mut commit_info = operation.get_commit_info();
302 commit_info.timestamp = Some(Utc::now().timestamp_millis());
303 app_metadata.insert(
304 "clientVersion".to_string(),
305 Value::String(format!("delta-rs.{}", crate_version())),
306 );
307 app_metadata.extend(commit_info.info);
308 commit_info.info = app_metadata.clone();
309 actions.push(Action::CommitInfo(commit_info))
310 }
311
312 for txn in &app_transactions {
313 actions.push(Action::Txn(txn.clone()))
314 }
315
316 CommitData {
317 actions,
318 operation,
319 app_metadata,
320 app_transactions,
321 }
322 }
323
324 pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
326 let mut jsons = Vec::<String>::new();
327 for action in &self.actions {
328 let json = serde_json::to_string(action)
329 .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
330 jsons.push(json);
331 }
332 Ok(bytes::Bytes::from(jsons.join("\n")))
333 }
334}
335
/// Settings for the work done after a successful commit.
#[derive(Debug, Clone, Copy)]
pub struct PostCommitHookProperties {
    /// Whether the post-commit hook may create a checkpoint.
    create_checkpoint: bool,
    /// Expired-log-cleanup override; `None` defers to the table configuration.
    cleanup_expired_logs: Option<bool>,
}
343
344#[derive(Clone, Debug)]
345pub struct CommitProperties {
348 pub(crate) app_metadata: HashMap<String, Value>,
349 pub(crate) app_transaction: Vec<Transaction>,
350 max_retries: usize,
351 create_checkpoint: bool,
352 cleanup_expired_logs: Option<bool>,
353}
354
355impl Default for CommitProperties {
356 fn default() -> Self {
357 Self {
358 app_metadata: Default::default(),
359 app_transaction: Vec::new(),
360 max_retries: DEFAULT_RETRIES,
361 create_checkpoint: true,
362 cleanup_expired_logs: None,
363 }
364 }
365}
366
367impl CommitProperties {
368 pub fn with_metadata(
370 mut self,
371 metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
372 ) -> Self {
373 self.app_metadata = HashMap::from_iter(metadata);
374 self
375 }
376
377 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
379 self.max_retries = max_retries;
380 self
381 }
382
383 pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
385 self.create_checkpoint = create_checkpoint;
386 self
387 }
388
389 pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
391 self.app_transaction.push(txn);
392 self
393 }
394
395 pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
397 self.app_transaction = txn;
398 self
399 }
400
401 pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
403 self.cleanup_expired_logs = cleanup_expired_logs;
404 self
405 }
406}
407
408impl From<CommitProperties> for CommitBuilder {
409 fn from(value: CommitProperties) -> Self {
410 CommitBuilder {
411 max_retries: value.max_retries,
412 app_metadata: value.app_metadata,
413 post_commit_hook: Some(PostCommitHookProperties {
414 create_checkpoint: value.create_checkpoint,
415 cleanup_expired_logs: value.cleanup_expired_logs,
416 }),
417 app_transaction: value.app_transaction,
418 ..Default::default()
419 }
420 }
421}
422
423pub struct CommitBuilder {
425 actions: Vec<Action>,
426 app_metadata: HashMap<String, Value>,
427 app_transaction: Vec<Transaction>,
428 max_retries: usize,
429 post_commit_hook: Option<PostCommitHookProperties>,
430 post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
431 operation_id: Uuid,
432}
433
434impl Default for CommitBuilder {
435 fn default() -> Self {
436 CommitBuilder {
437 actions: Vec::new(),
438 app_metadata: HashMap::new(),
439 app_transaction: Vec::new(),
440 max_retries: DEFAULT_RETRIES,
441 post_commit_hook: None,
442 post_commit_hook_handler: None,
443 operation_id: Uuid::new_v4(),
444 }
445 }
446}
447
448impl<'a> CommitBuilder {
449 pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
451 self.actions = actions;
452 self
453 }
454
455 pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
457 self.app_metadata = app_metadata;
458 self
459 }
460
461 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
463 self.max_retries = max_retries;
464 self
465 }
466
467 pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
469 self.post_commit_hook = Some(post_commit_hook);
470 self
471 }
472
473 pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
475 self.operation_id = operation_id;
476 self
477 }
478
479 pub fn with_post_commit_hook_handler(
481 mut self,
482 handler: Option<Arc<dyn CustomExecuteHandler>>,
483 ) -> Self {
484 self.post_commit_hook_handler = handler;
485 self
486 }
487
488 pub fn build(
490 self,
491 table_data: Option<&'a dyn TableReference>,
492 log_store: LogStoreRef,
493 operation: DeltaOperation,
494 ) -> PreCommit<'a> {
495 let data = CommitData::new(
496 self.actions,
497 operation,
498 self.app_metadata,
499 self.app_transaction,
500 );
501 PreCommit {
502 log_store,
503 table_data,
504 max_retries: self.max_retries,
505 data,
506 post_commit_hook: self.post_commit_hook,
507 post_commit_hook_handler: self.post_commit_hook_handler,
508 operation_id: self.operation_id,
509 }
510 }
511}
512
513pub struct PreCommit<'a> {
515 log_store: LogStoreRef,
516 table_data: Option<&'a dyn TableReference>,
517 data: CommitData,
518 max_retries: usize,
519 post_commit_hook: Option<PostCommitHookProperties>,
520 post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
521 operation_id: Uuid,
522}
523
524impl<'a> std::future::IntoFuture for PreCommit<'a> {
525 type Output = DeltaResult<FinalizedCommit>;
526 type IntoFuture = BoxFuture<'a, Self::Output>;
527
528 fn into_future(self) -> Self::IntoFuture {
529 Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
530 }
531}
532
533impl<'a> PreCommit<'a> {
534 pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
536 let this = self;
537
538 async fn write_tmp_commit(
541 log_entry: Bytes,
542 store: ObjectStoreRef,
543 ) -> DeltaResult<CommitOrBytes> {
544 let token = uuid::Uuid::new_v4().to_string();
545 let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
546 store.put(&path, log_entry.into()).await?;
547 Ok(CommitOrBytes::TmpCommit(path))
548 }
549
550 Box::pin(async move {
551 if let Some(table_reference) = this.table_data {
552 PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
553 }
554 let log_entry = this.data.get_bytes()?;
555
556 let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
559 .contains(&this.log_store.name().as_str())
560 {
561 CommitOrBytes::LogBytes(log_entry)
562 } else {
563 write_tmp_commit(
564 log_entry,
565 this.log_store.object_store(Some(this.operation_id)),
566 )
567 .await?
568 };
569
570 Ok(PreparedCommit {
571 commit_or_bytes,
572 log_store: this.log_store,
573 table_data: this.table_data,
574 max_retries: this.max_retries,
575 data: this.data,
576 post_commit: this.post_commit_hook,
577 post_commit_hook_handler: this.post_commit_hook_handler,
578 operation_id: this.operation_id,
579 })
580 })
581 }
582}
583
584pub struct PreparedCommit<'a> {
586 commit_or_bytes: CommitOrBytes,
587 log_store: LogStoreRef,
588 data: CommitData,
589 table_data: Option<&'a dyn TableReference>,
590 max_retries: usize,
591 post_commit: Option<PostCommitHookProperties>,
592 post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
593 operation_id: Uuid,
594}
595
596impl PreparedCommit<'_> {
597 pub fn commit_or_bytes(&self) -> &CommitOrBytes {
599 &self.commit_or_bytes
600 }
601}
602
603impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
604 type Output = DeltaResult<PostCommit>;
605 type IntoFuture = BoxFuture<'a, Self::Output>;
606
607 fn into_future(self) -> Self::IntoFuture {
608 let this = self;
609
610 Box::pin(async move {
611 let commit_or_bytes = this.commit_or_bytes;
612
613 let mut attempt_number: usize = 1;
614
615 let read_snapshot: EagerSnapshot = if this.table_data.is_none() {
617 debug!("committing initial table version 0");
618 match this
619 .log_store
620 .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
621 .await
622 {
623 Ok(_) => {
624 return Ok(PostCommit {
625 version: 0,
626 data: this.data,
627 create_checkpoint: false,
628 cleanup_expired_logs: None,
629 log_store: this.log_store,
630 table_data: None,
631 custom_execute_handler: this.post_commit_hook_handler,
632 metrics: CommitMetrics { num_retries: 0 },
633 });
634 }
635 Err(TransactionError::VersionAlreadyExists(0)) => {
636 debug!("version 0 already exists, loading table state for retry");
639 attempt_number = 2;
640 let latest_version = this.log_store.get_latest_version(0).await?;
641 EagerSnapshot::try_new(
642 this.log_store.as_ref(),
643 Default::default(),
644 Some(latest_version),
645 )
646 .await?
647 }
648 Err(e) => return Err(e.into()),
649 }
650 } else {
651 this.table_data.unwrap().eager_snapshot().clone()
652 };
653
654 let mut read_snapshot = read_snapshot;
655
656 let commit_span = info_span!(
657 "commit_with_retries",
658 base_version = read_snapshot.version(),
659 max_retries = this.max_retries,
660 attempt = field::Empty,
661 target_version = field::Empty,
662 conflicts_checked = 0
663 );
664
665 async move {
666 let total_retries = this.max_retries + 1;
667 while attempt_number <= total_retries {
668 Span::current().record("attempt", attempt_number);
669 let latest_version = this
670 .log_store
671 .get_latest_version(read_snapshot.version())
672 .await?;
673
674 if latest_version > read_snapshot.version() {
675 if this.max_retries == 0 {
678 warn!(
679 base_version = read_snapshot.version(),
680 latest_version = latest_version,
681 "table updated but max_retries is 0, failing immediately"
682 );
683 return Err(TransactionError::MaxCommitAttempts(
684 this.max_retries as i32,
685 )
686 .into());
687 }
688 warn!(
689 base_version = read_snapshot.version(),
690 latest_version = latest_version,
691 versions_behind = latest_version - read_snapshot.version(),
692 "table updated during transaction, checking for conflicts"
693 );
694 let mut steps = latest_version - read_snapshot.version();
695 let mut conflicts_checked = 0;
696
697 while steps != 0 {
700 conflicts_checked += 1;
701 let summary = WinningCommitSummary::try_new(
702 this.log_store.as_ref(),
703 latest_version - steps,
704 (latest_version - steps) + 1,
705 )
706 .await?;
707 let transaction_info = TransactionInfo::try_new(
708 read_snapshot.log_data(),
709 this.data.operation.read_predicate(),
710 &this.data.actions,
711 this.data.operation.read_whole_table(),
712 )?;
713 let conflict_checker = ConflictChecker::new(
714 transaction_info,
715 summary,
716 Some(&this.data.operation),
717 );
718
719 match conflict_checker.check_conflicts() {
720 Ok(_) => {}
721 Err(err) => {
722 error!(
723 conflicts_checked = conflicts_checked,
724 error = %err,
725 "conflict detected, aborting transaction"
726 );
727 return Err(TransactionError::CommitConflict(err).into());
728 }
729 }
730 steps -= 1;
731 }
732 Span::current().record("conflicts_checked", conflicts_checked);
733 debug!(
734 conflicts_checked = conflicts_checked,
735 "all conflicts resolved, updating snapshot"
736 );
737 read_snapshot
739 .update(&this.log_store, Some(latest_version as u64))
740 .await?;
741 }
742 let version: i64 = latest_version + 1;
743 Span::current().record("target_version", version);
744
745 match this
746 .log_store
747 .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
748 .await
749 {
750 Ok(()) => {
751 info!(
752 version = version,
753 num_retries = attempt_number as u64 - 1,
754 "transaction committed successfully"
755 );
756 return Ok(PostCommit {
757 version,
758 data: this.data,
759 create_checkpoint: this
760 .post_commit
761 .map(|v| v.create_checkpoint)
762 .unwrap_or_default(),
763 cleanup_expired_logs: this
764 .post_commit
765 .map(|v| v.cleanup_expired_logs)
766 .unwrap_or_default(),
767 log_store: this.log_store,
768 table_data: Some(Box::new(read_snapshot)),
769 custom_execute_handler: this.post_commit_hook_handler,
770 metrics: CommitMetrics {
771 num_retries: attempt_number as u64 - 1,
772 },
773 });
774 }
775 Err(TransactionError::VersionAlreadyExists(version)) => {
776 warn!(
777 version = version,
778 attempt = attempt_number,
779 "version already exists, will retry"
780 );
781 attempt_number += 1;
784 }
785 Err(err) => {
786 error!(
787 version = version,
788 error = %err,
789 "commit failed, aborting"
790 );
791 this.log_store
792 .abort_commit_entry(version, commit_or_bytes, this.operation_id)
793 .await?;
794 return Err(err.into());
795 }
796 }
797 }
798
799 error!(
800 max_retries = this.max_retries,
801 "exceeded maximum commit attempts"
802 );
803 Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
804 }
805 .instrument(commit_span)
806 .await
807 })
808 }
809}
810
811pub struct PostCommit {
813 pub version: i64,
815 pub data: CommitData,
817 create_checkpoint: bool,
818 cleanup_expired_logs: Option<bool>,
819 log_store: LogStoreRef,
820 table_data: Option<Box<dyn TableReference>>,
821 custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
822 metrics: CommitMetrics,
823}
824
825impl PostCommit {
826 async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
828 if let Some(table) = &self.table_data {
829 let post_commit_operation_id = Uuid::new_v4();
830 let mut snapshot = table.eager_snapshot().clone();
831 if self.version != snapshot.version() {
832 snapshot
833 .update(&self.log_store, Some(self.version as u64))
834 .await?;
835 }
836
837 let mut state = DeltaTableState { snapshot };
838
839 let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
840 cleanup_logs
841 } else {
842 state.table_config().enable_expired_log_cleanup()
843 };
844
845 if let Some(custom_execute_handler) = &self.custom_execute_handler {
847 custom_execute_handler
848 .before_post_commit_hook(
849 &self.log_store,
850 cleanup_logs || self.create_checkpoint,
851 post_commit_operation_id,
852 )
853 .await?
854 }
855
856 let mut new_checkpoint_created = false;
857 if self.create_checkpoint {
858 new_checkpoint_created = self
860 .create_checkpoint(
861 &state,
862 &self.log_store,
863 self.version,
864 post_commit_operation_id,
865 )
866 .await?;
867 }
868
869 let mut num_log_files_cleaned_up: u64 = 0;
870 if cleanup_logs {
871 num_log_files_cleaned_up = cleanup_expired_logs_for(
873 self.version,
874 self.log_store.as_ref(),
875 Utc::now().timestamp_millis()
876 - state.table_config().log_retention_duration().as_millis() as i64,
877 Some(post_commit_operation_id),
878 )
879 .await? as u64;
880 if num_log_files_cleaned_up > 0 {
881 state = DeltaTableState::try_new(
882 &self.log_store,
883 state.load_config().clone(),
884 Some(self.version),
885 )
886 .await?;
887 }
888 }
889
890 if let Some(custom_execute_handler) = &self.custom_execute_handler {
892 custom_execute_handler
893 .after_post_commit_hook(
894 &self.log_store,
895 cleanup_logs || self.create_checkpoint,
896 post_commit_operation_id,
897 )
898 .await?
899 }
900 Ok((
901 state,
902 PostCommitMetrics {
903 new_checkpoint_created,
904 num_log_files_cleaned_up,
905 },
906 ))
907 } else {
908 let state =
909 DeltaTableState::try_new(&self.log_store, Default::default(), Some(self.version))
910 .await?;
911 Ok((
912 state,
913 PostCommitMetrics {
914 new_checkpoint_created: false,
915 num_log_files_cleaned_up: 0,
916 },
917 ))
918 }
919 }
920 async fn create_checkpoint(
921 &self,
922 table_state: &DeltaTableState,
923 log_store: &LogStoreRef,
924 version: i64,
925 operation_id: Uuid,
926 ) -> DeltaResult<bool> {
927 if !table_state.load_config().require_files {
928 warn!(
929 "Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."
930 );
931 return Ok(false);
932 }
933
934 let checkpoint_interval = table_state.config().checkpoint_interval().get() as i64;
935 if ((version + 1) % checkpoint_interval) == 0 {
936 create_checkpoint_for(version as u64, log_store.as_ref(), Some(operation_id)).await?;
937 Ok(true)
938 } else {
939 Ok(false)
940 }
941 }
942}
943
944pub struct FinalizedCommit {
946 pub snapshot: DeltaTableState,
948
949 pub version: i64,
951
952 pub metrics: Metrics,
954}
955
956impl std::fmt::Debug for FinalizedCommit {
957 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
958 f.debug_struct("FinalizedCommit")
959 .field("version", &self.version)
960 .field("metrics", &self.metrics)
961 .finish()
962 }
963}
964
965impl FinalizedCommit {
966 pub fn snapshot(&self) -> DeltaTableState {
968 self.snapshot.clone()
969 }
970 pub fn version(&self) -> i64 {
972 self.version
973 }
974}
975
976impl std::future::IntoFuture for PostCommit {
977 type Output = DeltaResult<FinalizedCommit>;
978 type IntoFuture = BoxFuture<'static, Self::Output>;
979
980 fn into_future(self) -> Self::IntoFuture {
981 let this = self;
982
983 Box::pin(async move {
984 match this.run_post_commit_hook().await {
985 Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
986 snapshot,
987 version: this.version,
988 metrics: Metrics {
989 num_retries: this.metrics.num_retries,
990 new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
991 num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
992 },
993 }),
994 Err(err) => Err(err),
995 }
996 })
997 }
998}
999
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::logstore::{
        LogStore, StorageConfig, commit_uri_from_version, default_logstore::DefaultLogStore,
    };
    use object_store::{ObjectStore, PutPayload, memory::InMemory};
    use url::Url;

    #[test]
    fn test_commit_uri_from_version() {
        let version = commit_uri_from_version(0);
        assert_eq!(version, Path::from("_delta_log/00000000000000000000.json"));
        let version = commit_uri_from_version(123);
        assert_eq!(version, Path::from("_delta_log/00000000000000000123.json"))
    }

    /// Writing an existing version must fail with `VersionAlreadyExists`, while the next
    /// free version is written at its canonical path.
    #[tokio::test]
    async fn test_try_commit_transaction() {
        let store = Arc::new(InMemory::new());
        let url = Url::parse("mem://what/is/this").unwrap();
        let log_store = DefaultLogStore::new(
            store.clone(),
            store.clone(),
            crate::logstore::LogStoreConfig::new(&url, StorageConfig::default()),
        );
        let version_path = Path::from("_delta_log/00000000000000000000.json");
        store.put(&version_path, PutPayload::new()).await.unwrap();

        let res = log_store
            .write_commit_entry(
                0,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await;
        // Pin the exact variant: the commit retry loop relies on it to decide to retry,
        // so an unrelated error must not make this test pass.
        assert!(
            matches!(res, Err(TransactionError::VersionAlreadyExists(0))),
            "expected VersionAlreadyExists(0), got {res:?}"
        );

        log_store
            .write_commit_entry(
                1,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await
            .unwrap();
        // The successful commit must be stored at the canonical path for version 1.
        store.head(&commit_uri_from_version(1)).await.unwrap();
    }

    #[test]
    fn test_commit_with_retries_tracing_span() {
        let span = info_span!(
            "commit_with_retries",
            base_version = 5,
            max_retries = 10,
            attempt = field::Empty,
            target_version = field::Empty,
            conflicts_checked = 0
        );

        let metadata = span.metadata().expect("span should have metadata");
        assert_eq!(metadata.name(), "commit_with_retries");
        assert_eq!(metadata.level(), &Level::INFO);
        assert!(metadata.is_span());

        // Recording into a field that was not declared is silently ignored, so make sure
        // every field the commit loop records is actually declared on the span.
        for field_name in [
            "base_version",
            "max_retries",
            "attempt",
            "target_version",
            "conflicts_checked",
        ] {
            assert!(
                metadata.fields().field(field_name).is_some(),
                "span is missing field `{field_name}`"
            );
        }

        span.record("attempt", 1);
        span.record("target_version", 6);
        span.record("conflicts_checked", 2);
    }

    #[test]
    fn test_commit_properties_with_retries() {
        let props = CommitProperties::default()
            .with_max_retries(5)
            .with_create_checkpoint(false);

        assert_eq!(props.max_retries, 5);
        assert!(!props.create_checkpoint);
        assert_eq!(props.cleanup_expired_logs, None);
    }

    #[test]
    fn test_commit_properties_defaults() {
        let props = CommitProperties::default();

        assert_eq!(props.max_retries, DEFAULT_RETRIES);
        assert!(props.create_checkpoint);
        assert_eq!(props.cleanup_expired_logs, None);
        assert!(props.app_metadata.is_empty());
        assert!(props.app_transaction.is_empty());
    }

    #[test]
    fn test_commit_properties_into_builder() {
        let builder: CommitBuilder = CommitProperties::default()
            .with_max_retries(7)
            .with_create_checkpoint(false)
            .with_cleanup_expired_logs(Some(true))
            .into();

        assert_eq!(builder.max_retries, 7);
        let hook = builder
            .post_commit_hook
            .expect("converting properties always installs a post-commit hook");
        assert!(!hook.create_checkpoint);
        assert_eq!(hook.cleanup_expired_logs, Some(true));
    }

    #[test]
    fn test_commit_metrics() {
        let metrics = CommitMetrics { num_retries: 3 };
        assert_eq!(metrics.num_retries, 3);
    }
}