1use std::collections::HashMap;
77use std::str::FromStr;
78use std::sync::Arc;
79
80use bytes::Bytes;
81use chrono::Utc;
82use conflict_checker::ConflictChecker;
83use delta_kernel::table_properties::TableProperties;
84use futures::future::BoxFuture;
85use object_store::Error as ObjectStoreError;
86use object_store::ObjectStoreExt as _;
87use object_store::path::Path;
88use serde_json::Value;
89use tracing::*;
90use uuid::Uuid;
91
92use delta_kernel::table_features::TableFeature;
93use serde::{Deserialize, Serialize};
94
95use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
96use crate::errors::DeltaTableError;
97use crate::kernel::{
98 Action, CommitInfo, EagerSnapshot, IsolationLevel, Metadata, Protocol, Transaction, Version,
99};
100use crate::logstore::ObjectStoreRef;
101use crate::logstore::{CommitOrBytes, LogStoreRef};
102use crate::operations::CustomExecuteHandler;
103use crate::protocol::{DeltaOperation, operation_parameter_value};
104use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
105use crate::table::config::TablePropertiesExt as _;
106use crate::table::state::DeltaTableState;
107use crate::{DeltaResult, crate_version};
108
109pub use self::conflict_checker::CommitConflictError;
110pub use self::protocol::INSTANCE as PROTOCOL;
111
112#[cfg(test)]
113pub(crate) mod application;
114mod conflict_checker;
115mod protocol;
116#[cfg(feature = "datafusion")]
117mod state;
118
/// Name of the folder, relative to the table root, that holds the transaction log.
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Default value of `max_retries`: the number of commit attempts allowed after the first one.
pub(crate) const DEFAULT_RETRIES: usize = 15;
/// `commitInfo` keys that are backed by dedicated fields of [`CommitInfo`].
///
/// These keys are removed from user supplied application metadata (after any
/// promotable value has been moved into its typed field), so they never appear
/// in the flattened `info` map where they could shadow the generated values.
const RESERVED_COMMIT_INFO_KEYS: &[&str] = &[
    "timestamp",
    "userId",
    "userName",
    "operation",
    "operationParameters",
    "readVersion",
    "isolationLevel",
    "isBlindAppend",
    "engineInfo",
    "userMetadata",
];
136
/// Metrics collected while writing a commit to the log.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CommitMetrics {
    /// Number of commit attempts made after the first one.
    pub num_retries: u64,
}
143
/// Metrics collected while running the post-commit hook.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct PostCommitMetrics {
    /// Whether a checkpoint was written for the committed version.
    pub new_checkpoint_created: bool,

    /// Number of expired log files removed by log cleanup.
    pub num_log_files_cleaned_up: u64,
}
153
/// Combined commit and post-commit metrics reported by a finalized commit.
#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Metrics {
    /// Number of commit attempts made after the first one.
    pub num_retries: u64,

    /// Whether a checkpoint was written for the committed version.
    pub new_checkpoint_created: bool,

    /// Number of expired log files removed by log cleanup.
    pub num_log_files_cleaned_up: u64,
}
166
/// Errors raised while preparing or committing a transaction.
#[derive(thiserror::Error, Debug)]
pub enum TransactionError {
    /// The target log version already exists (another writer committed it first).
    #[error("Tried committing existing table version: {0}")]
    VersionAlreadyExists(Version),

    /// An action could not be serialized to JSON.
    #[error("Error serializing commit log to json: {json_err}")]
    SerializeLogJson {
        /// The underlying serialization error.
        json_err: serde_json::error::Error,
    },

    /// The object store reported an error.
    #[error("Log storage error: {}", .source)]
    ObjectStore {
        /// The underlying object store error.
        #[from]
        source: ObjectStoreError,
    },

    /// The transaction conflicts with a concurrently committed transaction.
    #[error("Failed to commit transaction: {0}")]
    CommitConflict(#[from] CommitConflictError),

    /// The commit could not be completed within the configured retry budget.
    ///
    /// The payload is the configured `max_retries` value, not an attempt count.
    #[error("Failed to commit transaction: {0}")]
    MaxCommitAttempts(i32),

    /// The transaction removes data from a table configured as append-only.
    #[error(
        "The transaction includes Remove action with data change but Delta table is append-only"
    )]
    DeltaTableAppendOnly,

    /// The table requires features this client does not support.
    #[error("Unsupported table features required: {0:?}")]
    UnsupportedTableFeatures(Vec<TableFeature>),

    /// The given table feature must be explicitly specified.
    #[error("Table features must be specified, please specify: {0:?}")]
    TableFeaturesRequired(TableFeature),

    /// A failure reported by a log store implementation.
    #[error("Transaction failed: {msg}")]
    LogStoreError {
        /// Human readable description of the failure.
        msg: String,
        /// The underlying error (picked up as the error source by `thiserror` via its name).
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
}
221
222impl From<TransactionError> for DeltaTableError {
223 fn from(err: TransactionError) -> Self {
224 match err {
225 TransactionError::VersionAlreadyExists(version) => {
226 DeltaTableError::VersionAlreadyExists(version)
227 }
228 TransactionError::SerializeLogJson { json_err } => {
229 DeltaTableError::SerializeLogJson { json_err }
230 }
231 TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
232 other => DeltaTableError::Transaction { source: other },
233 }
234 }
235}
236
/// Errors raised while validating a commit assembled by [`CommitBuilder`].
///
/// The enum currently has no variants, so no value of this type can exist.
#[derive(thiserror::Error, Debug)]
pub enum CommitBuilderError {}
240
241impl From<CommitBuilderError> for DeltaTableError {
242 fn from(err: CommitBuilderError) -> Self {
243 DeltaTableError::CommitValidation { source: err }
244 }
245}
246
/// Read access to the state of the table a transaction is based on.
pub trait TableReference: Send + Sync {
    /// Table properties (configuration) of the table.
    fn config(&self) -> &TableProperties;

