Skip to main content

deltalake_core/kernel/transaction/
mod.rs

1//! Add a commit entry to the Delta Table.
2//! This module provides a unified interface for modifying commit behavior and attributes
3//!
4//! [`CommitProperties`] provides a unified client interface for all Delta operations.
5//! Internally this is used to initialize a [`CommitBuilder`].
6//!
7//! For advanced use cases [`CommitBuilder`] can be used which allows
8//! finer control over the commit process. The builder can be converted
9//! into a future that yields either a [`PreparedCommit`] or a [`FinalizedCommit`].
10//!
11//! A [`PreparedCommit`] represents a staged commit: a temporary commit marker written to storage, or, for log stores that use conditional puts, the serialized commit bytes.
12//! To convert to a [`FinalizedCommit`] an atomic rename is attempted. If the rename fails
13//! then conflict resolution is performed and the atomic rename is tried for the latest version.
14//!
15//!<pre>
16//!                                          Client Interface
17//!        ┌─────────────────────────────┐
18//!        │      Commit Properties      │
19//!        │                             │
20//!        │ Public commit interface for │
21//!        │     all Delta Operations    │
22//!        │                             │
23//!        └─────────────┬───────────────┘
24//!                      │
25//! ─────────────────────┼────────────────────────────────────
26//!                      │
27//!                      ▼                  Advanced Interface
28//!        ┌─────────────────────────────┐
29//!        │       Commit Builder        │
30//!        │                             │
31//!        │   Advanced entry point for  │
32//!        │     creating a commit       │
33//!        └─────────────┬───────────────┘
34//!                      │
35//!                      ▼
36//!     ┌───────────────────────────────────┐
37//!     │                                   │
38//!     │ ┌───────────────────────────────┐ │
39//!     │ │        Prepared Commit        │ │
40//!     │ │                               │ │
41//!     │ │     Represents a temporary    │ │
42//!     │ │   commit marker written to    │ │
43//!     │ │           storage             │ │
44//!     │ └──────────────┬────────────────┘ │
45//!     │                │                  │
46//!     │                ▼                  │
47//!     │ ┌───────────────────────────────┐ │
48//!     │ │       Finalize Commit         │ │
49//!     │ │                               │ │
50//!     │ │   Convert the commit marker   │ │
51//!     │ │   to a commit using atomic    │ │
52//!     │ │         operations            │ │
53//!     │ │                               │ │
54//!     │ └───────────────────────────────┘ │
55//!     │                                   │
56//!     └────────────────┬──────────────────┘
57//!                      │
58//!                      ▼
59//!       ┌───────────────────────────────┐
60//!       │          Post Commit          │
61//!       │                               │
62//!       │ Commit that was materialized  │
63//!       │ to storage with post commit   │
64//!       │      hooks to be executed     │
65//!       └──────────────┬────────────────┘
66//!                      │
67//!                      ▼
68//!       ┌───────────────────────────────┐
69//!       │        Finalized Commit       │
70//!       │                               │
71//!       │ Commit that was materialized  │
72//!       │         to storage            │
73//!       │                               │
74//!       └───────────────────────────────┘
75//!</pre>
76use std::collections::HashMap;
77use std::sync::Arc;
78
79use bytes::Bytes;
80use chrono::Utc;
81use conflict_checker::ConflictChecker;
82use delta_kernel::table_properties::TableProperties;
83use futures::future::BoxFuture;
84use object_store::Error as ObjectStoreError;
85use object_store::path::Path;
86use serde_json::Value;
87use tracing::*;
88use uuid::Uuid;
89
90use delta_kernel::table_features::TableFeature;
91use serde::{Deserialize, Serialize};
92
93use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
94use crate::errors::DeltaTableError;
95use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction};
96use crate::logstore::ObjectStoreRef;
97use crate::logstore::{CommitOrBytes, LogStoreRef};
98use crate::operations::CustomExecuteHandler;
99use crate::protocol::DeltaOperation;
100use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
101use crate::table::config::TablePropertiesExt as _;
102use crate::table::state::DeltaTableState;
103use crate::{DeltaResult, crate_version};
104
105pub use self::conflict_checker::CommitConflictError;
106pub use self::protocol::INSTANCE as PROTOCOL;
107
108#[cfg(test)]
109pub(crate) mod application;
110mod conflict_checker;
111mod protocol;
112#[cfg(feature = "datafusion")]
113mod state;
114
/// Name of the Delta log directory; temporary commit files are staged inside it.
const DELTA_LOG_FOLDER: &str = "_delta_log";
/// Default number of retries a commit gets before failing with `MaxCommitAttempts`.
pub(crate) const DEFAULT_RETRIES: usize = 15;
117
118#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
119#[serde(rename_all = "camelCase")]
120pub struct CommitMetrics {
121    /// Number of retries before a successful commit
122    pub num_retries: u64,
123}
124
125#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
126#[serde(rename_all = "camelCase")]
127pub struct PostCommitMetrics {
128    /// Whether a new checkpoint was created as part of this commit
129    pub new_checkpoint_created: bool,
130
131    /// Number of log files cleaned up
132    pub num_log_files_cleaned_up: u64,
133}
134
135#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
136#[serde(rename_all = "camelCase")]
137pub struct Metrics {
138    /// Number of retries before a successful commit
139    pub num_retries: u64,
140
141    /// Whether a new checkpoint was created as part of this commit
142    pub new_checkpoint_created: bool,
143
144    /// Number of log files cleaned up
145    pub num_log_files_cleaned_up: u64,
146}
147
148/// Error raised while commititng transaction
149#[derive(thiserror::Error, Debug)]
150pub enum TransactionError {
151    /// Version already exists
152    #[error("Tried committing existing table version: {0}")]
153    VersionAlreadyExists(i64),
154
155    /// Error returned when reading the delta log object failed.
156    #[error("Error serializing commit log to json: {json_err}")]
157    SerializeLogJson {
158        /// Commit log record JSON serialization error.
159        json_err: serde_json::error::Error,
160    },
161
162    /// Error returned when reading the delta log object failed.
163    #[error("Log storage error: {}", .source)]
164    ObjectStore {
165        /// Storage error details when reading the delta log object failed.
166        #[from]
167        source: ObjectStoreError,
168    },
169
170    /// Error returned when a commit conflict occurred
171    #[error("Failed to commit transaction: {0}")]
172    CommitConflict(#[from] CommitConflictError),
173
174    /// Error returned when maximum number of commit trioals is exceeded
175    #[error("Failed to commit transaction: {0}")]
176    MaxCommitAttempts(i32),
177
178    /// The transaction includes Remove action with data change but Delta table is append-only
179    #[error(
180        "The transaction includes Remove action with data change but Delta table is append-only"
181    )]
182    DeltaTableAppendOnly,
183
184    /// Error returned when unsupported table features are required
185    #[error("Unsupported table features required: {0:?}")]
186    UnsupportedTableFeatures(Vec<TableFeature>),
187
188    /// Error returned when table features are required but not specified
189    #[error("Table features must be specified, please specify: {0:?}")]
190    TableFeaturesRequired(TableFeature),
191
192    /// The transaction failed to commit due to an error in an implementation-specific layer.
193    /// Currently used by DynamoDb-backed S3 log store when database operations fail.
194    #[error("Transaction failed: {msg}")]
195    LogStoreError {
196        /// Detailed message for the commit failure.
197        msg: String,
198        /// underlying error in the log store transactional layer.
199        source: Box<dyn std::error::Error + Send + Sync + 'static>,
200    },
201}
202
203impl From<TransactionError> for DeltaTableError {
204    fn from(err: TransactionError) -> Self {
205        match err {
206            TransactionError::VersionAlreadyExists(version) => {
207                DeltaTableError::VersionAlreadyExists(version)
208            }
209            TransactionError::SerializeLogJson { json_err } => {
210                DeltaTableError::SerializeLogJson { json_err }
211            }
212            TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
213            other => DeltaTableError::Transaction { source: other },
214        }
215    }
216}
217
218/// Error raised while commititng transaction
219#[derive(thiserror::Error, Debug)]
220pub enum CommitBuilderError {}
221
222impl From<CommitBuilderError> for DeltaTableError {
223    fn from(err: CommitBuilderError) -> Self {
224        DeltaTableError::CommitValidation { source: err }
225    }
226}
227
228/// Reference to some structure that contains mandatory attributes for performing a commit.
229pub trait TableReference: Send + Sync {
230    /// Well known table configuration
231    fn config(&self) -> &TableProperties;
232
233    /// Get the table protocol of the snapshot
234    fn protocol(&self) -> &Protocol;
235
236    /// Get the table metadata of the snapshot
237    fn metadata(&self) -> &Metadata;
238
239    /// Try to cast this table reference to a `EagerSnapshot`
240    fn eager_snapshot(&self) -> &EagerSnapshot;
241}
242
243impl TableReference for EagerSnapshot {
244    fn protocol(&self) -> &Protocol {
245        EagerSnapshot::protocol(self)
246    }
247
248    fn metadata(&self) -> &Metadata {
249        EagerSnapshot::metadata(self)
250    }
251
252    fn config(&self) -> &TableProperties {
253        self.table_properties()
254    }
255
256    fn eager_snapshot(&self) -> &EagerSnapshot {
257        self
258    }
259}
260
261impl TableReference for DeltaTableState {
262    fn config(&self) -> &TableProperties {
263        self.snapshot.config()
264    }
265
266    fn protocol(&self) -> &Protocol {
267        self.snapshot.protocol()
268    }
269
270    fn metadata(&self) -> &Metadata {
271        self.snapshot.metadata()
272    }
273
274    fn eager_snapshot(&self) -> &EagerSnapshot {
275        &self.snapshot
276    }
277}
278
279/// Data that was actually written to the log store.
280#[derive(Debug)]
281pub struct CommitData {
282    /// The actions
283    pub actions: Vec<Action>,
284    /// The Operation
285    pub operation: DeltaOperation,
286    /// The Metadata
287    pub app_metadata: HashMap<String, Value>,
288    /// Application specific transaction
289    pub app_transactions: Vec<Transaction>,
290}
291
292impl CommitData {
293    /// Create new data to be committed
294    pub fn new(
295        mut actions: Vec<Action>,
296        operation: DeltaOperation,
297        mut app_metadata: HashMap<String, Value>,
298        app_transactions: Vec<Transaction>,
299    ) -> Self {
300        if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
301            let mut commit_info = operation.get_commit_info();
302            commit_info.timestamp = Some(Utc::now().timestamp_millis());
303            app_metadata.insert(
304                "clientVersion".to_string(),
305                Value::String(format!("delta-rs.{}", crate_version())),
306            );
307            app_metadata.extend(commit_info.info);
308            commit_info.info = app_metadata.clone();
309            actions.push(Action::CommitInfo(commit_info))
310        }
311
312        for txn in &app_transactions {
313            actions.push(Action::Txn(txn.clone()))
314        }
315
316        CommitData {
317            actions,
318            operation,
319            app_metadata,
320            app_transactions,
321        }
322    }
323
324    /// Obtain the byte representation of the commit.
325    pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
326        let mut jsons = Vec::<String>::new();
327        for action in &self.actions {
328            let json = serde_json::to_string(action)
329                .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
330            jsons.push(json);
331        }
332        Ok(bytes::Bytes::from(jsons.join("\n")))
333    }
334}
335
/// Properties for post commit hook.
#[derive(Clone, Debug, Copy)]
pub struct PostCommitHookProperties {
    /// Whether a checkpoint should be created when the checkpoint interval condition is met.
    create_checkpoint: bool,
    /// Override the EnableExpiredLogCleanUp setting; if `None`, the table config setting is used.
    cleanup_expired_logs: Option<bool>,
}
343
344#[derive(Clone, Debug)]
345/// End user facing interface to be used by operations on the table.
346/// Enable controlling commit behaviour and modifying metadata that is written during a commit.
347pub struct CommitProperties {
348    pub(crate) app_metadata: HashMap<String, Value>,
349    pub(crate) app_transaction: Vec<Transaction>,
350    max_retries: usize,
351    create_checkpoint: bool,
352    cleanup_expired_logs: Option<bool>,
353}
354
355impl Default for CommitProperties {
356    fn default() -> Self {
357        Self {
358            app_metadata: Default::default(),
359            app_transaction: Vec::new(),
360            max_retries: DEFAULT_RETRIES,
361            create_checkpoint: true,
362            cleanup_expired_logs: None,
363        }
364    }
365}
366
367impl CommitProperties {
368    /// Specify metadata the be committed
369    pub fn with_metadata(
370        mut self,
371        metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
372    ) -> Self {
373        self.app_metadata = HashMap::from_iter(metadata);
374        self
375    }
376
377    /// Specify maximum number of times to retry the transaction before failing to commit
378    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
379        self.max_retries = max_retries;
380        self
381    }
382
383    /// Specify if it should create a checkpoint when the commit interval condition is met
384    pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
385        self.create_checkpoint = create_checkpoint;
386        self
387    }
388
389    /// Add an additional application transaction to the commit
390    pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
391        self.app_transaction.push(txn);
392        self
393    }
394
395    /// Override application transactions for the commit
396    pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
397        self.app_transaction = txn;
398        self
399    }
400
401    /// Specify if it should clean up the logs when the logRetentionDuration interval is met
402    pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
403        self.cleanup_expired_logs = cleanup_expired_logs;
404        self
405    }
406}
407
408impl From<CommitProperties> for CommitBuilder {
409    fn from(value: CommitProperties) -> Self {
410        CommitBuilder {
411            max_retries: value.max_retries,
412            app_metadata: value.app_metadata,
413            post_commit_hook: Some(PostCommitHookProperties {
414                create_checkpoint: value.create_checkpoint,
415                cleanup_expired_logs: value.cleanup_expired_logs,
416            }),
417            app_transaction: value.app_transaction,
418            ..Default::default()
419        }
420    }
421}
422
423/// Prepare data to be committed to the Delta log and control how the commit is performed
424pub struct CommitBuilder {
425    actions: Vec<Action>,
426    app_metadata: HashMap<String, Value>,
427    app_transaction: Vec<Transaction>,
428    max_retries: usize,
429    post_commit_hook: Option<PostCommitHookProperties>,
430    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
431    operation_id: Uuid,
432}
433
434impl Default for CommitBuilder {
435    fn default() -> Self {
436        CommitBuilder {
437            actions: Vec::new(),
438            app_metadata: HashMap::new(),
439            app_transaction: Vec::new(),
440            max_retries: DEFAULT_RETRIES,
441            post_commit_hook: None,
442            post_commit_hook_handler: None,
443            operation_id: Uuid::new_v4(),
444        }
445    }
446}
447
448impl<'a> CommitBuilder {
449    /// Actions to be included in the commit
450    pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
451        self.actions = actions;
452        self
453    }
454
455    /// Metadata for the operation performed like metrics, user, and notebook
456    pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
457        self.app_metadata = app_metadata;
458        self
459    }
460
461    /// Maximum number of times to retry the transaction before failing to commit
462    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
463        self.max_retries = max_retries;
464        self
465    }
466
467    /// Specify all the post commit hook properties
468    pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
469        self.post_commit_hook = Some(post_commit_hook);
470        self
471    }
472
473    /// Propagate operation id to log store
474    pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
475        self.operation_id = operation_id;
476        self
477    }
478
479    /// Set a custom execute handler, for pre and post execution
480    pub fn with_post_commit_hook_handler(
481        mut self,
482        handler: Option<Arc<dyn CustomExecuteHandler>>,
483    ) -> Self {
484        self.post_commit_hook_handler = handler;
485        self
486    }
487
488    /// Prepare a Commit operation using the configured builder
489    pub fn build(
490        self,
491        table_data: Option<&'a dyn TableReference>,
492        log_store: LogStoreRef,
493        operation: DeltaOperation,
494    ) -> PreCommit<'a> {
495        let data = CommitData::new(
496            self.actions,
497            operation,
498            self.app_metadata,
499            self.app_transaction,
500        );
501        PreCommit {
502            log_store,
503            table_data,
504            max_retries: self.max_retries,
505            data,
506            post_commit_hook: self.post_commit_hook,
507            post_commit_hook_handler: self.post_commit_hook_handler,
508            operation_id: self.operation_id,
509        }
510    }
511}
512
513/// Represents a commit that has not yet started but all details are finalized
514pub struct PreCommit<'a> {
515    log_store: LogStoreRef,
516    table_data: Option<&'a dyn TableReference>,
517    data: CommitData,
518    max_retries: usize,
519    post_commit_hook: Option<PostCommitHookProperties>,
520    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
521    operation_id: Uuid,
522}
523
524impl<'a> std::future::IntoFuture for PreCommit<'a> {
525    type Output = DeltaResult<FinalizedCommit>;
526    type IntoFuture = BoxFuture<'a, Self::Output>;
527
528    fn into_future(self) -> Self::IntoFuture {
529        Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
530    }
531}
532
533impl<'a> PreCommit<'a> {
534    /// Prepare the commit but do not finalize it
535    pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
536        let this = self;
537
538        // Write delta log entry as temporary file to storage. For the actual commit,
539        // the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
540        async fn write_tmp_commit(
541            log_entry: Bytes,
542            store: ObjectStoreRef,
543        ) -> DeltaResult<CommitOrBytes> {
544            let token = uuid::Uuid::new_v4().to_string();
545            let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
546            store.put(&path, log_entry.into()).await?;
547            Ok(CommitOrBytes::TmpCommit(path))
548        }
549
550        Box::pin(async move {
551            if let Some(table_reference) = this.table_data {
552                PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
553            }
554            let log_entry = this.data.get_bytes()?;
555
556            // With the DefaultLogStore & LakeFSLogstore, we just pass the bytes around, since we use conditionalPuts
557            // Other stores will use tmp_commits
558            let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
559                .contains(&this.log_store.name().as_str())
560            {
561                CommitOrBytes::LogBytes(log_entry)
562            } else {
563                write_tmp_commit(
564                    log_entry,
565                    this.log_store.object_store(Some(this.operation_id)),
566                )
567                .await?
568            };
569
570            Ok(PreparedCommit {
571                commit_or_bytes,
572                log_store: this.log_store,
573                table_data: this.table_data,
574                max_retries: this.max_retries,
575                data: this.data,
576                post_commit: this.post_commit_hook,
577                post_commit_hook_handler: this.post_commit_hook_handler,
578                operation_id: this.operation_id,
579            })
580        })
581    }
582}
583
584/// Represents a inflight commit
585pub struct PreparedCommit<'a> {
586    commit_or_bytes: CommitOrBytes,
587    log_store: LogStoreRef,
588    data: CommitData,
589    table_data: Option<&'a dyn TableReference>,
590    max_retries: usize,
591    post_commit: Option<PostCommitHookProperties>,
592    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
593    operation_id: Uuid,
594}
595
596impl PreparedCommit<'_> {
597    /// The temporary commit file created
598    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
599        &self.commit_or_bytes
600    }
601}
602
603impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
604    type Output = DeltaResult<PostCommit>;
605    type IntoFuture = BoxFuture<'a, Self::Output>;
606
607    fn into_future(self) -> Self::IntoFuture {
608        let this = self;
609
610        Box::pin(async move {
611            let commit_or_bytes = this.commit_or_bytes;
612
613            let mut attempt_number: usize = 1;
614
615            // Handle the case where table doesn't exist yet (initial table creation)
616            let read_snapshot: EagerSnapshot = if this.table_data.is_none() {
617                debug!("committing initial table version 0");
618                match this
619                    .log_store
620                    .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
621                    .await
622                {
623                    Ok(_) => {
624                        return Ok(PostCommit {
625                            version: 0,
626                            data: this.data,
627                            create_checkpoint: false,
628                            cleanup_expired_logs: None,
629                            log_store: this.log_store,
630                            table_data: None,
631                            custom_execute_handler: this.post_commit_hook_handler,
632                            metrics: CommitMetrics { num_retries: 0 },
633                        });
634                    }
635                    Err(TransactionError::VersionAlreadyExists(0)) => {
636                        // Table was created by another writer since the `this.table_data.is_none()` check.
637                        // Load the current table state and continue with the retry loop.
638                        debug!("version 0 already exists, loading table state for retry");
639                        attempt_number = 2;
640                        let latest_version = this.log_store.get_latest_version(0).await?;
641                        EagerSnapshot::try_new(
642                            this.log_store.as_ref(),
643                            Default::default(),
644                            Some(latest_version),
645                        )
646                        .await?
647                    }
648                    Err(e) => return Err(e.into()),
649                }
650            } else {
651                this.table_data.unwrap().eager_snapshot().clone()
652            };
653
654            let mut read_snapshot = read_snapshot;
655
656            let commit_span = info_span!(
657                "commit_with_retries",
658                base_version = read_snapshot.version(),
659                max_retries = this.max_retries,
660                attempt = field::Empty,
661                target_version = field::Empty,
662                conflicts_checked = 0
663            );
664
665            async move {
666                let total_retries = this.max_retries + 1;
667                while attempt_number <= total_retries {
668                    Span::current().record("attempt", attempt_number);
669                    let latest_version = this
670                        .log_store
671                        .get_latest_version(read_snapshot.version())
672                        .await?;
673
674                    if latest_version > read_snapshot.version() {
675                        // If max_retries are set to 0, do not try to use the conflict checker to resolve the conflict
676                        // and throw immediately
677                        if this.max_retries == 0 {
678                            warn!(
679                                base_version = read_snapshot.version(),
680                                latest_version = latest_version,
681                                "table updated but max_retries is 0, failing immediately"
682                            );
683                            return Err(TransactionError::MaxCommitAttempts(
684                                this.max_retries as i32,
685                            )
686                            .into());
687                        }
688                        warn!(
689                            base_version = read_snapshot.version(),
690                            latest_version = latest_version,
691                            versions_behind = latest_version - read_snapshot.version(),
692                            "table updated during transaction, checking for conflicts"
693                        );
694                        let mut steps = latest_version - read_snapshot.version();
695                        let mut conflicts_checked = 0;
696
697                        // Need to check for conflicts with each version between the read_snapshot and
698                        // the latest!
699                        while steps != 0 {
700                            conflicts_checked += 1;
701                            let summary = WinningCommitSummary::try_new(
702                                this.log_store.as_ref(),
703                                latest_version - steps,
704                                (latest_version - steps) + 1,
705                            )
706                            .await?;
707                            let transaction_info = TransactionInfo::try_new(
708                                read_snapshot.log_data(),
709                                this.data.operation.read_predicate(),
710                                &this.data.actions,
711                                this.data.operation.read_whole_table(),
712                            )?;
713                            let conflict_checker = ConflictChecker::new(
714                                transaction_info,
715                                summary,
716                                Some(&this.data.operation),
717                            );
718
719                            match conflict_checker.check_conflicts() {
720                                Ok(_) => {}
721                                Err(err) => {
722                                    error!(
723                                        conflicts_checked = conflicts_checked,
724                                        error = %err,
725                                        "conflict detected, aborting transaction"
726                                    );
727                                    return Err(TransactionError::CommitConflict(err).into());
728                                }
729                            }
730                            steps -= 1;
731                        }
732                        Span::current().record("conflicts_checked", conflicts_checked);
733                        debug!(
734                            conflicts_checked = conflicts_checked,
735                            "all conflicts resolved, updating snapshot"
736                        );
737                        // Update snapshot to latest version after successful conflict check
738                        read_snapshot
739                            .update(&this.log_store, Some(latest_version as u64))
740                            .await?;
741                    }
742                    let version: i64 = latest_version + 1;
743                    Span::current().record("target_version", version);
744
745                    match this
746                        .log_store
747                        .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
748                        .await
749                    {
750                        Ok(()) => {
751                            info!(
752                                version = version,
753                                num_retries = attempt_number as u64 - 1,
754                                "transaction committed successfully"
755                            );
756                            return Ok(PostCommit {
757                                version,
758                                data: this.data,
759                                create_checkpoint: this
760                                    .post_commit
761                                    .map(|v| v.create_checkpoint)
762                                    .unwrap_or_default(),
763                                cleanup_expired_logs: this
764                                    .post_commit
765                                    .map(|v| v.cleanup_expired_logs)
766                                    .unwrap_or_default(),
767                                log_store: this.log_store,
768                                table_data: Some(Box::new(read_snapshot)),
769                                custom_execute_handler: this.post_commit_hook_handler,
770                                metrics: CommitMetrics {
771                                    num_retries: attempt_number as u64 - 1,
772                                },
773                            });
774                        }
775                        Err(TransactionError::VersionAlreadyExists(version)) => {
776                            warn!(
777                                version = version,
778                                attempt = attempt_number,
779                                "version already exists, will retry"
780                            );
781                            // If the version already exists, loop through again and re-check
782                            // conflicts
783                            attempt_number += 1;
784                        }
785                        Err(err) => {
786                            error!(
787                                version = version,
788                                error = %err,
789                                "commit failed, aborting"
790                            );
791                            this.log_store
792                                .abort_commit_entry(version, commit_or_bytes, this.operation_id)
793                                .await?;
794                            return Err(err.into());
795                        }
796                    }
797                }
798
799                error!(
800                    max_retries = this.max_retries,
801                    "exceeded maximum commit attempts"
802                );
803                Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
804            }
805            .instrument(commit_span)
806            .await
807        })
808    }
809}
810
/// Represents items for the post commit hook
///
/// Returned once a commit has been successfully written to the log store. Awaiting it
/// (via its `IntoFuture` implementation) runs the post-commit activities (optional
/// checkpointing and expired-log cleanup plus any custom handler hooks) and yields a
/// [`FinalizedCommit`].
pub struct PostCommit {
    /// The winning version number of the commit
    pub version: i64,
    /// The data that was committed to the log store
    pub data: CommitData,
    /// Whether a checkpoint should be attempted after the commit. Even when `true`, a
    /// checkpoint is only written if the table's checkpoint interval is reached.
    create_checkpoint: bool,
    /// Explicit override for expired-log cleanup. `None` defers to the table
    /// configuration's `enable_expired_log_cleanup` setting.
    cleanup_expired_logs: Option<bool>,
    /// Log store the commit was written to; also used to update or reload the table
    /// state, write checkpoints and clean up logs.
    log_store: LogStoreRef,
    /// The table state the transaction was based on, if any. When `None`, the
    /// post-commit activities are skipped and the state is loaded fresh at `version`.
    table_data: Option<Box<dyn TableReference>>,
    /// Optional user handler invoked before and after the post-commit activities.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Metrics collected while committing (e.g. the number of retries needed).
    metrics: CommitMetrics,
}
824
825impl PostCommit {
826    /// Runs the post commit activities
827    async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
828        if let Some(table) = &self.table_data {
829            let post_commit_operation_id = Uuid::new_v4();
830            let mut snapshot = table.eager_snapshot().clone();
831            if self.version != snapshot.version() {
832                snapshot
833                    .update(&self.log_store, Some(self.version as u64))
834                    .await?;
835            }
836
837            let mut state = DeltaTableState { snapshot };
838
839            let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
840                cleanup_logs
841            } else {
842                state.table_config().enable_expired_log_cleanup()
843            };
844
845            // Run arbitrary before_post_commit_hook code
846            if let Some(custom_execute_handler) = &self.custom_execute_handler {
847                custom_execute_handler
848                    .before_post_commit_hook(
849                        &self.log_store,
850                        cleanup_logs || self.create_checkpoint,
851                        post_commit_operation_id,
852                    )
853                    .await?
854            }
855
856            let mut new_checkpoint_created = false;
857            if self.create_checkpoint {
858                // Execute create checkpoint hook
859                new_checkpoint_created = self
860                    .create_checkpoint(
861                        &state,
862                        &self.log_store,
863                        self.version,
864                        post_commit_operation_id,
865                    )
866                    .await?;
867            }
868
869            let mut num_log_files_cleaned_up: u64 = 0;
870            if cleanup_logs {
871                // Execute clean up logs hook
872                num_log_files_cleaned_up = cleanup_expired_logs_for(
873                    self.version,
874                    self.log_store.as_ref(),
875                    Utc::now().timestamp_millis()
876                        - state.table_config().log_retention_duration().as_millis() as i64,
877                    Some(post_commit_operation_id),
878                )
879                .await? as u64;
880                if num_log_files_cleaned_up > 0 {
881                    state = DeltaTableState::try_new(
882                        &self.log_store,
883                        state.load_config().clone(),
884                        Some(self.version),
885                    )
886                    .await?;
887                }
888            }
889
890            // Run arbitrary after_post_commit_hook code
891            if let Some(custom_execute_handler) = &self.custom_execute_handler {
892                custom_execute_handler
893                    .after_post_commit_hook(
894                        &self.log_store,
895                        cleanup_logs || self.create_checkpoint,
896                        post_commit_operation_id,
897                    )
898                    .await?
899            }
900            Ok((
901                state,
902                PostCommitMetrics {
903                    new_checkpoint_created,
904                    num_log_files_cleaned_up,
905                },
906            ))
907        } else {
908            let state =
909                DeltaTableState::try_new(&self.log_store, Default::default(), Some(self.version))
910                    .await?;
911            Ok((
912                state,
913                PostCommitMetrics {
914                    new_checkpoint_created: false,
915                    num_log_files_cleaned_up: 0,
916                },
917            ))
918        }
919    }
920    async fn create_checkpoint(
921        &self,
922        table_state: &DeltaTableState,
923        log_store: &LogStoreRef,
924        version: i64,
925        operation_id: Uuid,
926    ) -> DeltaResult<bool> {
927        if !table_state.load_config().require_files {
928            warn!(
929                "Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."
930            );
931            return Ok(false);
932        }
933
934        let checkpoint_interval = table_state.config().checkpoint_interval().get() as i64;
935        if ((version + 1) % checkpoint_interval) == 0 {
936            create_checkpoint_for(version as u64, log_store.as_ref(), Some(operation_id)).await?;
937            Ok(true)
938        } else {
939            Ok(false)
940        }
941    }
942}
943
/// A commit that successfully completed
///
/// Produced by awaiting a `PostCommit`. It carries the table state after the
/// post-commit activities have run, the committed version, and the combined commit
/// and post-commit metrics.
pub struct FinalizedCommit {
    /// The new table state after a commit
    pub snapshot: DeltaTableState,

