1use std::collections::HashMap;
77use std::sync::Arc;
78
79use bytes::Bytes;
80use chrono::Utc;
81use conflict_checker::ConflictChecker;
82use delta_kernel::table_properties::TableProperties;
83use futures::future::BoxFuture;
84use object_store::Error as ObjectStoreError;
85use object_store::ObjectStoreExt as _;
86use object_store::path::Path;
87use serde_json::Value;
88use tracing::*;
89use uuid::Uuid;
90
91use delta_kernel::table_features::TableFeature;
92use serde::{Deserialize, Serialize};
93
94use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
95use crate::errors::DeltaTableError;
96use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction, Version};
97use crate::logstore::ObjectStoreRef;
98use crate::logstore::{CommitOrBytes, LogStoreRef};
99use crate::operations::CustomExecuteHandler;
100use crate::protocol::DeltaOperation;
101use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
102use crate::table::config::TablePropertiesExt as _;
103use crate::table::state::DeltaTableState;
104use crate::{DeltaResult, crate_version};
105
106pub use self::conflict_checker::CommitConflictError;
107pub use self::protocol::INSTANCE as PROTOCOL;
108
109#[cfg(test)]
110pub(crate) mod application;
111mod conflict_checker;
112mod protocol;
113#[cfg(feature = "datafusion")]
114mod state;
115
/// Name of the folder under which temporary commit files are staged
/// (see `write_tmp_commit` in `PreCommit::into_prepared_commit_future`).
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Default value for `max_retries` in [`CommitProperties`] and [`CommitBuilder`].
pub(crate) const DEFAULT_RETRIES: usize = 15;
118
/// Metrics collected while writing a commit to the log.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommitMetrics {
    /// Number of retries that preceded the successful commit
    /// (`0` when the first attempt succeeded).
    pub num_retries: u64,
}
125
/// Metrics collected while running the post-commit hook.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostCommitMetrics {
    /// Whether the post-commit hook created a new checkpoint.
    pub new_checkpoint_created: bool,

    /// Number of log files reported as removed by the expired-log cleanup.
    pub num_log_files_cleaned_up: u64,
}
135
/// Combined metrics of a finalized commit: the fields of [`CommitMetrics`]
/// followed by the fields of [`PostCommitMetrics`].
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Number of retries that preceded the successful commit.
    pub num_retries: u64,

    /// Whether the post-commit hook created a new checkpoint.
    pub new_checkpoint_created: bool,

    /// Number of log files reported as removed by the expired-log cleanup.
    pub num_log_files_cleaned_up: u64,
}
148
/// Errors that can be raised while preparing or committing a transaction.
///
/// The first three variants are mapped onto the equally named
/// `DeltaTableError` variants by the `From` impl below; every other variant is
/// wrapped in `DeltaTableError::Transaction`.
#[derive(thiserror::Error, Debug)]
pub enum TransactionError {
    /// A commit was attempted for a table version that already exists.
    #[error("Tried committing existing table version: {0}")]
    VersionAlreadyExists(Version),

    /// An action of the commit could not be serialized to JSON.
    #[error("Error serializing commit log to json: {json_err}")]
    SerializeLogJson {
        /// The underlying `serde_json` error.
        json_err: serde_json::error::Error,
    },

    /// An object store operation failed.
    #[error("Log storage error: {}", .source)]
    ObjectStore {
        /// The underlying object store error.
        #[from]
        source: ObjectStoreError,
    },