    /// Protocol action of the table.
    fn protocol(&self) -> &Protocol;

    /// Metadata action of the table.
    fn metadata(&self) -> &Metadata;

    /// Snapshot of the table; the commit loop clones it as the transaction's read snapshot.
    fn eager_snapshot(&self) -> &EagerSnapshot;
}
261
262impl TableReference for EagerSnapshot {
263 fn protocol(&self) -> &Protocol {
264 EagerSnapshot::protocol(self)
265 }
266
267 fn metadata(&self) -> &Metadata {
268 EagerSnapshot::metadata(self)
269 }
270
271 fn config(&self) -> &TableProperties {
272 self.table_properties()
273 }
274
275 fn eager_snapshot(&self) -> &EagerSnapshot {
276 self
277 }
278}
279
280impl TableReference for DeltaTableState {
281 fn config(&self) -> &TableProperties {
282 self.snapshot.config()
283 }
284
285 fn protocol(&self) -> &Protocol {
286 self.snapshot.protocol()
287 }
288
289 fn metadata(&self) -> &Metadata {
290 self.snapshot.metadata()
291 }
292
293 fn eager_snapshot(&self) -> &EagerSnapshot {
294 &self.snapshot
295 }
296}
297
/// Everything written to the log by a single commit.
#[derive(Debug)]
pub struct CommitData {
    /// Actions of the commit; when built via [`CommitData::new`] this includes a `commitInfo` action.
    pub actions: Vec<Action>,
    /// The operation performed by the commit.
    pub operation: DeltaOperation,
    /// Application metadata; also stored as the `info` map of the `commitInfo` action.
    pub app_metadata: HashMap<String, Value>,
    /// Application transactions; [`CommitData::new`] also appends each one as a `txn` action.
    pub app_transactions: Vec<Transaction>,
}
310
311fn normalize_reserved_commit_metadata(
319 commit_info: &mut CommitInfo,
320 app_metadata: &mut HashMap<String, Value>,
321) {
322 match app_metadata.remove("operationParameters") {
323 Some(Value::Object(operation_parameters)) => {
324 let generated_parameters = commit_info
325 .operation_parameters
326 .get_or_insert_with(HashMap::new);
327 for (key, value) in operation_parameters {
328 generated_parameters
329 .entry(key)
330 .or_insert_with(|| operation_parameter_value(value));
331 }
332 }
333 Some(value) => log_unexpected_reserved_metadata_type(
334 "operationParameters",
335 &value,
336 "object with string-compatible values",
337 ),
338 None => {}
339 }
340
341 if let Some(value) = app_metadata.remove("readVersion") {
342 if let Some(value) = value.as_u64() {
343 if commit_info.read_version.is_none() {
344 commit_info.read_version = Some(value);
345 }
346 } else {
347 log_unexpected_reserved_metadata_type("readVersion", &value, "non-negative integer");
348 }
349 }
350
351 promote_string_reserved_metadata(app_metadata, "userId", &mut commit_info.user_id);
352 promote_string_reserved_metadata(app_metadata, "userName", &mut commit_info.user_name);
353 promote_string_reserved_metadata(app_metadata, "userMetadata", &mut commit_info.user_metadata);
354
355 if let Some(value) = app_metadata.remove("isolationLevel") {
356 if let Some(value) = value
357 .as_str()
358 .and_then(|value| IsolationLevel::from_str(value).ok())
359 {
360 if commit_info.isolation_level.is_none() {
361 commit_info.isolation_level = Some(value);
362 }
363 } else {
364 log_unexpected_reserved_metadata_type(
365 "isolationLevel",
366 &value,
367 "valid IsolationLevel string",
368 );
369 }
370 }
371
372 if let Some(value) = app_metadata.remove("isBlindAppend") {
373 if let Some(value) = value.as_bool() {
374 if commit_info.is_blind_append.is_none() {
375 commit_info.is_blind_append = Some(value);
376 }
377 } else {
378 log_unexpected_reserved_metadata_type("isBlindAppend", &value, "boolean");
379 }
380 }
381
382 for key in RESERVED_COMMIT_INFO_KEYS {
383 app_metadata.remove(*key);
384 }
385}
386
387fn promote_string_reserved_metadata(
388 metadata: &mut HashMap<String, Value>,
389 key: &'static str,
390 target: &mut Option<String>,
391) {
392 let Some(value) = metadata.remove(key) else {
393 return;
394 };
395
396 if let Value::String(value) = value {
397 if target.is_none() {
398 *target = Some(value);
399 }
400 } else {
401 log_unexpected_reserved_metadata_type(key, &value, "string");
402 }
403}
404
405fn log_unexpected_reserved_metadata_type(key: &str, value: &Value, expected: &str) {
406 debug!(
407 key,
408 expected,
409 actual = json_value_kind(value),
410 "Ignoring reserved commit metadata key with unexpected type"
411 );
412}
413
414fn json_value_kind(value: &Value) -> &'static str {
415 match value {
416 Value::Null => "null",
417 Value::Bool(_) => "boolean",
418 Value::Number(_) => "number",
419 Value::String(_) => "string",
420 Value::Array(_) => "array",
421 Value::Object(_) => "object",
422 }
423}
424
425fn assign_commit_info_metadata(
426 commit_info: &mut CommitInfo,
427 app_metadata: &mut HashMap<String, Value>,
428) {
429 normalize_reserved_commit_metadata(commit_info, app_metadata);
430
431 let mut existing_info = std::mem::take(&mut commit_info.info);
432 normalize_reserved_commit_metadata(commit_info, &mut existing_info);
433 for (key, value) in existing_info {
436 app_metadata.entry(key).or_insert(value);
437 }
438 debug_assert!(
439 RESERVED_COMMIT_INFO_KEYS
440 .iter()
441 .all(|key| !app_metadata.contains_key(*key))
442 );
443
444 commit_info.info = app_metadata.clone();
445}
446
447impl CommitData {
448 pub fn new(
450 mut actions: Vec<Action>,
451 operation: DeltaOperation,
452 mut app_metadata: HashMap<String, Value>,
453 app_transactions: Vec<Transaction>,
454 ) -> Self {
455 if let Some(Action::CommitInfo(commit_info)) = actions
456 .iter_mut()
457 .find(|action| matches!(action, Action::CommitInfo(..)))
458 {
459 assign_commit_info_metadata(commit_info, &mut app_metadata);
463 } else {
464 let mut commit_info = operation.get_commit_info();
465 commit_info.timestamp = Some(Utc::now().timestamp_millis());
466 app_metadata
469 .entry("clientVersion".to_string())
470 .or_insert(Value::String(format!("delta-rs.{}", crate_version())));
471 assign_commit_info_metadata(&mut commit_info, &mut app_metadata);
472 actions.insert(0, Action::CommitInfo(commit_info));
474 }
475
476 for txn in &app_transactions {
477 actions.push(Action::Txn(txn.clone()))
478 }
479
480 CommitData {
481 actions,
482 operation,
483 app_metadata,
484 app_transactions,
485 }
486 }
487
488 pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
490 let mut jsons = Vec::<String>::new();
491 for action in &self.actions {
492 let json = serde_json::to_string(action)
493 .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
494 jsons.push(json);
495 }
496 Ok(bytes::Bytes::from(jsons.join("\n")))
497 }
498}
499
/// Settings for the maintenance performed after a successful commit.
#[derive(Clone, Debug, Copy)]
pub struct PostCommitHookProperties {
    /// Whether to create a checkpoint when the committed version falls on the checkpoint interval.
    create_checkpoint: bool,
    /// Overrides the table's expired-log-cleanup setting; `None` uses the table configuration.
    cleanup_expired_logs: Option<bool>,
}
507
/// Options controlling how an operation commits to the table.
#[derive(Clone, Debug)]
pub struct CommitProperties {
    /// Application metadata added to the `commitInfo` action.
    pub(crate) app_metadata: HashMap<String, Value>,
    /// Application transactions written as `txn` actions.
    pub(crate) app_transaction: Vec<Transaction>,
    /// Number of commit attempts allowed after the first one.
    max_retries: usize,
    /// Whether to create a checkpoint after the commit when one is due.
    create_checkpoint: bool,
    /// Overrides the table's expired-log-cleanup setting; `None` uses the table configuration.
    cleanup_expired_logs: Option<bool>,
}
518
519impl Default for CommitProperties {
520 fn default() -> Self {
521 Self {
522 app_metadata: Default::default(),
523 app_transaction: Vec::new(),
524 max_retries: DEFAULT_RETRIES,
525 create_checkpoint: true,
526 cleanup_expired_logs: None,
527 }
528 }
529}
530
531impl CommitProperties {
532 pub fn with_metadata(
534 mut self,
535 metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
536 ) -> Self {
537 self.app_metadata = HashMap::from_iter(metadata);
538 self
539 }
540
541 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
543 self.max_retries = max_retries;
544 self
545 }
546
547 pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
549 self.create_checkpoint = create_checkpoint;
550 self
551 }
552
553 pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
555 self.app_transaction.push(txn);
556 self
557 }
558
559 pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
561 self.app_transaction = txn;
562 self
563 }
564
565 pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
567 self.cleanup_expired_logs = cleanup_expired_logs;
568 self
569 }
570}
571
572impl From<CommitProperties> for CommitBuilder {
573 fn from(value: CommitProperties) -> Self {
574 CommitBuilder {
575 max_retries: value.max_retries,
576 app_metadata: value.app_metadata,
577 post_commit_hook: Some(PostCommitHookProperties {
578 create_checkpoint: value.create_checkpoint,
579 cleanup_expired_logs: value.cleanup_expired_logs,
580 }),
581 app_transaction: value.app_transaction,
582 ..Default::default()
583 }
584 }
585}
586
/// Builder for a commit; finish with [`CommitBuilder::build`].
pub struct CommitBuilder {
    /// Actions to commit.
    actions: Vec<Action>,
    /// Application metadata for the `commitInfo` action.
    app_metadata: HashMap<String, Value>,
    /// Application transactions appended as `txn` actions.
    app_transaction: Vec<Transaction>,
    /// Number of commit attempts allowed after the first one.
    max_retries: usize,
    /// Post-commit settings; `None` disables checkpoints and uses the table configuration for log cleanup.
    post_commit_hook: Option<PostCommitHookProperties>,
    /// Optional handler invoked before and after the post-commit hook.
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Operation id passed to the log store when writing the commit.
    operation_id: Uuid,
}
597
598impl Default for CommitBuilder {
599 fn default() -> Self {
600 CommitBuilder {
601 actions: Vec::new(),
602 app_metadata: HashMap::new(),
603 app_transaction: Vec::new(),
604 max_retries: DEFAULT_RETRIES,
605 post_commit_hook: None,
606 post_commit_hook_handler: None,
607 operation_id: Uuid::new_v4(),
608 }
609 }
610}
611
612impl<'a> CommitBuilder {
613 pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
615 self.actions = actions;
616 self
617 }
618
619 pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
621 self.app_metadata = app_metadata;
622 self
623 }
624
625 pub fn with_max_retries(mut self, max_retries: usize) -> Self {
627 self.max_retries = max_retries;
628 self
629 }
630
631 pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
633 self.post_commit_hook = Some(post_commit_hook);
634 self
635 }
636
637 pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
639 self.operation_id = operation_id;
640 self
641 }
642
643 pub fn with_post_commit_hook_handler(
645 mut self,
646 handler: Option<Arc<dyn CustomExecuteHandler>>,
647 ) -> Self {
648 self.post_commit_hook_handler = handler;
649 self
650 }
651
652 pub fn build(
654 self,
655 table_data: Option<&'a dyn TableReference>,
656 log_store: LogStoreRef,
657 operation: DeltaOperation,
658 ) -> PreCommit<'a> {
659 let data = CommitData::new(
660 self.actions,
661 operation,
662 self.app_metadata,
663 self.app_transaction,
664 );
665 PreCommit {
666 log_store,
667 table_data,
668 max_retries: self.max_retries,
669 data,
670 post_commit_hook: self.post_commit_hook,
671 post_commit_hook_handler: self.post_commit_hook_handler,
672 operation_id: self.operation_id,
673 }
674 }
675}
676
/// A built commit that has not yet been validated, staged or written.
///
/// Awaiting it runs the full pipeline through to a [`FinalizedCommit`].
pub struct PreCommit<'a> {
    /// Log store the commit is written to.
    log_store: LogStoreRef,
    /// Table the commit is based on; `None` when the commit creates the table.
    table_data: Option<&'a dyn TableReference>,
    /// The commit's actions and metadata.
    data: CommitData,
    /// Number of commit attempts allowed after the first one.
    max_retries: usize,
    /// Post-commit maintenance settings.
    post_commit_hook: Option<PostCommitHookProperties>,
    /// Optional handler invoked around the post-commit hook.
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Operation id passed to the log store.
    operation_id: Uuid,
}
687
688impl<'a> std::future::IntoFuture for PreCommit<'a> {
689 type Output = DeltaResult<FinalizedCommit>;
690 type IntoFuture = BoxFuture<'a, Self::Output>;
691
692 fn into_future(self) -> Self::IntoFuture {
693 Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
694 }
695}
696
697impl<'a> PreCommit<'a> {
698 pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
700 let this = self;
701
702 async fn write_tmp_commit(
705 log_entry: Bytes,
706 store: ObjectStoreRef,
707 ) -> DeltaResult<CommitOrBytes> {
708 let token = uuid::Uuid::new_v4().to_string();
709 let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
710 store.put(&path, log_entry.into()).await?;
711 Ok(CommitOrBytes::TmpCommit(path))
712 }
713
714 Box::pin(async move {
715 if let Some(table_reference) = this.table_data {
716 PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
717 }
718 let log_entry = this.data.get_bytes()?;
719
720 let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
723 .contains(&this.log_store.name().as_str())
724 {
725 CommitOrBytes::LogBytes(log_entry)
726 } else {
727 write_tmp_commit(
728 log_entry,
729 this.log_store.object_store(Some(this.operation_id)),
730 )
731 .await?
732 };
733
734 Ok(PreparedCommit {
735 commit_or_bytes,
736 log_store: this.log_store,
737 table_data: this.table_data,
738 max_retries: this.max_retries,
739 data: this.data,
740 post_commit: this.post_commit_hook,
741 post_commit_hook_handler: this.post_commit_hook_handler,
742 operation_id: this.operation_id,
743 })
744 })
745 }
746}
747
/// A validated commit whose log entry is staged but not yet written to the log.
pub struct PreparedCommit<'a> {
    /// The staged entry: either the serialized bytes or the path of a temporary commit object.
    commit_or_bytes: CommitOrBytes,
    /// Log store the commit is written to.
    log_store: LogStoreRef,
    /// The commit's actions and metadata.
    data: CommitData,
    /// Table the commit is based on; `None` when the commit creates the table.
    table_data: Option<&'a dyn TableReference>,
    /// Number of commit attempts allowed after the first one.
    max_retries: usize,
    /// Post-commit maintenance settings.
    post_commit: Option<PostCommitHookProperties>,
    /// Optional handler invoked around the post-commit hook.
    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Operation id passed to the log store.
    operation_id: Uuid,
}
759
impl PreparedCommit<'_> {
    /// Returns the staged log entry (in-memory bytes or temporary commit path).
    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
        &self.commit_or_bytes
    }
}
766
/// Awaiting a [`PreparedCommit`] writes the staged entry to the log.
///
/// When a concurrent writer got ahead, every intervening commit is
/// conflict-checked before the write is retried at the next free version.
impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
    type Output = DeltaResult<PostCommit>;
    type IntoFuture = BoxFuture<'a, Self::Output>;

    fn into_future(self) -> Self::IntoFuture {
        let this = self;

        Box::pin(async move {
            let commit_or_bytes = this.commit_or_bytes;

            let mut attempt_number: usize = 1;

            let read_snapshot: EagerSnapshot = if let Some(table_data) = this.table_data {
                // Work on an owned copy; it is advanced as concurrent commits are absorbed.
                table_data.eager_snapshot().clone()
            } else {
                // No table reference: this commit creates the table at version 0.
                debug!("committing initial table version 0");
                match this
                    .log_store
                    .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
                    .await
                {
                    Ok(_) => {
                        // Table created; no checkpoint or log cleanup for version 0.
                        return Ok(PostCommit {
                            version: 0,
                            data: this.data,
                            create_checkpoint: false,
                            cleanup_expired_logs: None,
                            log_store: this.log_store,
                            table_data: None,
                            custom_execute_handler: this.post_commit_hook_handler,
                            metrics: CommitMetrics { num_retries: 0 },
                        });
                    }
                    Err(TransactionError::VersionAlreadyExists(0)) => {
                        debug!("version 0 already exists, loading table state for retry");
                        // The failed write of version 0 counts as the first attempt, so
                        // the retry loop below continues with attempt 2.
                        attempt_number = 2;
                        let latest_version: Version = this.log_store.get_latest_version(0).await?;
                        EagerSnapshot::try_new(
                            this.log_store.as_ref(),
                            Default::default(),
                            Some(latest_version),
                        )
                        .await?
                    }
                    Err(e) => return Err(e.into()),
                }
            };

            let mut read_snapshot = read_snapshot;

            let commit_span = info_span!(
                "commit_with_retries",
                base_version = read_snapshot.version(),
                max_retries = this.max_retries,
                attempt = field::Empty,
                target_version = field::Empty,
                conflicts_checked = 0
            );

            async move {
                // One initial attempt plus `max_retries` retries (attempts are 1-based).
                let total_retries = this.max_retries + 1;
                while attempt_number <= total_retries {
                    Span::current().record("attempt", attempt_number);
                    let latest_version = this
                        .log_store
                        .get_latest_version(read_snapshot.version())
                        .await?;

                    if latest_version > read_snapshot.version() {
                        // With retries disabled, a concurrent update fails the commit
                        // immediately without running the conflict checker.
                        if this.max_retries == 0 {
                            warn!(
                                base_version = read_snapshot.version(),
                                latest_version = latest_version,
                                "table updated but max_retries is 0, failing immediately"
                            );
                            return Err(TransactionError::MaxCommitAttempts(
                                this.max_retries as i32,
                            )
                            .into());
                        }
                        warn!(
                            base_version = read_snapshot.version(),
                            latest_version = latest_version,
                            versions_behind = latest_version - read_snapshot.version(),
                            "table updated during transaction, checking for conflicts"
                        );
                        let mut steps = latest_version - read_snapshot.version();
                        let mut conflicts_checked = 0;

                        // Check each commit newer than the read snapshot one at a time,
                        // oldest first: the version pairs go (read, read + 1), ...,
                        // (latest - 1, latest).
                        while steps != 0 {
                            conflicts_checked += 1;
                            let summary = WinningCommitSummary::try_new(
                                this.log_store.as_ref(),
                                latest_version - steps,
                                (latest_version - steps) + 1,
                            )
                            .await?;
                            let transaction_info = TransactionInfo::try_new(
                                read_snapshot.log_data(),
                                this.data.operation.read_predicate(),
                                &this.data.actions,
                                this.data.operation.read_whole_table(),
                            )?;
                            let conflict_checker = ConflictChecker::new(
                                transaction_info,
                                summary,
                                Some(&this.data.operation),
                            );

                            match conflict_checker.check_conflicts() {
                                Ok(_) => {}
                                Err(err) => {
                                    error!(
                                        conflicts_checked = conflicts_checked,
                                        error = %err,
                                        "conflict detected, aborting transaction"
                                    );
                                    return Err(TransactionError::CommitConflict(err).into());
                                }
                            }
                            steps -= 1;
                        }
                        Span::current().record("conflicts_checked", conflicts_checked);
                        debug!(
                            conflicts_checked = conflicts_checked,
                            "all conflicts resolved, updating snapshot"
                        );
                        // Advance the read snapshot so the next iteration only checks
                        // commits that land after `latest_version`.
                        read_snapshot
                            .update(&this.log_store, Some(latest_version))
                            .await?;
                    }
                    let version: Version = latest_version + 1;
                    Span::current().record("target_version", version);

                    match this
                        .log_store
                        .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
                        .await
                    {
                        Ok(()) => {
                            info!(
                                version = version,
                                num_retries = attempt_number - 1,
                                "transaction committed successfully"
                            );
                            return Ok(PostCommit {
                                version,
                                data: this.data,
                                create_checkpoint: this
                                    .post_commit
                                    .map(|v| v.create_checkpoint)
                                    .unwrap_or_default(),
                                cleanup_expired_logs: this
                                    .post_commit
                                    .map(|v| v.cleanup_expired_logs)
                                    .unwrap_or_default(),
                                log_store: this.log_store,
                                table_data: Some(Box::new(read_snapshot)),
                                custom_execute_handler: this.post_commit_hook_handler,
                                metrics: CommitMetrics {
                                    num_retries: (attempt_number - 1) as u64,
                                },
                            });
                        }
                        Err(TransactionError::VersionAlreadyExists(version)) => {
                            warn!(
                                version = version,
                                attempt = attempt_number,
                                "version already exists, will retry"
                            );
                            // Lost the race for `version`: loop again, re-reading the latest
                            // version and conflict-checking the new commits.
                            attempt_number += 1;
                        }
                        Err(err) => {
                            error!(
                                version = version,
                                error = %err,
                                "commit failed, aborting"
                            );
                            // Any other failure is final: let the log store clean up the
                            // staged entry before surfacing the error.
                            this.log_store
                                .abort_commit_entry(version, commit_or_bytes, this.operation_id)
                                .await?;
                            return Err(err.into());
                        }
                    }
                }

                error!(
                    max_retries = this.max_retries,
                    "exceeded maximum commit attempts"
                );
                Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
            }
            .instrument(commit_span)
            .await
        })
    }
}
974
/// A commit that has been written to the log but whose post-commit hook has not run yet.
pub struct PostCommit {
    /// The committed version.
    pub version: Version,
    /// The committed actions and metadata.
    pub data: CommitData,
    /// Whether to create a checkpoint if the committed version is due for one.
    create_checkpoint: bool,
    /// Overrides the table's expired-log-cleanup setting; `None` uses the table configuration.
    cleanup_expired_logs: Option<bool>,
    /// Log store the commit was written to.
    log_store: LogStoreRef,
    /// Snapshot the commit was based on; `None` when this commit created the table at version 0.
    table_data: Option<Box<dyn TableReference>>,
    /// Optional handler invoked around the post-commit hook.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Metrics gathered while committing.
    metrics: CommitMetrics,
}
988
989impl PostCommit {
990 async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
992 if let Some(table) = &self.table_data {
993 let post_commit_operation_id = Uuid::new_v4();
994 let mut snapshot = table.eager_snapshot().clone();
995 if self.version != snapshot.version() {
996 snapshot.update(&self.log_store, Some(self.version)).await?;
997 }
998
999 let mut state = DeltaTableState { snapshot };
1000
1001 let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
1002 cleanup_logs
1003 } else {
1004 state.table_config().enable_expired_log_cleanup()
1005 };
1006
1007 if let Some(custom_execute_handler) = &self.custom_execute_handler {
1009 custom_execute_handler
1010 .before_post_commit_hook(
1011 &self.log_store,
1012 cleanup_logs || self.create_checkpoint,
1013 post_commit_operation_id,
1014 )
1015 .await?
1016 }
1017
1018 let mut new_checkpoint_created = false;
1019 if self.create_checkpoint {
1020 new_checkpoint_created = self
1022 .create_checkpoint(
1023 &state,
1024 &self.log_store,
1025 self.version,
1026 post_commit_operation_id,
1027 )
1028 .await?;
1029 }
1030
1031 let mut num_log_files_cleaned_up: u64 = 0;
1032 if cleanup_logs {
1033 num_log_files_cleaned_up = cleanup_expired_logs_for(
1035 self.version,
1036 self.log_store.as_ref(),
1037 Utc::now().timestamp_millis()
1038 - state.table_config().log_retention_duration().as_millis() as i64,
1039 Some(post_commit_operation_id),
1040 )
1041 .await? as u64;
1042 if num_log_files_cleaned_up > 0 {
1043 state = DeltaTableState::try_new(
1044 &self.log_store,
1045 state.load_config().clone(),
1046 Some(self.version),
1047 )
1048 .await?;
1049 }
1050 }
1051
1052 if let Some(custom_execute_handler) = &self.custom_execute_handler {
1054 custom_execute_handler
1055 .after_post_commit_hook(
1056 &self.log_store,
1057 cleanup_logs || self.create_checkpoint,
1058 post_commit_operation_id,
1059 )
1060 .await?
1061 }
1062 Ok((
1063 state,
1064 PostCommitMetrics {
1065 new_checkpoint_created,
1066 num_log_files_cleaned_up,
1067 },
1068 ))
1069 } else {
1070 let state =
1071 DeltaTableState::try_new(&self.log_store, Default::default(), Some(self.version))
1072 .await?;
1073 Ok((
1074 state,
1075 PostCommitMetrics {
1076 new_checkpoint_created: false,
1077 num_log_files_cleaned_up: 0,
1078 },
1079 ))
1080 }
1081 }
1082 async fn create_checkpoint(
1083 &self,
1084 table_state: &DeltaTableState,
1085 log_store: &LogStoreRef,
1086 version: Version,
1087 operation_id: Uuid,
1088 ) -> DeltaResult<bool> {
1089 if !table_state.load_config().require_files {
1090 warn!(
1091 "Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."
1092 );
1093 return Ok(false);
1094 }
1095
1096 let checkpoint_interval = table_state.config().checkpoint_interval().get();
1097 if (version + 1).is_multiple_of(checkpoint_interval) {
1098 create_checkpoint_for(version, log_store.as_ref(), Some(operation_id)).await?;
1099 Ok(true)
1100 } else {
1101 Ok(false)
1102 }
1103 }
1104}
1105
/// Result of a fully completed commit, including post-commit maintenance.
pub struct FinalizedCommit {
    /// Table state at the committed version.
    pub snapshot: DeltaTableState,