    /// Version of the finalized commit
    pub version: i64,

    /// Metrics associated with the commit operation: the number of commit retries,
    /// whether a checkpoint was created, and how many log files were cleaned up.
    pub metrics: Metrics,
}
955
956impl std::fmt::Debug for FinalizedCommit {
957    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
958        f.debug_struct("FinalizedCommit")
959            .field("version", &self.version)
960            .field("metrics", &self.metrics)
961            .finish()
962    }
963}
964
965impl FinalizedCommit {
966    /// The new table state after a commit
967    pub fn snapshot(&self) -> DeltaTableState {
968        self.snapshot.clone()
969    }
970    /// Version of the finalized commit
971    pub fn version(&self) -> i64 {
972        self.version
973    }
974}
975
976impl std::future::IntoFuture for PostCommit {
977    type Output = DeltaResult<FinalizedCommit>;
978    type IntoFuture = BoxFuture<'static, Self::Output>;
979
980    fn into_future(self) -> Self::IntoFuture {
981        let this = self;
982
983        Box::pin(async move {
984            match this.run_post_commit_hook().await {
985                Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
986                    snapshot,
987                    version: this.version,
988                    metrics: Metrics {
989                        num_retries: this.metrics.num_retries,
990                        new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
991                        num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
992                    },
993                }),
994                Err(err) => Err(err),
995            }
996        })
997    }
998}
999
// Unit tests for commit helpers: commit-file naming, writing commit entries to a
// log store, the commit tracing span, and the commit property/metric types.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::logstore::{
        LogStore, StorageConfig, commit_uri_from_version, default_logstore::DefaultLogStore,
    };
    use object_store::{ObjectStore, PutPayload, memory::InMemory};
    use url::Url;

    // Commit files live under `_delta_log/` and are named by the version zero-padded
    // to 20 digits.
    #[test]
    fn test_commit_uri_from_version() {
        let version = commit_uri_from_version(0);
        assert_eq!(version, Path::from("_delta_log/00000000000000000000.json"));
        let version = commit_uri_from_version(123);
        assert_eq!(version, Path::from("_delta_log/00000000000000000123.json"))
    }

    // Writing a commit entry must fail when the target version's file already exists,
    // and succeed for the next free version.
    #[tokio::test]
    async fn test_try_commit_transaction() {
        let store = Arc::new(InMemory::new());
        let url = Url::parse("mem://what/is/this").unwrap();
        let log_store = DefaultLogStore::new(
            store.clone(),
            store.clone(),
            crate::logstore::LogStoreConfig::new(&url, StorageConfig::default()),
        );
        // Pre-create version 0 so the first write below collides with it.
        let version_path = Path::from("_delta_log/00000000000000000000.json");
        store.put(&version_path, PutPayload::new()).await.unwrap();

        let res = log_store
            .write_commit_entry(
                0,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await;
        // fails if file version already exists
        assert!(res.is_err());

        // succeeds for next version
        log_store
            .write_commit_entry(
                1,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await
            .unwrap();
    }

    // Checks that a span shaped like the commit-retry span has the expected name and
    // level, and that its empty/initial fields can be recorded afterwards.
    // NOTE(review): `span.metadata()` may be `None` for a disabled span (no subscriber
    // installed) depending on `tracing`'s feature flags; confirm this test does not
    // rely on that implicitly.
    #[test]
    fn test_commit_with_retries_tracing_span() {
        let span = info_span!(
            "commit_with_retries",
            base_version = 5,
            max_retries = 10,
            attempt = field::Empty,
            target_version = field::Empty,
            conflicts_checked = 0
        );

        let metadata = span.metadata().expect("span should have metadata");
        assert_eq!(metadata.name(), "commit_with_retries");
        assert_eq!(metadata.level(), &Level::INFO);
        assert!(metadata.is_span());

        span.record("attempt", 1);
        span.record("target_version", 6);
        span.record("conflicts_checked", 2);
    }

    // Builder methods on `CommitProperties` store the configured values.
    #[test]
    fn test_commit_properties_with_retries() {
        let props = CommitProperties::default()
            .with_max_retries(5)
            .with_create_checkpoint(false);

        assert_eq!(props.max_retries, 5);
        assert!(!props.create_checkpoint);
    }

    // `CommitMetrics` exposes the retry count it was constructed with.
    #[test]
    fn test_commit_metrics() {
        let metrics = CommitMetrics { num_retries: 3 };
        assert_eq!(metrics.num_retries, 3);
    }
}