Skip to main content

deltalake_core/kernel/transaction/
mod.rs

//! Add a commit entry to the Delta Table.
//! This module provides a unified interface for modifying commit behavior and attributes.
//!
//! [`CommitProperties`] provides a unified client interface for all Delta operations.
//! Internally this is used to initialize a [`CommitBuilder`].
6//!
//! For advanced use cases [`CommitBuilder`] can be used, which allows
//! finer control over the commit process. The builder can be converted
//! into a future that yields either a [`PreparedCommit`] or a [`FinalizedCommit`].
10//!
11//! A [`PreparedCommit`] represents a temporary commit marker written to storage.
12//! To convert to a [`FinalizedCommit`] an atomic rename is attempted. If the rename fails
13//! then conflict resolution is performed and the atomic rename is tried for the latest version.
14//!
15//!<pre>
16//!                                          Client Interface
17//!        ┌─────────────────────────────┐
18//!        │      Commit Properties      │
19//!        │                             │
20//!        │ Public commit interface for │
21//!        │     all Delta Operations    │
22//!        │                             │
23//!        └─────────────┬───────────────┘
24//!                      │
25//! ─────────────────────┼────────────────────────────────────
26//!                      │
27//!                      ▼                  Advanced Interface
28//!        ┌─────────────────────────────┐
29//!        │       Commit Builder        │
30//!        │                             │
31//!        │   Advanced entry point for  │
32//!        │     creating a commit       │
33//!        └─────────────┬───────────────┘
34//!                      │
35//!                      ▼
36//!     ┌───────────────────────────────────┐
37//!     │                                   │
38//!     │ ┌───────────────────────────────┐ │
39//!     │ │        Prepared Commit        │ │
40//!     │ │                               │ │
41//!     │ │     Represents a temporary    │ │
42//!     │ │   commit marker written to    │ │
43//!     │ │           storage             │ │
44//!     │ └──────────────┬────────────────┘ │
45//!     │                │                  │
46//!     │                ▼                  │
47//!     │ ┌───────────────────────────────┐ │
48//!     │ │       Finalize Commit         │ │
49//!     │ │                               │ │
50//!     │ │   Convert the commit marker   │ │
51//!     │ │   to a commit using atomic    │ │
52//!     │ │         operations            │ │
53//!     │ │                               │ │
54//!     │ └───────────────────────────────┘ │
55//!     │                                   │
56//!     └────────────────┬──────────────────┘
57//!                      │
58//!                      ▼
59//!       ┌───────────────────────────────┐
60//!       │          Post Commit          │
61//!       │                               │
62//!       │ Commit that was materialized  │
63//!       │ to storage with post commit   │
64//!       │      hooks to be executed     │
65//!       └──────────────┬────────────────┘
66//!                      │
67//!                      ▼
68//!       ┌───────────────────────────────┐
69//!       │        Finalized Commit       │
70//!       │                               │
71//!       │ Commit that was materialized  │
72//!       │         to storage            │
73//!       │                               │
74//!       └───────────────────────────────┘
75//!</pre>
76use std::collections::HashMap;
77use std::sync::Arc;
78
79use bytes::Bytes;
80use chrono::Utc;
81use conflict_checker::ConflictChecker;
82use delta_kernel::table_properties::TableProperties;
83use futures::future::BoxFuture;
84use object_store::Error as ObjectStoreError;
85use object_store::ObjectStoreExt as _;
86use object_store::path::Path;
87use serde_json::Value;
88use tracing::*;
89use uuid::Uuid;
90
91use delta_kernel::table_features::TableFeature;
92use serde::{Deserialize, Serialize};
93
94use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
95use crate::errors::DeltaTableError;
96use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction, Version};
97use crate::logstore::ObjectStoreRef;
98use crate::logstore::{CommitOrBytes, LogStoreRef};
99use crate::operations::CustomExecuteHandler;
100use crate::protocol::DeltaOperation;
101use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
102use crate::table::config::TablePropertiesExt as _;
103use crate::table::state::DeltaTableState;
104use crate::{DeltaResult, crate_version};
105
106pub use self::conflict_checker::CommitConflictError;
107pub use self::protocol::INSTANCE as PROTOCOL;
108
109#[cfg(test)]
110pub(crate) mod application;
111mod conflict_checker;
112mod protocol;
113#[cfg(feature = "datafusion")]
114mod state;
115
/// Folder (relative to the table root) that holds the Delta transaction log.
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Default number of retries used by [`CommitProperties`] and [`CommitBuilder`].
pub(crate) const DEFAULT_RETRIES: usize = 15;
118
119#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
120#[serde(rename_all = "camelCase")]
121pub struct CommitMetrics {
122    /// Number of retries before a successful commit
123    pub num_retries: u64,
124}
125
126#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
127#[serde(rename_all = "camelCase")]
128pub struct PostCommitMetrics {
129    /// Whether a new checkpoint was created as part of this commit
130    pub new_checkpoint_created: bool,
131
132    /// Number of log files cleaned up
133    pub num_log_files_cleaned_up: u64,
134}
135
136#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
137#[serde(rename_all = "camelCase")]
138pub struct Metrics {
139    /// Number of retries before a successful commit
140    pub num_retries: u64,
141
142    /// Whether a new checkpoint was created as part of this commit
143    pub new_checkpoint_created: bool,
144
145    /// Number of log files cleaned up
146    pub num_log_files_cleaned_up: u64,
147}
148
149/// Error raised while commititng transaction
150#[derive(thiserror::Error, Debug)]
151pub enum TransactionError {
152    /// Version already exists
153    #[error("Tried committing existing table version: {0}")]
154    VersionAlreadyExists(Version),
155
156    /// Error returned when reading the delta log object failed.
157    #[error("Error serializing commit log to json: {json_err}")]
158    SerializeLogJson {
159        /// Commit log record JSON serialization error.
160        json_err: serde_json::error::Error,
161    },
162
163    /// Error returned when reading the delta log object failed.
164    #[error("Log storage error: {}", .source)]
165    ObjectStore {
166        /// Storage error details when reading the delta log object failed.
167        #[from]
168        source: ObjectStoreError,
169    },
170
171    /// Error returned when a commit conflict occurred
172    #[error("Failed to commit transaction: {0}")]
173    CommitConflict(#[from] CommitConflictError),
174
175    /// Error returned when maximum number of commit trioals is exceeded
176    #[error("Failed to commit transaction: {0}")]
177    MaxCommitAttempts(i32),
178
179    /// The transaction includes Remove action with data change but Delta table is append-only
180    #[error(
181        "The transaction includes Remove action with data change but Delta table is append-only"
182    )]
183    DeltaTableAppendOnly,
184
185    /// Error returned when unsupported table features are required
186    #[error("Unsupported table features required: {0:?}")]
187    UnsupportedTableFeatures(Vec<TableFeature>),
188
189    /// Error returned when table features are required but not specified
190    #[error("Table features must be specified, please specify: {0:?}")]
191    TableFeaturesRequired(TableFeature),
192
193    /// The transaction failed to commit due to an error in an implementation-specific layer.
194    /// Currently used by DynamoDb-backed S3 log store when database operations fail.
195    #[error("Transaction failed: {msg}")]
196    LogStoreError {
197        /// Detailed message for the commit failure.
198        msg: String,
199        /// underlying error in the log store transactional layer.
200        source: Box<dyn std::error::Error + Send + Sync + 'static>,
201    },
202}
203
204impl From<TransactionError> for DeltaTableError {
205    fn from(err: TransactionError) -> Self {
206        match err {
207            TransactionError::VersionAlreadyExists(version) => {
208                DeltaTableError::VersionAlreadyExists(version)
209            }
210            TransactionError::SerializeLogJson { json_err } => {
211                DeltaTableError::SerializeLogJson { json_err }
212            }
213            TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
214            other => DeltaTableError::Transaction { source: other },
215        }
216    }
217}
218
219/// Error raised while commititng transaction
220#[derive(thiserror::Error, Debug)]
221pub enum CommitBuilderError {}
222
223impl From<CommitBuilderError> for DeltaTableError {
224    fn from(err: CommitBuilderError) -> Self {
225        DeltaTableError::CommitValidation { source: err }
226    }
227}
228
229/// Reference to some structure that contains mandatory attributes for performing a commit.
230pub trait TableReference: Send + Sync {
231    /// Well known table configuration
232    fn config(&self) -> &TableProperties;
233
234    /// Get the table protocol of the snapshot
235    fn protocol(&self) -> &Protocol;
236
237    /// Get the table metadata of the snapshot
238    fn metadata(&self) -> &Metadata;
239
240    /// Try to cast this table reference to a `EagerSnapshot`
241    fn eager_snapshot(&self) -> &EagerSnapshot;
242}
243
244impl TableReference for EagerSnapshot {
245    fn protocol(&self) -> &Protocol {
246        EagerSnapshot::protocol(self)
247    }
248
249    fn metadata(&self) -> &Metadata {
250        EagerSnapshot::metadata(self)
251    }
252
253    fn config(&self) -> &TableProperties {
254        self.table_properties()
255    }
256
257    fn eager_snapshot(&self) -> &EagerSnapshot {
258        self
259    }
260}
261
262impl TableReference for DeltaTableState {
263    fn config(&self) -> &TableProperties {
264        self.snapshot.config()
265    }
266
267    fn protocol(&self) -> &Protocol {
268        self.snapshot.protocol()
269    }
270
271    fn metadata(&self) -> &Metadata {
272        self.snapshot.metadata()
273    }
274
275    fn eager_snapshot(&self) -> &EagerSnapshot {
276        &self.snapshot
277    }
278}
279
280/// Data that was actually written to the log store.
281#[derive(Debug)]
282pub struct CommitData {
283    /// The actions
284    pub actions: Vec<Action>,
285    /// The Operation
286    pub operation: DeltaOperation,
287    /// The Metadata
288    pub app_metadata: HashMap<String, Value>,
289    /// Application specific transaction
290    pub app_transactions: Vec<Transaction>,
291}
292
293impl CommitData {
294    /// Create new data to be committed
295    pub fn new(
296        mut actions: Vec<Action>,
297        operation: DeltaOperation,
298        mut app_metadata: HashMap<String, Value>,
299        app_transactions: Vec<Transaction>,
300    ) -> Self {
301        if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
302            let mut commit_info = operation.get_commit_info();
303            commit_info.timestamp = Some(Utc::now().timestamp_millis());
304            app_metadata
305                .entry("clientVersion".to_string())
306                .or_insert(Value::String(format!("delta-rs.{}", crate_version())));
307            app_metadata.extend(commit_info.info);
308            commit_info.info = app_metadata.clone();
309            // commit info should be the first action to support in-commit timestamps.
310            actions.insert(0, Action::CommitInfo(commit_info));
311        }
312
313        for txn in &app_transactions {
314            actions.push(Action::Txn(txn.clone()))
315        }
316
317        CommitData {
318            actions,
319            operation,
320            app_metadata,
321            app_transactions,
322        }
323    }
324
325    /// Obtain the byte representation of the commit.
326    pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
327        let mut jsons = Vec::<String>::new();
328        for action in &self.actions {
329            let json = serde_json::to_string(action)
330                .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
331            jsons.push(json);
332        }
333        Ok(bytes::Bytes::from(jsons.join("\n")))
334    }
335}
336
/// Properties for post commit hook.
#[derive(Clone, Copy, Debug)]
pub struct PostCommitHookProperties {
    /// Whether to create a checkpoint when the checkpoint interval condition is met.
    create_checkpoint: bool,
    /// Override the EnableExpiredLogCleanUp setting, if None config setting is used
    cleanup_expired_logs: Option<bool>,
}
344
345#[derive(Clone, Debug)]
346/// End user facing interface to be used by operations on the table.
347/// Enable controlling commit behaviour and modifying metadata that is written during a commit.
348pub struct CommitProperties {
349    pub(crate) app_metadata: HashMap<String, Value>,
350    pub(crate) app_transaction: Vec<Transaction>,
351    max_retries: usize,
352    create_checkpoint: bool,
353    cleanup_expired_logs: Option<bool>,
354}
355
356impl Default for CommitProperties {
357    fn default() -> Self {
358        Self {
359            app_metadata: Default::default(),
360            app_transaction: Vec::new(),
361            max_retries: DEFAULT_RETRIES,
362            create_checkpoint: true,
363            cleanup_expired_logs: None,
364        }
365    }
366}
367
368impl CommitProperties {
369    /// Specify metadata the be committed
370    pub fn with_metadata(
371        mut self,
372        metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
373    ) -> Self {
374        self.app_metadata = HashMap::from_iter(metadata);
375        self
376    }
377
378    /// Specify maximum number of times to retry the transaction before failing to commit
379    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
380        self.max_retries = max_retries;
381        self
382    }
383
384    /// Specify if it should create a checkpoint when the commit interval condition is met
385    pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
386        self.create_checkpoint = create_checkpoint;
387        self
388    }
389
390    /// Add an additional application transaction to the commit
391    pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
392        self.app_transaction.push(txn);
393        self
394    }
395
396    /// Override application transactions for the commit
397    pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
398        self.app_transaction = txn;
399        self
400    }
401
402    /// Specify if it should clean up the logs when the logRetentionDuration interval is met
403    pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
404        self.cleanup_expired_logs = cleanup_expired_logs;
405        self
406    }
407}
408
409impl From<CommitProperties> for CommitBuilder {
410    fn from(value: CommitProperties) -> Self {
411        CommitBuilder {
412            max_retries: value.max_retries,
413            app_metadata: value.app_metadata,
414            post_commit_hook: Some(PostCommitHookProperties {
415                create_checkpoint: value.create_checkpoint,
416                cleanup_expired_logs: value.cleanup_expired_logs,
417            }),
418            app_transaction: value.app_transaction,
419            ..Default::default()
420        }
421    }
422}
423
424/// Prepare data to be committed to the Delta log and control how the commit is performed
425pub struct CommitBuilder {
426    actions: Vec<Action>,
427    app_metadata: HashMap<String, Value>,
428    app_transaction: Vec<Transaction>,
429    max_retries: usize,
430    post_commit_hook: Option<PostCommitHookProperties>,
431    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
432    operation_id: Uuid,
433}
434
435impl Default for CommitBuilder {
436    fn default() -> Self {
437        CommitBuilder {
438            actions: Vec::new(),
439            app_metadata: HashMap::new(),
440            app_transaction: Vec::new(),
441            max_retries: DEFAULT_RETRIES,
442            post_commit_hook: None,
443            post_commit_hook_handler: None,
444            operation_id: Uuid::new_v4(),
445        }
446    }
447}
448
449impl<'a> CommitBuilder {
450    /// Actions to be included in the commit
451    pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
452        self.actions = actions;
453        self
454    }
455
456    /// Metadata for the operation performed like metrics, user, and notebook
457    pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
458        self.app_metadata = app_metadata;
459        self
460    }
461
462    /// Maximum number of times to retry the transaction before failing to commit
463    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
464        self.max_retries = max_retries;
465        self
466    }
467
468    /// Specify all the post commit hook properties
469    pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
470        self.post_commit_hook = Some(post_commit_hook);
471        self
472    }
473
474    /// Propagate operation id to log store
475    pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
476        self.operation_id = operation_id;
477        self
478    }
479
480    /// Set a custom execute handler, for pre and post execution
481    pub fn with_post_commit_hook_handler(
482        mut self,
483        handler: Option<Arc<dyn CustomExecuteHandler>>,
484    ) -> Self {
485        self.post_commit_hook_handler = handler;
486        self
487    }
488
489    /// Prepare a Commit operation using the configured builder
490    pub fn build(
491        self,
492        table_data: Option<&'a dyn TableReference>,
493        log_store: LogStoreRef,
494        operation: DeltaOperation,
495    ) -> PreCommit<'a> {
496        let data = CommitData::new(
497            self.actions,
498            operation,
499            self.app_metadata,
500            self.app_transaction,
501        );
502        PreCommit {
503            log_store,
504            table_data,
505            max_retries: self.max_retries,
506            data,
507            post_commit_hook: self.post_commit_hook,
508            post_commit_hook_handler: self.post_commit_hook_handler,
509            operation_id: self.operation_id,
510        }
511    }
512}
513
514/// Represents a commit that has not yet started but all details are finalized
515pub struct PreCommit<'a> {
516    log_store: LogStoreRef,
517    table_data: Option<&'a dyn TableReference>,
518    data: CommitData,
519    max_retries: usize,
520    post_commit_hook: Option<PostCommitHookProperties>,
521    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
522    operation_id: Uuid,
523}
524
525impl<'a> std::future::IntoFuture for PreCommit<'a> {
526    type Output = DeltaResult<FinalizedCommit>;
527    type IntoFuture = BoxFuture<'a, Self::Output>;
528
529    fn into_future(self) -> Self::IntoFuture {
530        Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
531    }
532}
533
534impl<'a> PreCommit<'a> {
535    /// Prepare the commit but do not finalize it
536    pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
537        let this = self;
538
539        // Write delta log entry as temporary file to storage. For the actual commit,
540        // the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
541        async fn write_tmp_commit(
542            log_entry: Bytes,
543            store: ObjectStoreRef,
544        ) -> DeltaResult<CommitOrBytes> {
545            let token = uuid::Uuid::new_v4().to_string();
546            let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
547            store.put(&path, log_entry.into()).await?;
548            Ok(CommitOrBytes::TmpCommit(path))
549        }
550
551        Box::pin(async move {
552            if let Some(table_reference) = this.table_data {
553                PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
554            }
555            let log_entry = this.data.get_bytes()?;
556
557            // With the DefaultLogStore & LakeFSLogstore, we just pass the bytes around, since we use conditionalPuts
558            // Other stores will use tmp_commits
559            let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
560                .contains(&this.log_store.name().as_str())
561            {
562                CommitOrBytes::LogBytes(log_entry)
563            } else {
564                write_tmp_commit(
565                    log_entry,
566                    this.log_store.object_store(Some(this.operation_id)),
567                )
568                .await?
569            };
570
571            Ok(PreparedCommit {
572                commit_or_bytes,
573                log_store: this.log_store,
574                table_data: this.table_data,
575                max_retries: this.max_retries,
576                data: this.data,
577                post_commit: this.post_commit_hook,
578                post_commit_hook_handler: this.post_commit_hook_handler,
579                operation_id: this.operation_id,
580            })
581        })
582    }
583}
584
585/// Represents a inflight commit
586pub struct PreparedCommit<'a> {
587    commit_or_bytes: CommitOrBytes,
588    log_store: LogStoreRef,
589    data: CommitData,
590    table_data: Option<&'a dyn TableReference>,
591    max_retries: usize,
592    post_commit: Option<PostCommitHookProperties>,
593    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
594    operation_id: Uuid,
595}
596
597impl PreparedCommit<'_> {
598    /// The temporary commit file created
599    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
600        &self.commit_or_bytes
601    }
602}
603
604impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
605    type Output = DeltaResult<PostCommit>;
606    type IntoFuture = BoxFuture<'a, Self::Output>;
607
608    fn into_future(self) -> Self::IntoFuture {
609        let this = self;
610
611        Box::pin(async move {
612            let commit_or_bytes = this.commit_or_bytes;
613
614            let mut attempt_number: usize = 1;
615
616            // Handle the case where table doesn't exist yet (initial table creation)
617            let read_snapshot: EagerSnapshot = if let Some(table_data) = this.table_data {
618                table_data.eager_snapshot().clone()
619            } else {
620                debug!("committing initial table version 0");
621                match this
622                    .log_store
623                    .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
624                    .await
625                {
626                    Ok(_) => {
627                        return Ok(PostCommit {
628                            version: 0,
629                            data: this.data,
630                            create_checkpoint: false,
631                            cleanup_expired_logs: None,
632                            log_store: this.log_store,
633                            table_data: None,
634                            custom_execute_handler: this.post_commit_hook_handler,
635                            metrics: CommitMetrics { num_retries: 0 },
636                        });
637                    }
638                    Err(TransactionError::VersionAlreadyExists(0)) => {
639                        // Table was created by another writer since the `this.table_data.is_none()` check.
640                        // Load the current table state and continue with the retry loop.
641                        debug!("version 0 already exists, loading table state for retry");
642                        attempt_number = 2;
643                        let latest_version: Version = this.log_store.get_latest_version(0).await?;
644                        EagerSnapshot::try_new(
645                            this.log_store.as_ref(),
646                            Default::default(),
647                            Some(latest_version),
648                        )
649                        .await?
650                    }
651                    Err(e) => return Err(e.into()),
652                }
653            };
654
655            let mut read_snapshot = read_snapshot;
656
657            let commit_span = info_span!(
658                "commit_with_retries",
659                base_version = read_snapshot.version(),
660                max_retries = this.max_retries,
661                attempt = field::Empty,
662                target_version = field::Empty,
663                conflicts_checked = 0
664            );
665
666            async move {
667                let total_retries = this.max_retries + 1;
668                while attempt_number <= total_retries {
669                    Span::current().record("attempt", attempt_number);
670                    let latest_version = this
671                        .log_store
672                        .get_latest_version(read_snapshot.version())
673                        .await?;
674
675                    if latest_version > read_snapshot.version() {
676                        // If max_retries are set to 0, do not try to use the conflict checker to resolve the conflict
677                        // and throw immediately
678                        if this.max_retries == 0 {
679                            warn!(
680                                base_version = read_snapshot.version(),
681                                latest_version = latest_version,
682                                "table updated but max_retries is 0, failing immediately"
683                            );
684                            return Err(TransactionError::MaxCommitAttempts(
685                                this.max_retries as i32,
686                            )
687                            .into());
688                        }
689                        warn!(
690                            base_version = read_snapshot.version(),
691                            latest_version = latest_version,
692                            versions_behind = latest_version - read_snapshot.version(),
693                            "table updated during transaction, checking for conflicts"
694                        );
695                        let mut steps = latest_version - read_snapshot.version();
696                        let mut conflicts_checked = 0;
697
698                        // Need to check for conflicts with each version between the read_snapshot and
699                        // the latest!
700                        while steps != 0 {
701                            conflicts_checked += 1;
702                            let summary = WinningCommitSummary::try_new(
703                                this.log_store.as_ref(),
704                                latest_version - steps,
705                                (latest_version - steps) + 1,
706                            )
707                            .await?;
708                            let transaction_info = TransactionInfo::try_new(
709                                read_snapshot.log_data(),
710                                this.data.operation.read_predicate(),
711                                &this.data.actions,
712                                this.data.operation.read_whole_table(),
713                            )?;
714                            let conflict_checker = ConflictChecker::new(
715                                transaction_info,
716                                summary,
717                                Some(&this.data.operation),
718                            );
719
720                            match conflict_checker.check_conflicts() {
721                                Ok(_) => {}
722                                Err(err) => {
723                                    error!(
724                                        conflicts_checked = conflicts_checked,
725                                        error = %err,
726                                        "conflict detected, aborting transaction"
727                                    );
728                                    return Err(TransactionError::CommitConflict(err).into());
729                                }
730                            }
731                            steps -= 1;
732                        }
733                        Span::current().record("conflicts_checked", conflicts_checked);
734                        debug!(
735                            conflicts_checked = conflicts_checked,
736                            "all conflicts resolved, updating snapshot"
737                        );
738                        // Update snapshot to latest version after successful conflict check
739                        read_snapshot
740                            .update(&this.log_store, Some(latest_version))
741                            .await?;
742                    }
743                    let version: Version = latest_version + 1;
744                    Span::current().record("target_version", version);
745
746                    match this
747                        .log_store
748                        .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
749                        .await
750                    {
751                        Ok(()) => {
752                            info!(
753                                version = version,
754                                num_retries = attempt_number - 1,
755                                "transaction committed successfully"
756                            );
757                            return Ok(PostCommit {
758                                version,
759                                data: this.data,
760                                create_checkpoint: this
761                                    .post_commit
762                                    .map(|v| v.create_checkpoint)
763                                    .unwrap_or_default(),
764                                cleanup_expired_logs: this
765                                    .post_commit
766                                    .map(|v| v.cleanup_expired_logs)
767                                    .unwrap_or_default(),
768                                log_store: this.log_store,
769                                table_data: Some(Box::new(read_snapshot)),
770                                custom_execute_handler: this.post_commit_hook_handler,
771                                metrics: CommitMetrics {
772                                    num_retries: (attempt_number - 1) as u64,
773                                },
774                            });
775                        }
776                        Err(TransactionError::VersionAlreadyExists(version)) => {
777                            warn!(
778                                version = version,
779                                attempt = attempt_number,
780                                "version already exists, will retry"
781                            );
782                            // If the version already exists, loop through again and re-check
783                            // conflicts
784                            attempt_number += 1;
785                        }
786                        Err(err) => {
787                            error!(
788                                version = version,
789                                error = %err,
790                                "commit failed, aborting"
791                            );
792                            this.log_store
793                                .abort_commit_entry(version, commit_or_bytes, this.operation_id)
794                                .await?;
795                            return Err(err.into());
796                        }
797                    }
798                }
799
800                error!(
801                    max_retries = this.max_retries,
802                    "exceeded maximum commit attempts"
803                );
804                Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
805            }
806            .instrument(commit_span)
807            .await
808        })
809    }
810}
811
812/// Represents items for the post commit hook
813pub struct PostCommit {
814    /// The winning version number of the commit
815    pub version: Version,
816    /// The data that was committed to the log store
817    pub data: CommitData,
818    create_checkpoint: bool,
819    cleanup_expired_logs: Option<bool>,
820    log_store: LogStoreRef,
821    table_data: Option<Box<dyn TableReference>>,
822    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
823    metrics: CommitMetrics,
824}
825
826impl PostCommit {
827    /// Runs the post commit activities
828    async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
829        if let Some(table) = &self.table_data {
830            let post_commit_operation_id = Uuid::new_v4();
831            let mut snapshot = table.eager_snapshot().clone();
832            if self.version != snapshot.version() {
833                snapshot.update(&self.log_store, Some(self.version)).await?;
834            }
835
836            let mut state = DeltaTableState { snapshot };
837
838            let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
839                cleanup_logs
840            } else {
841                state.table_config().enable_expired_log_cleanup()
842            };
843
844            // Run arbitrary before_post_commit_hook code
845            if let Some(custom_execute_handler) = &self.custom_execute_handler {
846                custom_execute_handler
847                    .before_post_commit_hook(
848                        &self.log_store,
849                        cleanup_logs || self.create_checkpoint,
850                        post_commit_operation_id,
851                    )
852                    .await?
853            }
854
855            let mut new_checkpoint_created = false;
856            if self.create_checkpoint {
857                // Execute create checkpoint hook
858                new_checkpoint_created = self
859                    .create_checkpoint(
860                        &state,
861                        &self.log_store,
862                        self.version,
863                        post_commit_operation_id,
864                    )
865                    .await?;
866            }
867
868            let mut num_log_files_cleaned_up: u64 = 0;
869            if cleanup_logs {
870                // Execute clean up logs hook
871                num_log_files_cleaned_up = cleanup_expired_logs_for(
872                    self.version,
873                    self.log_store.as_ref(),
874                    Utc::now().timestamp_millis()
875                        - state.table_config().log_retention_duration().as_millis() as i64,
876                    Some(post_commit_operation_id),
877                )
878                .await? as u64;
879                if num_log_files_cleaned_up > 0 {
880                    state = DeltaTableState::try_new(
881                        &self.log_store,
882                        state.load_config().clone(),
883                        Some(self.version),
884                    )
885                    .await?;
886                }
887            }
888
889            // Run arbitrary after_post_commit_hook code
890            if let Some(custom_execute_handler) = &self.custom_execute_handler {
891                custom_execute_handler
892                    .after_post_commit_hook(
893                        &self.log_store,
894                        cleanup_logs || self.create_checkpoint,
895                        post_commit_operation_id,
896                    )
897                    .await?
898            }
899            Ok((
900                state,
901                PostCommitMetrics {
902                    new_checkpoint_created,
903                    num_log_files_cleaned_up,
904                },
905            ))
906        } else {
907            let state =
908                DeltaTableState::try_new(&self.log_store, Default::default(), Some(self.version))
909                    .await?;
910            Ok((
911                state,
912                PostCommitMetrics {
913                    new_checkpoint_created: false,
914                    num_log_files_cleaned_up: 0,
915                },
916            ))
917        }
918    }
919    async fn create_checkpoint(
920        &self,
921        table_state: &DeltaTableState,
922        log_store: &LogStoreRef,
923        version: Version,
924        operation_id: Uuid,
925    ) -> DeltaResult<bool> {
926        if !table_state.load_config().require_files {
927            warn!(
928                "Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."
929            );
930            return Ok(false);
931        }
932
933        let checkpoint_interval = table_state.config().checkpoint_interval().get();
934        if (version + 1).is_multiple_of(checkpoint_interval) {
935            create_checkpoint_for(version, log_store.as_ref(), Some(operation_id)).await?;
936            Ok(true)
937        } else {
938            Ok(false)
939        }
940    }
941}
942
943/// A commit that successfully completed
944pub struct FinalizedCommit {
945    /// The new table state after a commit
946    pub snapshot: DeltaTableState,
947
948    /// Version of the finalized commit
949    pub version: Version,
950
951    /// Metrics associated with the commit operation
952    pub metrics: Metrics,
953}
954
955impl std::fmt::Debug for FinalizedCommit {
956    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
957        f.debug_struct("FinalizedCommit")
958            .field("version", &self.version)
959            .field("metrics", &self.metrics)
960            .finish()
961    }
962}
963
964impl FinalizedCommit {
965    /// The new table state after a commit
966    pub fn snapshot(&self) -> DeltaTableState {
967        self.snapshot.clone()
968    }
969    /// Version of the finalized commit
970    pub fn version(&self) -> Version {
971        self.version
972    }
973}
974
975impl std::future::IntoFuture for PostCommit {
976    type Output = DeltaResult<FinalizedCommit>;
977    type IntoFuture = BoxFuture<'static, Self::Output>;
978
979    fn into_future(self) -> Self::IntoFuture {
980        let this = self;
981
982        Box::pin(async move {
983            match this.run_post_commit_hook().await {
984                Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
985                    snapshot,
986                    version: this.version,
987                    metrics: Metrics {
988                        num_retries: this.metrics.num_retries,
989                        new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
990                        num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
991                    },
992                }),
993                Err(err) => Err(err),
994            }
995        })
996    }
997}
998
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::logstore::{LogStore, StorageConfig, default_logstore::DefaultLogStore};
    use object_store::{ObjectStore, PutPayload, memory::InMemory};
    use serde_json::json;
    use url::Url;

    #[tokio::test]
    async fn test_try_commit_transaction() {
        let store = Arc::new(InMemory::new());
        let url = Url::parse("mem://what/is/this").unwrap();
        let log_store = DefaultLogStore::new(
            store.clone(),
            store.clone(),
            crate::logstore::LogStoreConfig::new(&url, StorageConfig::default()),
        );
        // Pre-populate version 0 so the first write below collides with an existing entry.
        let version_path = Path::from("_delta_log/00000000000000000000.json");
        store.put(&version_path, PutPayload::new()).await.unwrap();

        let res = log_store
            .write_commit_entry(
                0,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await;
        // Writing an existing version must fail with `VersionAlreadyExists`, which is the
        // error the commit retry loop treats as a conflict to retry on.
        assert!(
            matches!(res, Err(TransactionError::VersionAlreadyExists(0))),
            "expected VersionAlreadyExists(0), got {res:?}"
        );

        // succeeds for next version
        log_store
            .write_commit_entry(
                1,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await
            .unwrap();
    }

    #[test]
    fn test_commit_with_retries_tracing_span() {
        let span = info_span!(
            "commit_with_retries",
            base_version = 5,
            max_retries = 10,
            attempt = field::Empty,
            target_version = field::Empty,
            conflicts_checked = 0
        );

        let metadata = span.metadata().expect("span should have metadata");
        assert_eq!(metadata.name(), "commit_with_retries");
        assert_eq!(metadata.level(), &Level::INFO);
        assert!(metadata.is_span());

        // Every field recorded later must be declared up front, otherwise `record` is a no-op.
        let fields = metadata.fields();
        for name in [
            "base_version",
            "max_retries",
            "attempt",
            "target_version",
            "conflicts_checked",
        ] {
            assert!(
                fields.field(name).is_some(),
                "span should declare field `{name}`"
            );
        }

        span.record("attempt", 1);
        span.record("target_version", 6);
        span.record("conflicts_checked", 2);
    }

    #[test]
    fn test_commit_properties_with_retries() {
        let props = CommitProperties::default()
            .with_max_retries(5)
            .with_create_checkpoint(false);

        assert_eq!(props.max_retries, 5);
        assert!(!props.create_checkpoint);
    }

    #[test]
    fn test_commit_metrics() {
        let metrics = CommitMetrics { num_retries: 3 };
        assert_eq!(metrics.num_retries, 3);
    }

    #[test]
    fn test_commit_data_client_version() {
        let no_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::new(),
            vec![],
        );
        assert_eq!(
            *no_metadata.app_metadata.get("clientVersion").unwrap(),
            json!(format!("delta-rs.{}", crate_version()))
        );

        let with_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([("clientVersion".to_owned(), json!("test-client.0.0.1"))]),
            vec![],
        );
        assert_eq!(
            *with_metadata.app_metadata.get("clientVersion").unwrap(),
            json!("test-client.0.0.1")
        );
    }
}