    /// The conflict checker rejected the transaction.
    #[error("Failed to commit transaction: {0}")]
    CommitConflict(#[from] CommitConflictError),

    /// The commit could not be written within the allowed number of attempts.
    ///
    /// The payload is the configured `max_retries` value. Note that the
    /// message prefix is identical to the one of [`Self::CommitConflict`].
    #[error("Failed to commit transaction: {0}")]
    MaxCommitAttempts(i32),

    /// The transaction removes data from a table that is append-only.
    #[error(
        "The transaction includes Remove action with data change but Delta table is append-only"
    )]
    DeltaTableAppendOnly,

    /// The transaction requires table features that are not supported.
    #[error("Unsupported table features required: {0:?}")]
    UnsupportedTableFeatures(Vec<TableFeature>),

    /// A table feature has to be specified explicitly for this transaction.
    #[error("Table features must be specified, please specify: {0:?}")]
    TableFeaturesRequired(TableFeature),

    /// Generic failure carrying a message and the error that caused it.
    // NOTE(review): presumably raised by log store implementations — confirm at call sites.
    #[error("Transaction failed: {msg}")]
    LogStoreError {
        /// Human readable description of the failure.
        msg: String,
        /// The underlying cause of the failure.
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
}
203
204impl From<TransactionError> for DeltaTableError {
205 fn from(err: TransactionError) -> Self {
206 match err {
207 TransactionError::VersionAlreadyExists(version) => {
208 DeltaTableError::VersionAlreadyExists(version)
209 }
210 TransactionError::SerializeLogJson { json_err } => {
211 DeltaTableError::SerializeLogJson { json_err }
212 }
213 TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
214 other => DeltaTableError::Transaction { source: other },
215 }
216 }
217}
218
/// Errors raised while building a commit.
///
/// The enum currently has no variants, so no value of this type can exist.
#[derive(thiserror::Error, Debug)]
pub enum CommitBuilderError {}
222
223impl From<CommitBuilderError> for DeltaTableError {
224 fn from(err: CommitBuilderError) -> Self {
225 DeltaTableError::CommitValidation { source: err }
226 }
227}
228
/// Read access to the table state a commit is validated and applied against.
pub trait TableReference: Send + Sync {
    /// Returns the table properties.
    fn config(&self) -> &TableProperties;

    /// Returns the table protocol.
    fn protocol(&self) -> &Protocol;

    /// Returns the table metadata.
    fn metadata(&self) -> &Metadata;