    /// The committed version.
    pub version: Version,

    /// Combined commit and post-commit metrics.
    pub metrics: Metrics,
}
1117
1118impl std::fmt::Debug for FinalizedCommit {
1119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1120 f.debug_struct("FinalizedCommit")
1121 .field("version", &self.version)
1122 .field("metrics", &self.metrics)
1123 .finish()
1124 }
1125}
1126
1127impl FinalizedCommit {
1128 pub fn snapshot(&self) -> DeltaTableState {
1130 self.snapshot.clone()
1131 }
1132 pub fn version(&self) -> Version {
1134 self.version
1135 }
1136}
1137
1138impl std::future::IntoFuture for PostCommit {
1139 type Output = DeltaResult<FinalizedCommit>;
1140 type IntoFuture = BoxFuture<'static, Self::Output>;
1141
1142 fn into_future(self) -> Self::IntoFuture {
1143 let this = self;
1144
1145 Box::pin(async move {
1146 match this.run_post_commit_hook().await {
1147 Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
1148 snapshot,
1149 version: this.version,
1150 metrics: Metrics {
1151 num_retries: this.metrics.num_retries,
1152 new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
1153 num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
1154 },
1155 }),
1156 Err(err) => Err(err),
1157 }
1158 })
1159 }
1160}
1161
1162#[cfg(test)]
1163mod tests {
1164 use std::sync::Arc;
1165
1166 use super::*;
1167 use crate::kernel::IsolationLevel;
1168 use crate::logstore::{LogStore, StorageConfig, default_logstore::DefaultLogStore};
1169 use crate::protocol::SaveMode;
1170 use object_store::{PutPayload, memory::InMemory};
1171 use serde_json::json;
1172 use url::Url;
1173
1174 #[tokio::test]
1175 async fn test_try_commit_transaction() {
1176 let store = Arc::new(InMemory::new());
1177 let url = Url::parse("mem://what/is/this").unwrap();
1178 let log_store = DefaultLogStore::new(
1179 store.clone(),
1180 store.clone(),
1181 crate::logstore::LogStoreConfig::new(&url, StorageConfig::default()),
1182 );
1183 let version_path = Path::from("_delta_log/00000000000000000000.json");
1184 store.put(&version_path, PutPayload::new()).await.unwrap();
1185
1186 let res = log_store
1187 .write_commit_entry(
1188 0,
1189 CommitOrBytes::LogBytes(PutPayload::new().into()),
1190 Uuid::new_v4(),
1191 )
1192 .await;
1193 assert!(res.is_err());
1195
1196 log_store
1198 .write_commit_entry(
1199 1,
1200 CommitOrBytes::LogBytes(PutPayload::new().into()),
1201 Uuid::new_v4(),
1202 )
1203 .await
1204 .unwrap();
1205 }
1206
1207 #[test]
1208 fn test_commit_with_retries_tracing_span() {
1209 let span = info_span!(
1210 "commit_with_retries",
1211 base_version = 5,
1212 max_retries = 10,
1213 attempt = field::Empty,
1214 target_version = field::Empty,
1215 conflicts_checked = 0
1216 );
1217
1218 let metadata = span.metadata().expect("span should have metadata");
1219 assert_eq!(metadata.name(), "commit_with_retries");
1220 assert_eq!(metadata.level(), &Level::INFO);
1221 assert!(metadata.is_span());
1222
1223 span.record("attempt", 1);
1224 span.record("target_version", 6);
1225 span.record("conflicts_checked", 2);
1226 }
1227
1228 #[test]
1229 fn test_commit_properties_with_retries() {
1230 let props = CommitProperties::default()
1231 .with_max_retries(5)
1232 .with_create_checkpoint(false);
1233
1234 assert_eq!(props.max_retries, 5);
1235 assert!(!props.create_checkpoint);
1236 }
1237
1238 #[test]
1239 fn test_commit_metrics() {
1240 let metrics = CommitMetrics { num_retries: 3 };
1241 assert_eq!(metrics.num_retries, 3);
1242 }
1243
1244 #[test]
1245 fn test_commit_data_client_version() {
1246 let no_metadata = CommitData::new(
1247 vec![],
1248 DeltaOperation::FileSystemCheck {},
1249 HashMap::new(),
1250 vec![],
1251 );
1252 assert_eq!(
1253 *no_metadata.app_metadata.get("clientVersion").unwrap(),
1254 json!(format!("delta-rs.{}", crate_version()))
1255 );
1256
1257 let with_metadata = CommitData::new(
1258 vec![],
1259 DeltaOperation::FileSystemCheck {},
1260 HashMap::from([("clientVersion".to_owned(), json!("test-client.0.0.1"))]),
1261 vec![],
1262 );
1263 assert_eq!(
1264 *with_metadata.app_metadata.get("clientVersion").unwrap(),
1265 json!("test-client.0.0.1")
1266 );
1267 }
1268
1269 fn commit_info(data: &CommitData) -> &CommitInfo {
1270 match &data.actions[0] {
1271 Action::CommitInfo(info) => info,
1272 action => panic!("expected first action to be commitInfo, got {action:?}"),
1273 }
1274 }
1275
1276 #[test]
1277 fn test_commit_data_strips_and_promotes_reserved_metadata() {
1278 let data = CommitData::new(
1279 vec![],
1280 DeltaOperation::FileSystemCheck {},
1281 HashMap::from([
1282 ("readVersion".to_owned(), json!(7)),
1283 ("userId".to_owned(), json!("user-1")),
1284 ("userName".to_owned(), json!("Jane Doe")),
1285 ("userMetadata".to_owned(), json!("metadata")),
1286 ("isolationLevel".to_owned(), json!("SnapshotIsolation")),
1287 ("isBlindAppend".to_owned(), json!(true)),
1288 ("timestamp".to_owned(), json!(1)),
1289 ("operation".to_owned(), json!("CUSTOM OPERATION")),
1290 ("engineInfo".to_owned(), json!("custom-engine")),
1291 ("custom".to_owned(), json!({"kept": true})),
1292 ]),
1293 vec![],
1294 );
1295
1296 let info = commit_info(&data);
1297 assert_eq!(info.read_version, Some(7));
1298 assert_eq!(info.user_id.as_deref(), Some("user-1"));
1299 assert_eq!(info.user_name.as_deref(), Some("Jane Doe"));
1300 assert_eq!(info.user_metadata.as_deref(), Some("metadata"));
1301 assert_eq!(
1302 info.isolation_level,
1303 Some(IsolationLevel::SnapshotIsolation)
1304 );
1305 assert_eq!(info.is_blind_append, Some(true));
1306 assert_eq!(info.operation.as_deref(), Some("FSCK"));
1307 assert_ne!(info.engine_info.as_deref(), Some("custom-engine"));
1308 assert_eq!(info.info.get("custom"), Some(&json!({"kept": true})));
1309
1310 for key in [
1311 "timestamp",
1312 "userId",
1313 "userName",
1314 "operation",
1315 "operationParameters",
1316 "readVersion",
1317 "isolationLevel",
1318 "isBlindAppend",
1319 "engineInfo",
1320 "userMetadata",
1321 ] {
1322 assert!(
1323 !info.info.contains_key(key),
1324 "reserved key {key} must not remain in flattened commit info"
1325 );
1326 }
1327 }
1328
1329 #[test]
1330 fn test_commit_data_merges_operation_parameters_generated_keys_win() {
1331 let data = CommitData::new(
1332 vec![],
1333 DeltaOperation::Write {
1334 mode: SaveMode::Overwrite,
1335 partition_by: Some(vec!["id".to_owned()]),
1336 predicate: None,
1337 },
1338 HashMap::from([(
1339 "operationParameters".to_owned(),
1340 json!({
1341 "mode": "custom-mode",
1342 "partitionBy": "custom-partitioning",
1343 "customParameter": {"kept": true},
1344 "customBoolean": true,
1345 "customNumber": 7,
1346 }),
1347 )]),
1348 vec![],
1349 );
1350
1351 let info = commit_info(&data);
1352 let operation_parameters = info
1353 .operation_parameters
1354 .as_ref()
1355 .expect("operation parameters should be present");
1356 assert_eq!(operation_parameters.get("mode"), Some(&json!("Overwrite")));
1357 assert_eq!(
1358 operation_parameters.get("partitionBy"),
1359 Some(&json!("[\"id\"]"))
1360 );
1361 assert_eq!(
1362 operation_parameters.get("customParameter"),
1363 Some(&json!("{\"kept\":true}"))
1364 );
1365 assert_eq!(
1366 operation_parameters.get("customBoolean"),
1367 Some(&json!("true"))
1368 );
1369 assert_eq!(operation_parameters.get("customNumber"), Some(&json!("7")));
1370 assert!(!info.info.contains_key("operationParameters"));
1371 }
1372
1373 #[test]
1374 fn test_commit_data_normalizes_reserved_keys_from_existing_commit_info_info() {
1375 let data = CommitData::new(
1376 vec![Action::CommitInfo(CommitInfo {
1377 info: HashMap::from([
1378 ("userName".to_owned(), json!("shadow-user")),
1379 (
1380 "operationParameters".to_owned(),
1381 json!({"custom": {"kept": true}}),
1382 ),
1383 ("custom".to_owned(), json!("kept")),
1384 ]),
1385 ..Default::default()
1386 })],
1387 DeltaOperation::FileSystemCheck {},
1388 HashMap::from([("custom".to_owned(), json!("metadata-wins"))]),
1389 vec![],
1390 );
1391
1392 let info = commit_info(&data);
1393 assert_eq!(info.user_name.as_deref(), Some("shadow-user"));
1394 let operation_parameters = info
1395 .operation_parameters
1396 .as_ref()
1397 .expect("operation parameters should be promoted");
1398 assert_eq!(
1399 operation_parameters.get("custom"),
1400 Some(&json!("{\"kept\":true}"))
1401 );
1402 assert_eq!(info.info.get("custom"), Some(&json!("metadata-wins")));
1403 assert!(!info.info.contains_key("userName"));
1404 assert!(!info.info.contains_key("operationParameters"));
1405 }
1406
1407 #[test]
1408 fn test_commit_data_does_not_promote_reserved_metadata_over_typed_fields() {
1409 let data = CommitData::new(
1410 vec![Action::CommitInfo(CommitInfo {
1411 user_name: Some("typed-user".to_owned()),
1412 read_version: Some(10),
1413 ..Default::default()
1414 })],
1415 DeltaOperation::FileSystemCheck {},
1416 HashMap::from([
1417 ("userName".to_owned(), json!("metadata-user")),
1418 ("readVersion".to_owned(), json!(11)),
1419 ("custom".to_owned(), json!("kept")),
1420 ]),
1421 vec![],
1422 );
1423
1424 let info = commit_info(&data);
1425 assert_eq!(info.user_name.as_deref(), Some("typed-user"));
1426 assert_eq!(info.read_version, Some(10));
1427 assert!(!info.info.contains_key("userName"));
1428 assert!(!info.info.contains_key("readVersion"));
1429 assert_eq!(info.info.get("custom"), Some(&json!("kept")));
1430 }
1431
1432 #[test]
1433 fn test_commit_data_strips_wrong_typed_reserved_metadata_without_clobbering_typed_fields() {
1434 let data = CommitData::new(
1435 vec![Action::CommitInfo(CommitInfo {
1436 user_name: Some("typed-user".to_owned()),
1437 read_version: Some(10),
1438 ..Default::default()
1439 })],
1440 DeltaOperation::FileSystemCheck {},
1441 HashMap::from([
1442 ("userName".to_owned(), json!(123)),
1443 ("readVersion".to_owned(), json!("abc")),
1444 ("isBlindAppend".to_owned(), json!("not-a-bool")),
1445 ("custom".to_owned(), json!("kept")),
1446 ]),
1447 vec![],
1448 );
1449
1450 let info = commit_info(&data);
1451 assert_eq!(info.user_name.as_deref(), Some("typed-user"));
1452 assert_eq!(info.read_version, Some(10));
1453 assert_eq!(info.info.get("custom"), Some(&json!("kept")));
1454 assert!(!info.info.contains_key("userName"));
1455 assert!(!info.info.contains_key("readVersion"));
1456 assert!(!info.info.contains_key("isBlindAppend"));
1457 }
1458}