    /// Returns the underlying eager snapshot.
    fn eager_snapshot(&self) -> &EagerSnapshot;
}
243
244impl TableReference for EagerSnapshot {
245 fn protocol(&self) -> &Protocol {
246 EagerSnapshot::protocol(self)
247 }
248
249 fn metadata(&self) -> &Metadata {
250 EagerSnapshot::metadata(self)
251 }
252
253 fn config(&self) -> &TableProperties {
254 self.table_properties()
255 }
256
257 fn eager_snapshot(&self) -> &EagerSnapshot {
258 self
259 }
260}
261
262impl TableReference for DeltaTableState {
263 fn config(&self) -> &TableProperties {
264 self.snapshot.config()
265 }
266
267 fn protocol(&self) -> &Protocol {
268 self.snapshot.protocol()
269 }
270
271 fn metadata(&self) -> &Metadata {
272 self.snapshot.metadata()
273 }
274
275 fn eager_snapshot(&self) -> &EagerSnapshot {
276 &self.snapshot
277 }
278}
279
/// Everything that makes up a single commit.
#[derive(Debug)]
pub struct CommitData {
    /// The actions written to the commit, in serialization order.
    pub actions: Vec<Action>,
    /// The operation that produced the commit.
    pub operation: DeltaOperation,
    /// Application metadata associated with the commit.
    pub app_metadata: HashMap<String, Value>,
    /// Application transactions; `CommitData::new` also appends each of them
    /// to `actions` as an `Action::Txn`.
    pub app_transactions: Vec<Transaction>,
}
292
293impl CommitData {
294 pub fn new(
296 mut actions: Vec<Action>,
297 operation: DeltaOperation,
298 mut app_metadata: HashMap<String, Value>,
299 app_transactions: Vec<Transaction>,
300 ) -> Self {
301 if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
302 let mut commit_info = operation.get_commit_info();
303 commit_info.timestamp = Some(Utc::now().timestamp_millis());
304 app_metadata
305 .entry("clientVersion".to_string())
306 .or_insert(Value::String(format!("delta-rs.{}", crate_version())));
307 app_metadata.extend(commit_info.info);
308 commit_info.info = app_metadata.clone();
309 actions.insert(0, Action::CommitInfo(commit_info));
311 }
312
313 for txn in &app_transactions {
314 actions.push(Action::Txn(txn.clone()))
315 }
316
317 CommitData {
318 actions,
319 operation,
320 app_metadata,
321 app_transactions,
322 }
323 }
324
325 pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
327 let mut jsons = Vec::<String>::new();
328 for action in &self.actions {
329 let json = serde_json::to_string(action)
330 .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
331 jsons.push(json);
332 }
333 Ok(bytes::Bytes::from(jsons.join("\n")))
334 }
335}
336
/// Settings that control what the post-commit hook does.
#[derive(Clone, Debug, Copy)]
pub struct PostCommitHookProperties {
    /// Whether the hook may create a checkpoint.
    create_checkpoint: bool,
    /// Explicit override for the expired-log cleanup; when `None` the hook
    /// falls back to the table configuration.
    cleanup_expired_logs: Option<bool>,
}
344
/// User-facing options of a commit; convertible into a [`CommitBuilder`].
#[derive(Clone, Debug)]
pub struct CommitProperties {
    /// Application metadata to attach to the commit.
    pub(crate) app_metadata: HashMap<String, Value>,
    /// Application transactions to record with the commit.
    pub(crate) app_transaction: Vec<Transaction>,
    /// Maximum number of commit retries.
    max_retries: usize,
    /// Whether the post-commit hook may create a checkpoint.
    create_checkpoint: bool,
    /// Explicit override for the expired-log cleanup (`None` = use table config).
    cleanup_expired_logs: Option<bool>,
}
355
356impl Default for CommitProperties {
357 fn default() -> Self {
358 Self {
359 app_metadata: Default::default(),
360 app_transaction: Vec::new(),
361 max_retries: DEFAULT_RETRIES,
362 create_checkpoint: true,
363 cleanup_expired_logs: None,
364 }
365 }
366}
367
368impl CommitProperties {
369 pub fn with_metadata(
371 mut self,
372 metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
373 ) -> Self {
374 self.app_metadata = HashMap::from_iter(metadata);
375 self
376 }
377
378 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
380 self.max_retries = max_retries;
381 self
382 }
383
384 pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
386 self.create_checkpoint = create_checkpoint;
387 self
388 }
389
390 pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
392 self.app_transaction.push(txn);
393 self
394 }
395
396 pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
398 self.app_transaction = txn;
399 self
400 }
401
402 pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
404 self.cleanup_expired_logs = cleanup_expired_logs;
405 self
406 }
407}
408
409impl From<CommitProperties> for CommitBuilder {
410 fn from(value: CommitProperties) -> Self {
411 CommitBuilder {
412 max_retries: value.max_retries,
413 app_metadata: value.app_metadata,
414 post_commit_hook: Some(PostCommitHookProperties {
415 create_checkpoint: value.create_checkpoint,
416 cleanup_expired_logs: value.cleanup_expired_logs,
417 }),
418 app_transaction: value.app_transaction,
419 ..Default::default()
420 }
421 }
422}
423
/// Builder that collects everything needed to create a [`PreCommit`].
pub struct CommitBuilder {
    /// Actions to include in the commit.
    actions: Vec<Action>,
    /// Application metadata to attach to the commit.
    app_metadata: HashMap<String, Value>,
    /// Application transactions to record with the commit.
    app_transaction: Vec<Transaction>,
    /// Maximum number of commit retries.
    max_retries: usize,
    /// Post-commit hook settings, if any.
    post_commit_hook: Option<PostCommitHookProperties>,
    /// Optional handler invoked before and after the post-commit hook.
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Identifier passed along to log store calls made for this commit.
    operation_id: Uuid,
}
434
435impl Default for CommitBuilder {
436 fn default() -> Self {
437 CommitBuilder {
438 actions: Vec::new(),
439 app_metadata: HashMap::new(),
440 app_transaction: Vec::new(),
441 max_retries: DEFAULT_RETRIES,
442 post_commit_hook: None,
443 post_commit_hook_handler: None,
444 operation_id: Uuid::new_v4(),
445 }
446 }
447}
448
449impl<'a> CommitBuilder {
450 pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
452 self.actions = actions;
453 self
454 }
455
456 pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
458 self.app_metadata = app_metadata;
459 self
460 }
461
462 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
464 self.max_retries = max_retries;
465 self
466 }
467
468 pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
470 self.post_commit_hook = Some(post_commit_hook);
471 self
472 }
473
474 pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
476 self.operation_id = operation_id;
477 self
478 }
479
480 pub fn with_post_commit_hook_handler(
482 mut self,
483 handler: Option<Arc<dyn CustomExecuteHandler>>,
484 ) -> Self {
485 self.post_commit_hook_handler = handler;
486 self
487 }
488
489 pub fn build(
491 self,
492 table_data: Option<&'a dyn TableReference>,
493 log_store: LogStoreRef,
494 operation: DeltaOperation,
495 ) -> PreCommit<'a> {
496 let data = CommitData::new(
497 self.actions,
498 operation,
499 self.app_metadata,
500 self.app_transaction,
501 );
502 PreCommit {
503 log_store,
504 table_data,
505 max_retries: self.max_retries,
506 data,
507 post_commit_hook: self.post_commit_hook,
508 post_commit_hook_handler: self.post_commit_hook_handler,
509 operation_id: self.operation_id,
510 }
511 }
512}
513
/// A commit that has been built but not yet validated or serialized.
///
/// Awaiting it runs all stages: prepare, commit and post-commit.
pub struct PreCommit<'a> {
    /// Log store the commit is written to.
    log_store: LogStoreRef,
    /// Table state the commit is based on, if any.
    table_data: Option<&'a dyn TableReference>,
    /// The data of the commit.
    data: CommitData,
    /// Maximum number of commit retries.
    max_retries: usize,
    /// Post-commit hook settings, if any.
    post_commit_hook: Option<PostCommitHookProperties>,
    /// Optional handler invoked before and after the post-commit hook.
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Identifier passed along to log store calls made for this commit.
    operation_id: Uuid,
}
524
525impl<'a> std::future::IntoFuture for PreCommit<'a> {
526 type Output = DeltaResult<FinalizedCommit>;
527 type IntoFuture = BoxFuture<'a, Self::Output>;
528
529 fn into_future(self) -> Self::IntoFuture {
530 Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
531 }
532}
533
534impl<'a> PreCommit<'a> {
535 pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
537 let this = self;
538
539 async fn write_tmp_commit(
542 log_entry: Bytes,
543 store: ObjectStoreRef,
544 ) -> DeltaResult<CommitOrBytes> {
545 let token = uuid::Uuid::new_v4().to_string();
546 let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
547 store.put(&path, log_entry.into()).await?;
548 Ok(CommitOrBytes::TmpCommit(path))
549 }
550
551 Box::pin(async move {
552 if let Some(table_reference) = this.table_data {
553 PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
554 }
555 let log_entry = this.data.get_bytes()?;
556
557 let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
560 .contains(&this.log_store.name().as_str())
561 {
562 CommitOrBytes::LogBytes(log_entry)
563 } else {
564 write_tmp_commit(
565 log_entry,
566 this.log_store.object_store(Some(this.operation_id)),
567 )
568 .await?
569 };
570
571 Ok(PreparedCommit {
572 commit_or_bytes,
573 log_store: this.log_store,
574 table_data: this.table_data,
575 max_retries: this.max_retries,
576 data: this.data,
577 post_commit: this.post_commit_hook,
578 post_commit_hook_handler: this.post_commit_hook_handler,
579 operation_id: this.operation_id,
580 })
581 })
582 }
583}
584
/// A validated and serialized commit that is ready to be written to the log.
pub struct PreparedCommit<'a> {
    /// The serialized commit: either the bytes themselves or the path of a
    /// temporary commit file.
    commit_or_bytes: CommitOrBytes,
    /// Log store the commit is written to.
    log_store: LogStoreRef,
    /// The data of the commit.
    data: CommitData,
    /// Table state the commit is based on, if any.
    table_data: Option<&'a dyn TableReference>,
    /// Maximum number of commit retries.
    max_retries: usize,
    /// Post-commit hook settings, if any.
    post_commit: Option<PostCommitHookProperties>,
    /// Optional handler invoked before and after the post-commit hook.
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Identifier passed along to log store calls made for this commit.
    operation_id: Uuid,
}
596
impl PreparedCommit<'_> {
    /// Returns the serialized commit (bytes or temporary commit path).
    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
        &self.commit_or_bytes
    }
}
603
impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
    type Output = DeltaResult<PostCommit>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    /// Writes the prepared commit to the log, retrying on version collisions.
    ///
    /// Without table data the commit is first attempted as version 0. If that
    /// version already exists, the current table state is loaded and the
    /// regular retry loop is entered with the first attempt counted as used.
    ///
    /// Each loop iteration looks up the latest table version. If the table
    /// advanced past the read snapshot, every intermediate commit is checked
    /// for conflicts and the snapshot is updated before the commit is written
    /// as `latest_version + 1`.
    ///
    /// # Errors
    ///
    /// * `TransactionError::CommitConflict` when a conflict is detected.
    /// * `TransactionError::MaxCommitAttempts` when the table changed and
    ///   `max_retries` is `0`, or when all attempts are used up.
    /// * Any other error reported by the log store or the snapshot.
    fn into_future(self) -> Self::IntoFuture {
        let this = self;

        Box::pin(async move {
            let commit_or_bytes = this.commit_or_bytes;

            // Attempts are counted from 1; `attempt_number - 1` is the retry count.
            let mut attempt_number: usize = 1;

            let read_snapshot: EagerSnapshot = if let Some(table_data) = this.table_data {
                table_data.eager_snapshot().clone()
            } else {
                // No table data: try to create the table by writing version 0.
                debug!("committing initial table version 0");
                match this
                    .log_store
                    .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
                    .await
                {
                    Ok(_) => {
                        // The post-commit work is disabled for the initial version.
                        return Ok(PostCommit {
                            version: 0,
                            data: this.data,
                            create_checkpoint: false,
                            cleanup_expired_logs: None,
                            log_store: this.log_store,
                            table_data: None,
                            custom_execute_handler: this.post_commit_hook_handler,
                            metrics: CommitMetrics { num_retries: 0 },
                        });
                    }
                    Err(TransactionError::VersionAlreadyExists(0)) => {
                        debug!("version 0 already exists, loading table state for retry");
                        // The failed write of version 0 counts as the first attempt.
                        attempt_number = 2;
                        let latest_version: Version = this.log_store.get_latest_version(0).await?;
                        EagerSnapshot::try_new(
                            this.log_store.as_ref(),
                            Default::default(),
                            Some(latest_version),
                        )
                        .await?
                    }
                    Err(e) => return Err(e.into()),
                }
            };

            let mut read_snapshot = read_snapshot;

            // `attempt` and `target_version` are recorded once they are known.
            let commit_span = info_span!(
                "commit_with_retries",
                base_version = read_snapshot.version(),
                max_retries = this.max_retries,
                attempt = field::Empty,
                target_version = field::Empty,
                conflicts_checked = 0
            );

            async move {
                // One initial attempt plus `max_retries` retries.
                let total_retries = this.max_retries + 1;
                while attempt_number <= total_retries {
                    Span::current().record("attempt", attempt_number);
                    let latest_version = this
                        .log_store
                        .get_latest_version(read_snapshot.version())
                        .await?;

                    if latest_version > read_snapshot.version() {
                        if this.max_retries == 0 {
                            warn!(
                                base_version = read_snapshot.version(),
                                latest_version = latest_version,
                                "table updated but max_retries is 0, failing immediately"
                            );
                            // NOTE(review): `as i32` truncates for very large `max_retries`.
                            return Err(TransactionError::MaxCommitAttempts(
                                this.max_retries as i32,
                            )
                            .into());
                        }
                        warn!(
                            base_version = read_snapshot.version(),
                            latest_version = latest_version,
                            versions_behind = latest_version - read_snapshot.version(),
                            "table updated during transaction, checking for conflicts"
                        );
                        // Number of versions written since the read snapshot.
                        let mut steps = latest_version - read_snapshot.version();
                        let mut conflicts_checked = 0;

                        // Walk the intervening versions from oldest to newest.
                        while steps != 0 {
                            conflicts_checked += 1;
                            // NOTE(review): presumably summarises the single commit
                            // between the two versions — confirm in `WinningCommitSummary`.
                            let summary = WinningCommitSummary::try_new(
                                this.log_store.as_ref(),
                                latest_version - steps,
                                (latest_version - steps) + 1,
                            )
                            .await?;
                            let transaction_info = TransactionInfo::try_new(
                                read_snapshot.log_data(),
                                this.data.operation.read_predicate(),
                                &this.data.actions,
                                this.data.operation.read_whole_table(),
                            )?;
                            let conflict_checker = ConflictChecker::new(
                                transaction_info,
                                summary,
                                Some(&this.data.operation),
                            );

                            match conflict_checker.check_conflicts() {
                                Ok(_) => {}
                                Err(err) => {
                                    error!(
                                        conflicts_checked = conflicts_checked,
                                        error = %err,
                                        "conflict detected, aborting transaction"
                                    );
                                    return Err(TransactionError::CommitConflict(err).into());
                                }
                            }
                            steps -= 1;
                        }
                        Span::current().record("conflicts_checked", conflicts_checked);
                        debug!(
                            conflicts_checked = conflicts_checked,
                            "all conflicts resolved, updating snapshot"
                        );
                        // Bring the snapshot up to date so that the next iteration
                        // only checks commits newer than `latest_version`.
                        read_snapshot
                            .update(&this.log_store, Some(latest_version))
                            .await?;
                    }
                    let version: Version = latest_version + 1;
                    Span::current().record("target_version", version);

                    match this
                        .log_store
                        .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
                        .await
                    {
                        Ok(()) => {
                            info!(
                                version = version,
                                num_retries = attempt_number - 1,
                                "transaction committed successfully"
                            );
                            // Without hook properties both post-commit settings take
                            // their defaults (`false` / `None`).
                            return Ok(PostCommit {
                                version,
                                data: this.data,
                                create_checkpoint: this
                                    .post_commit
                                    .map(|v| v.create_checkpoint)
                                    .unwrap_or_default(),
                                cleanup_expired_logs: this
                                    .post_commit
                                    .map(|v| v.cleanup_expired_logs)
                                    .unwrap_or_default(),
                                log_store: this.log_store,
                                table_data: Some(Box::new(read_snapshot)),
                                custom_execute_handler: this.post_commit_hook_handler,
                                metrics: CommitMetrics {
                                    num_retries: (attempt_number - 1) as u64,
                                },
                            });
                        }
                        Err(TransactionError::VersionAlreadyExists(version)) => {
                            // Another writer took this version; try again.
                            warn!(
                                version = version,
                                attempt = attempt_number,
                                "version already exists, will retry"
                            );
                            attempt_number += 1;
                        }
                        Err(err) => {
                            error!(
                                version = version,
                                error = %err,
                                "commit failed, aborting"
                            );
                            // NOTE(review): if the abort itself fails, `?` returns the
                            // abort error and the original `err` is lost — confirm
                            // that this is intended.
                            this.log_store
                                .abort_commit_entry(version, commit_or_bytes, this.operation_id)
                                .await?;
                            return Err(err.into());
                        }
                    }
                }

                error!(
                    max_retries = this.max_retries,
                    "exceeded maximum commit attempts"
                );
                Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
            }
            .instrument(commit_span)
            .await
        })
    }
}
811
/// A commit that has been written to the log; awaiting it runs the
/// post-commit hook and yields a [`FinalizedCommit`].
pub struct PostCommit {
    /// The version that was committed.
    pub version: Version,
    /// The data that was committed.
    pub data: CommitData,
    /// Whether the post-commit hook may create a checkpoint.
    create_checkpoint: bool,
    /// Explicit override for the expired-log cleanup (`None` = use table config).
    cleanup_expired_logs: Option<bool>,
    /// Log store the commit was written to.
    log_store: LogStoreRef,
    /// Table state the commit was based on; `None` skips checkpointing and cleanup.
    table_data: Option<Box<dyn TableReference>>,
    /// Optional handler invoked before and after the post-commit hook.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Metrics collected while writing the commit.
    metrics: CommitMetrics,
}
825
826impl PostCommit {
827 async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
829 if let Some(table) = &self.table_data {
830 let post_commit_operation_id = Uuid::new_v4();
831 let mut snapshot = table.eager_snapshot().clone();
832 if self.version != snapshot.version() {
833 snapshot.update(&self.log_store, Some(self.version)).await?;
834 }
835
836 let mut state = DeltaTableState { snapshot };
837
838 let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
839 cleanup_logs
840 } else {
841 state.table_config().enable_expired_log_cleanup()
842 };
843
844 if let Some(custom_execute_handler) = &self.custom_execute_handler {
846 custom_execute_handler
847 .before_post_commit_hook(
848 &self.log_store,
849 cleanup_logs || self.create_checkpoint,
850 post_commit_operation_id,
851 )
852 .await?
853 }
854
855 let mut new_checkpoint_created = false;
856 if self.create_checkpoint {
857 new_checkpoint_created = self
859 .create_checkpoint(
860 &state,
861 &self.log_store,
862 self.version,
863 post_commit_operation_id,
864 )
865 .await?;
866 }
867
868 let mut num_log_files_cleaned_up: u64 = 0;
869 if cleanup_logs {
870 num_log_files_cleaned_up = cleanup_expired_logs_for(
872 self.version,
873 self.log_store.as_ref(),
874 Utc::now().timestamp_millis()
875 - state.table_config().log_retention_duration().as_millis() as i64,
876 Some(post_commit_operation_id),
877 )
878 .await? as u64;
879 if num_log_files_cleaned_up > 0 {
880 state = DeltaTableState::try_new(
881 &self.log_store,
882 state.load_config().clone(),
883 Some(self.version),
884 )
885 .await?;
886 }
887 }
888
889 if let Some(custom_execute_handler) = &self.custom_execute_handler {
891 custom_execute_handler
892 .after_post_commit_hook(
893 &self.log_store,
894 cleanup_logs || self.create_checkpoint,
895 post_commit_operation_id,
896 )
897 .await?
898 }
899 Ok((
900 state,
901 PostCommitMetrics {
902 new_checkpoint_created,
903 num_log_files_cleaned_up,
904 },
905 ))
906 } else {
907 let state =
908 DeltaTableState::try_new(&self.log_store, Default::default(), Some(self.version))
909 .await?;
910 Ok((
911 state,
912 PostCommitMetrics {
913 new_checkpoint_created: false,
914 num_log_files_cleaned_up: 0,
915 },
916 ))
917 }
918 }
919 async fn create_checkpoint(
920 &self,
921 table_state: &DeltaTableState,
922 log_store: &LogStoreRef,
923 version: Version,
924 operation_id: Uuid,
925 ) -> DeltaResult<bool> {
926 if !table_state.load_config().require_files {
927 warn!(
928 "Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."
929 );
930 return Ok(false);
931 }
932
933 let checkpoint_interval = table_state.config().checkpoint_interval().get();
934 if (version + 1).is_multiple_of(checkpoint_interval) {
935 create_checkpoint_for(version, log_store.as_ref(), Some(operation_id)).await?;
936 Ok(true)
937 } else {
938 Ok(false)
939 }
940 }
941}
942
/// The result of a commit whose post-commit hook has completed.
pub struct FinalizedCommit {
    /// The table state returned by the post-commit hook.
    pub snapshot: DeltaTableState,

    /// The version that was committed.
    pub version: Version,

    /// Metrics of the commit and of the post-commit hook.
    pub metrics: Metrics,
}
954
955impl std::fmt::Debug for FinalizedCommit {
956 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
957 f.debug_struct("FinalizedCommit")
958 .field("version", &self.version)
959 .field("metrics", &self.metrics)
960 .finish()
961 }
962}
963
impl FinalizedCommit {
    /// Returns a clone of the table state held by this commit.
    pub fn snapshot(&self) -> DeltaTableState {
        self.snapshot.clone()
    }
    /// Returns the version that was committed.
    pub fn version(&self) -> Version {
        self.version
    }
}
974
975impl std::future::IntoFuture for PostCommit {
976 type Output = DeltaResult<FinalizedCommit>;
977 type IntoFuture = BoxFuture<'static, Self::Output>;
978
979 fn into_future(self) -> Self::IntoFuture {
980 let this = self;
981
982 Box::pin(async move {
983 match this.run_post_commit_hook().await {
984 Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
985 snapshot,
986 version: this.version,
987 metrics: Metrics {
988 num_retries: this.metrics.num_retries,
989 new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
990 num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
991 },
992 }),
993 Err(err) => Err(err),
994 }
995 })
996 }
997}
998
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::logstore::{LogStore, StorageConfig, default_logstore::DefaultLogStore};
    use object_store::{ObjectStore, PutPayload, memory::InMemory};
    use serde_json::json;
    use url::Url;

    /// Writing an already existing version must fail, while writing the next
    /// free version must succeed.
    #[tokio::test]
    async fn test_try_commit_transaction() {
        let store = Arc::new(InMemory::new());
        let url = Url::parse("mem://what/is/this").unwrap();
        let log_store = DefaultLogStore::new(
            store.clone(),
            store.clone(),
            crate::logstore::LogStoreConfig::new(&url, StorageConfig::default()),
        );
        // Pre-create the commit file of version 0.
        let version_path = Path::from("_delta_log/00000000000000000000.json");
        store.put(&version_path, PutPayload::new()).await.unwrap();

        // Version 0 already exists, so this write has to be rejected.
        let res = log_store
            .write_commit_entry(
                0,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await;
        assert!(res.is_err());

        // Version 1 does not exist yet, so this write has to succeed.
        log_store
            .write_commit_entry(
                1,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await
            .unwrap();
    }

    /// Checks the metadata of a span declared like the one used by the commit
    /// loop and records each of its late-bound fields once.
    #[test]
    fn test_commit_with_retries_tracing_span() {
        let span = info_span!(
            "commit_with_retries",
            base_version = 5,
            max_retries = 10,
            attempt = field::Empty,
            target_version = field::Empty,
            conflicts_checked = 0
        );

        let metadata = span.metadata().expect("span should have metadata");
        assert_eq!(metadata.name(), "commit_with_retries");
        assert_eq!(metadata.level(), &Level::INFO);
        assert!(metadata.is_span());

        span.record("attempt", 1);
        span.record("target_version", 6);
        span.record("conflicts_checked", 2);
    }

    /// The builder methods of `CommitProperties` store the given values.
    #[test]
    fn test_commit_properties_with_retries() {
        let props = CommitProperties::default()
            .with_max_retries(5)
            .with_create_checkpoint(false);

        assert_eq!(props.max_retries, 5);
        assert!(!props.create_checkpoint);
    }

    /// `CommitMetrics` exposes the retry count it was constructed with.
    #[test]
    fn test_commit_metrics() {
        let metrics = CommitMetrics { num_retries: 3 };
        assert_eq!(metrics.num_retries, 3);
    }

    /// `clientVersion` defaults to the delta-rs crate version, and a value
    /// supplied by the caller is kept.
    #[test]
    fn test_commit_data_client_version() {
        let no_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::new(),
            vec![],
        );
        assert_eq!(
            *no_metadata.app_metadata.get("clientVersion").unwrap(),
            json!(format!("delta-rs.{}", crate_version()))
        );

        let with_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([("clientVersion".to_owned(), json!("test-client.0.0.1"))]),
            vec![],
        );
        assert_eq!(
            *with_metadata.app_metadata.get("clientVersion").unwrap(),
            json!("test-client.0.0.1")
        );
    }
}