deltalake_core/kernel/transaction/
mod.rs

1//! Add a commit entry to the Delta Table.
2//! This module provides a unified interface for modifying commit behavior and attributes
3//!
//! [`CommitProperties`] provides a unified client interface for all Delta operations.
5//! Internally this is used to initialize a [`CommitBuilder`].
6//!
7//! For advanced use cases [`CommitBuilder`] can be used which allows
8//! finer control over the commit process. The builder can be converted
//! into a future that yields either a [`PreparedCommit`] or a [`FinalizedCommit`].
10//!
11//! A [`PreparedCommit`] represents a temporary commit marker written to storage.
12//! To convert to a [`FinalizedCommit`] an atomic rename is attempted. If the rename fails
13//! then conflict resolution is performed and the atomic rename is tried for the latest version.
14//!
15//!<pre>
16//!                                          Client Interface
17//!        ┌─────────────────────────────┐
18//!        │      Commit Properties      │
19//!        │                             │
20//!        │ Public commit interface for │
21//!        │     all Delta Operations    │
22//!        │                             │
23//!        └─────────────┬───────────────┘
24//!                      │
25//! ─────────────────────┼────────────────────────────────────
26//!                      │
27//!                      ▼                  Advanced Interface
28//!        ┌─────────────────────────────┐
29//!        │       Commit Builder        │
30//!        │                             │
31//!        │   Advanced entry point for  │
32//!        │     creating a commit       │
33//!        └─────────────┬───────────────┘
34//!                      │
35//!                      ▼
36//!     ┌───────────────────────────────────┐
37//!     │                                   │
38//!     │ ┌───────────────────────────────┐ │
39//!     │ │        Prepared Commit        │ │
40//!     │ │                               │ │
41//!     │ │     Represents a temporary    │ │
42//!     │ │   commit marker written to    │ │
43//!     │ │           storage             │ │
44//!     │ └──────────────┬────────────────┘ │
45//!     │                │                  │
46//!     │                ▼                  │
47//!     │ ┌───────────────────────────────┐ │
48//!     │ │       Finalize Commit         │ │
49//!     │ │                               │ │
50//!     │ │   Convert the commit marker   │ │
51//!     │ │   to a commit using atomic    │ │
52//!     │ │         operations            │ │
53//!     │ │                               │ │
54//!     │ └───────────────────────────────┘ │
55//!     │                                   │
56//!     └────────────────┬──────────────────┘
57//!                      │
58//!                      ▼
59//!       ┌───────────────────────────────┐
60//!       │          Post Commit          │
61//!       │                               │
62//!       │ Commit that was materialized  │
63//!       │ to storage with post commit   │
64//!       │      hooks to be executed     │
65//!       └──────────────┬────────────────┘
66//!                      │
67//!                      ▼
68//!       ┌───────────────────────────────┐
69//!       │        Finalized Commit       │
70//!       │                               │
71//!       │ Commit that was materialized  │
72//!       │         to storage            │
73//!       │                               │
74//!       └───────────────────────────────┘
75//!</pre>
76use std::collections::HashMap;
77use std::sync::Arc;
78
79use bytes::Bytes;
80use chrono::Utc;
81use conflict_checker::ConflictChecker;
82use futures::future::BoxFuture;
83use object_store::path::Path;
84use object_store::Error as ObjectStoreError;
85use serde_json::Value;
86use tracing::*;
87use uuid::Uuid;
88
89use delta_kernel::table_features::{ReaderFeature, WriterFeature};
90use serde::{Deserialize, Serialize};
91
92use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
93use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
94use crate::errors::DeltaTableError;
95use crate::kernel::{Action, CommitInfo, EagerSnapshot, Metadata, Protocol, Transaction};
96use crate::logstore::ObjectStoreRef;
97use crate::logstore::{CommitOrBytes, LogStoreRef};
98use crate::operations::CustomExecuteHandler;
99use crate::protocol::DeltaOperation;
100use crate::table::config::TableConfig;
101use crate::table::state::DeltaTableState;
102use crate::{crate_version, DeltaResult};
103
104pub use self::conflict_checker::CommitConflictError;
105pub use self::protocol::INSTANCE as PROTOCOL;
106
107#[cfg(test)]
108pub(crate) mod application;
109mod conflict_checker;
110mod protocol;
111#[cfg(feature = "datafusion")]
112mod state;
113
114const DELTA_LOG_FOLDER: &str = "_delta_log";
115pub(crate) const DEFAULT_RETRIES: usize = 15;
116
117#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
118#[serde(rename_all = "camelCase")]
119pub struct CommitMetrics {
120    /// Number of retries before a successful commit
121    pub num_retries: u64,
122}
123
124#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
125#[serde(rename_all = "camelCase")]
126pub struct PostCommitMetrics {
127    /// Whether a new checkpoint was created as part of this commit
128    pub new_checkpoint_created: bool,
129
130    /// Number of log files cleaned up
131    pub num_log_files_cleaned_up: u64,
132}
133
134#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
135#[serde(rename_all = "camelCase")]
136pub struct Metrics {
137    /// Number of retries before a successful commit
138    pub num_retries: u64,
139
140    /// Whether a new checkpoint was created as part of this commit
141    pub new_checkpoint_created: bool,
142
143    /// Number of log files cleaned up
144    pub num_log_files_cleaned_up: u64,
145}
146
147/// Error raised while commititng transaction
148#[derive(thiserror::Error, Debug)]
149pub enum TransactionError {
150    /// Version already exists
151    #[error("Tried committing existing table version: {0}")]
152    VersionAlreadyExists(i64),
153
154    /// Error returned when reading the delta log object failed.
155    #[error("Error serializing commit log to json: {json_err}")]
156    SerializeLogJson {
157        /// Commit log record JSON serialization error.
158        json_err: serde_json::error::Error,
159    },
160
161    /// Error returned when reading the delta log object failed.
162    #[error("Log storage error: {}", .source)]
163    ObjectStore {
164        /// Storage error details when reading the delta log object failed.
165        #[from]
166        source: ObjectStoreError,
167    },
168
169    /// Error returned when a commit conflict occurred
170    #[error("Failed to commit transaction: {0}")]
171    CommitConflict(#[from] CommitConflictError),
172
173    /// Error returned when maximum number of commit trioals is exceeded
174    #[error("Failed to commit transaction: {0}")]
175    MaxCommitAttempts(i32),
176
177    /// The transaction includes Remove action with data change but Delta table is append-only
178    #[error(
179        "The transaction includes Remove action with data change but Delta table is append-only"
180    )]
181    DeltaTableAppendOnly,
182
183    /// Error returned when unsupported reader features are required
184    #[error("Unsupported reader features required: {0:?}")]
185    UnsupportedReaderFeatures(Vec<ReaderFeature>),
186
187    /// Error returned when unsupported writer features are required
188    #[error("Unsupported writer features required: {0:?}")]
189    UnsupportedWriterFeatures(Vec<WriterFeature>),
190
191    /// Error returned when writer features are required but not specified
192    #[error("Writer features must be specified for writerversion >= 7, please specify: {0:?}")]
193    WriterFeaturesRequired(WriterFeature),
194
195    /// Error returned when reader features are required but not specified
196    #[error("Reader features must be specified for reader version >= 3, please specify: {0:?}")]
197    ReaderFeaturesRequired(ReaderFeature),
198
199    /// The transaction failed to commit due to an error in an implementation-specific layer.
200    /// Currently used by DynamoDb-backed S3 log store when database operations fail.
201    #[error("Transaction failed: {msg}")]
202    LogStoreError {
203        /// Detailed message for the commit failure.
204        msg: String,
205        /// underlying error in the log store transactional layer.
206        source: Box<dyn std::error::Error + Send + Sync + 'static>,
207    },
208}
209
210impl From<TransactionError> for DeltaTableError {
211    fn from(err: TransactionError) -> Self {
212        match err {
213            TransactionError::VersionAlreadyExists(version) => {
214                DeltaTableError::VersionAlreadyExists(version)
215            }
216            TransactionError::SerializeLogJson { json_err } => {
217                DeltaTableError::SerializeLogJson { json_err }
218            }
219            TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
220            other => DeltaTableError::Transaction { source: other },
221        }
222    }
223}
224
225/// Error raised while commititng transaction
226#[derive(thiserror::Error, Debug)]
227pub enum CommitBuilderError {}
228
229impl From<CommitBuilderError> for DeltaTableError {
230    fn from(err: CommitBuilderError) -> Self {
231        DeltaTableError::CommitValidation { source: err }
232    }
233}
234
235/// Reference to some structure that contains mandatory attributes for performing a commit.
236pub trait TableReference: Send + Sync {
237    /// Well known table configuration
238    fn config(&self) -> TableConfig;
239
240    /// Get the table protocol of the snapshot
241    fn protocol(&self) -> &Protocol;
242
243    /// Get the table metadata of the snapshot
244    fn metadata(&self) -> &Metadata;
245
246    /// Try to cast this table reference to a `EagerSnapshot`
247    fn eager_snapshot(&self) -> &EagerSnapshot;
248}
249
250impl TableReference for EagerSnapshot {
251    fn protocol(&self) -> &Protocol {
252        EagerSnapshot::protocol(self)
253    }
254
255    fn metadata(&self) -> &Metadata {
256        EagerSnapshot::metadata(self)
257    }
258
259    fn config(&self) -> TableConfig {
260        self.table_config()
261    }
262
263    fn eager_snapshot(&self) -> &EagerSnapshot {
264        self
265    }
266}
267
268impl TableReference for DeltaTableState {
269    fn config(&self) -> TableConfig {
270        self.snapshot.config()
271    }
272
273    fn protocol(&self) -> &Protocol {
274        self.snapshot.protocol()
275    }
276
277    fn metadata(&self) -> &Metadata {
278        self.snapshot.metadata()
279    }
280
281    fn eager_snapshot(&self) -> &EagerSnapshot {
282        &self.snapshot
283    }
284}
285
286/// Data that was actually written to the log store.
287#[derive(Debug)]
288pub struct CommitData {
289    /// The actions
290    pub actions: Vec<Action>,
291    /// The Operation
292    pub operation: DeltaOperation,
293    /// The Metadata
294    pub app_metadata: HashMap<String, Value>,
295    /// Application specific transaction
296    pub app_transactions: Vec<Transaction>,
297}
298
299impl CommitData {
300    /// Create new data to be committed
301    pub fn new(
302        mut actions: Vec<Action>,
303        operation: DeltaOperation,
304        mut app_metadata: HashMap<String, Value>,
305        app_transactions: Vec<Transaction>,
306    ) -> Self {
307        if !actions.iter().any(|a| matches!(a, Action::CommitInfo(..))) {
308            let mut commit_info = operation.get_commit_info();
309            commit_info.timestamp = Some(Utc::now().timestamp_millis());
310            app_metadata.insert(
311                "clientVersion".to_string(),
312                Value::String(format!("delta-rs.{}", crate_version())),
313            );
314            app_metadata.extend(commit_info.info);
315            commit_info.info = app_metadata.clone();
316            actions.push(Action::CommitInfo(commit_info))
317        }
318
319        for txn in &app_transactions {
320            actions.push(Action::Txn(txn.clone()))
321        }
322
323        CommitData {
324            actions,
325            operation,
326            app_metadata,
327            app_transactions,
328        }
329    }
330
331    /// Obtain the byte representation of the commit.
332    pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
333        let mut jsons = Vec::<String>::new();
334        for action in &self.actions {
335            let json = serde_json::to_string(action)
336                .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
337            jsons.push(json);
338        }
339        Ok(bytes::Bytes::from(jsons.join("\n")))
340    }
341}
342
/// Properties for post commit hook.
#[derive(Clone, Copy, Debug)]
pub struct PostCommitHookProperties {
    /// Whether the create-checkpoint hook is run after the commit
    create_checkpoint: bool,
    /// Override the EnableExpiredLogCleanUp setting, if None config setting is used
    cleanup_expired_logs: Option<bool>,
}
350
351#[derive(Clone, Debug)]
352/// End user facing interface to be used by operations on the table.
353/// Enable controlling commit behaviour and modifying metadata that is written during a commit.
354pub struct CommitProperties {
355    pub(crate) app_metadata: HashMap<String, Value>,
356    pub(crate) app_transaction: Vec<Transaction>,
357    max_retries: usize,
358    create_checkpoint: bool,
359    cleanup_expired_logs: Option<bool>,
360}
361
362impl Default for CommitProperties {
363    fn default() -> Self {
364        Self {
365            app_metadata: Default::default(),
366            app_transaction: Vec::new(),
367            max_retries: DEFAULT_RETRIES,
368            create_checkpoint: true,
369            cleanup_expired_logs: None,
370        }
371    }
372}
373
374impl CommitProperties {
375    /// Specify metadata the be committed
376    pub fn with_metadata(
377        mut self,
378        metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
379    ) -> Self {
380        self.app_metadata = HashMap::from_iter(metadata);
381        self
382    }
383
384    /// Specify maximum number of times to retry the transaction before failing to commit
385    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
386        self.max_retries = max_retries;
387        self
388    }
389
390    /// Specify if it should create a checkpoint when the commit interval condition is met
391    pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
392        self.create_checkpoint = create_checkpoint;
393        self
394    }
395
396    /// Add an additional application transaction to the commit
397    pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
398        self.app_transaction.push(txn);
399        self
400    }
401
402    /// Override application transactions for the commit
403    pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
404        self.app_transaction = txn;
405        self
406    }
407
408    /// Specify if it should clean up the logs when the logRetentionDuration interval is met
409    pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
410        self.cleanup_expired_logs = cleanup_expired_logs;
411        self
412    }
413}
414
415impl From<CommitProperties> for CommitBuilder {
416    fn from(value: CommitProperties) -> Self {
417        CommitBuilder {
418            max_retries: value.max_retries,
419            app_metadata: value.app_metadata,
420            post_commit_hook: Some(PostCommitHookProperties {
421                create_checkpoint: value.create_checkpoint,
422                cleanup_expired_logs: value.cleanup_expired_logs,
423            }),
424            app_transaction: value.app_transaction,
425            ..Default::default()
426        }
427    }
428}
429
430/// Prepare data to be committed to the Delta log and control how the commit is performed
431pub struct CommitBuilder {
432    actions: Vec<Action>,
433    app_metadata: HashMap<String, Value>,
434    app_transaction: Vec<Transaction>,
435    max_retries: usize,
436    post_commit_hook: Option<PostCommitHookProperties>,
437    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
438    operation_id: Uuid,
439}
440
441impl Default for CommitBuilder {
442    fn default() -> Self {
443        CommitBuilder {
444            actions: Vec::new(),
445            app_metadata: HashMap::new(),
446            app_transaction: Vec::new(),
447            max_retries: DEFAULT_RETRIES,
448            post_commit_hook: None,
449            post_commit_hook_handler: None,
450            operation_id: Uuid::new_v4(),
451        }
452    }
453}
454
455impl<'a> CommitBuilder {
456    /// Actions to be included in the commit
457    pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
458        self.actions = actions;
459        self
460    }
461
462    /// Metadata for the operation performed like metrics, user, and notebook
463    pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
464        self.app_metadata = app_metadata;
465        self
466    }
467
468    /// Maximum number of times to retry the transaction before failing to commit
469    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
470        self.max_retries = max_retries;
471        self
472    }
473
474    /// Specify all the post commit hook properties
475    pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
476        self.post_commit_hook = Some(post_commit_hook);
477        self
478    }
479
480    /// Propagate operation id to log store
481    pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
482        self.operation_id = operation_id;
483        self
484    }
485
486    /// Set a custom execute handler, for pre and post execution
487    pub fn with_post_commit_hook_handler(
488        mut self,
489        handler: Option<Arc<dyn CustomExecuteHandler>>,
490    ) -> Self {
491        self.post_commit_hook_handler = handler;
492        self
493    }
494
495    /// Prepare a Commit operation using the configured builder
496    pub fn build(
497        self,
498        table_data: Option<&'a dyn TableReference>,
499        log_store: LogStoreRef,
500        operation: DeltaOperation,
501    ) -> PreCommit<'a> {
502        let data = CommitData::new(
503            self.actions,
504            operation,
505            self.app_metadata,
506            self.app_transaction,
507        );
508        PreCommit {
509            log_store,
510            table_data,
511            max_retries: self.max_retries,
512            data,
513            post_commit_hook: self.post_commit_hook,
514            post_commit_hook_handler: self.post_commit_hook_handler,
515            operation_id: self.operation_id,
516        }
517    }
518}
519
520/// Represents a commit that has not yet started but all details are finalized
521pub struct PreCommit<'a> {
522    log_store: LogStoreRef,
523    table_data: Option<&'a dyn TableReference>,
524    data: CommitData,
525    max_retries: usize,
526    post_commit_hook: Option<PostCommitHookProperties>,
527    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
528    operation_id: Uuid,
529}
530
531impl<'a> std::future::IntoFuture for PreCommit<'a> {
532    type Output = DeltaResult<FinalizedCommit>;
533    type IntoFuture = BoxFuture<'a, Self::Output>;
534
535    fn into_future(self) -> Self::IntoFuture {
536        Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
537    }
538}
539
540impl<'a> PreCommit<'a> {
541    /// Prepare the commit but do not finalize it
542    pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
543        let this = self;
544
545        // Write delta log entry as temporary file to storage. For the actual commit,
546        // the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
547        async fn write_tmp_commit(
548            log_entry: Bytes,
549            store: ObjectStoreRef,
550        ) -> DeltaResult<CommitOrBytes> {
551            let token = uuid::Uuid::new_v4().to_string();
552            let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
553            store.put(&path, log_entry.into()).await?;
554            Ok(CommitOrBytes::TmpCommit(path))
555        }
556
557        Box::pin(async move {
558            if let Some(table_reference) = this.table_data {
559                PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
560            }
561            let log_entry = this.data.get_bytes()?;
562
563            // With the DefaultLogStore & LakeFSLogstore, we just pass the bytes around, since we use conditionalPuts
564            // Other stores will use tmp_commits
565            let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
566                .contains(&this.log_store.name().as_str())
567            {
568                CommitOrBytes::LogBytes(log_entry)
569            } else {
570                write_tmp_commit(
571                    log_entry,
572                    this.log_store.object_store(Some(this.operation_id)),
573                )
574                .await?
575            };
576
577            Ok(PreparedCommit {
578                commit_or_bytes,
579                log_store: this.log_store,
580                table_data: this.table_data,
581                max_retries: this.max_retries,
582                data: this.data,
583                post_commit: this.post_commit_hook,
584                post_commit_hook_handler: this.post_commit_hook_handler,
585                operation_id: this.operation_id,
586            })
587        })
588    }
589}
590
591/// Represents a inflight commit
592pub struct PreparedCommit<'a> {
593    commit_or_bytes: CommitOrBytes,
594    log_store: LogStoreRef,
595    data: CommitData,
596    table_data: Option<&'a dyn TableReference>,
597    max_retries: usize,
598    post_commit: Option<PostCommitHookProperties>,
599    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
600    operation_id: Uuid,
601}
602
603impl PreparedCommit<'_> {
604    /// The temporary commit file created
605    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
606        &self.commit_or_bytes
607    }
608}
609
610impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
611    type Output = DeltaResult<PostCommit>;
612    type IntoFuture = BoxFuture<'a, Self::Output>;
613
614    fn into_future(self) -> Self::IntoFuture {
615        let this = self;
616
617        Box::pin(async move {
618            let commit_or_bytes = this.commit_or_bytes;
619
620            if this.table_data.is_none() {
621                this.log_store
622                    .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
623                    .await?;
624                return Ok(PostCommit {
625                    version: 0,
626                    data: this.data,
627                    create_checkpoint: false,
628                    cleanup_expired_logs: None,
629                    log_store: this.log_store,
630                    table_data: None,
631                    custom_execute_handler: this.post_commit_hook_handler,
632                    metrics: CommitMetrics { num_retries: 0 },
633                });
634            }
635
636            // unwrap() is safe here due to the above check
637            let mut read_snapshot = this.table_data.unwrap().eager_snapshot().clone();
638
639            let mut attempt_number = 1;
640            let total_retries = this.max_retries + 1;
641            while attempt_number <= total_retries {
642                let latest_version = this
643                    .log_store
644                    .get_latest_version(read_snapshot.version())
645                    .await?;
646
647                if latest_version > read_snapshot.version() {
648                    // If max_retries are set to 0, do not try to use the conflict checker to resolve the conflict
649                    // and throw immediately
650                    if this.max_retries == 0 {
651                        return Err(
652                            TransactionError::MaxCommitAttempts(this.max_retries as i32).into()
653                        );
654                    }
655                    warn!("Attempting to write a transaction {} but the underlying table has been updated to {latest_version}\n{:?}", read_snapshot.version() + 1, this.log_store);
656                    let mut steps = latest_version - read_snapshot.version();
657
658                    // Need to check for conflicts with each version between the read_snapshot and
659                    // the latest!
660                    while steps != 0 {
661                        let summary = WinningCommitSummary::try_new(
662                            this.log_store.as_ref(),
663                            latest_version - steps,
664                            (latest_version - steps) + 1,
665                        )
666                        .await?;
667                        let transaction_info = TransactionInfo::try_new(
668                            &read_snapshot,
669                            this.data.operation.read_predicate(),
670                            &this.data.actions,
671                            this.data.operation.read_whole_table(),
672                        )?;
673                        let conflict_checker = ConflictChecker::new(
674                            transaction_info,
675                            summary,
676                            Some(&this.data.operation),
677                        );
678
679                        match conflict_checker.check_conflicts() {
680                            Ok(_) => {}
681                            Err(err) => {
682                                return Err(TransactionError::CommitConflict(err).into());
683                            }
684                        }
685                        steps -= 1;
686                    }
687                    // Update snapshot to latest version after successful conflict check
688                    read_snapshot
689                        .update(this.log_store.clone(), Some(latest_version))
690                        .await?;
691                }
692                let version: i64 = latest_version + 1;
693
694                match this
695                    .log_store
696                    .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
697                    .await
698                {
699                    Ok(()) => {
700                        return Ok(PostCommit {
701                            version,
702                            data: this.data,
703                            create_checkpoint: this
704                                .post_commit
705                                .map(|v| v.create_checkpoint)
706                                .unwrap_or_default(),
707                            cleanup_expired_logs: this
708                                .post_commit
709                                .map(|v| v.cleanup_expired_logs)
710                                .unwrap_or_default(),
711                            log_store: this.log_store,
712                            table_data: Some(Box::new(read_snapshot)),
713                            custom_execute_handler: this.post_commit_hook_handler,
714                            metrics: CommitMetrics {
715                                num_retries: attempt_number as u64 - 1,
716                            },
717                        });
718                    }
719                    Err(TransactionError::VersionAlreadyExists(version)) => {
720                        error!("The transaction {version} already exists, will retry!");
721                        // If the version already exists, loop through again and re-check
722                        // conflicts
723                        attempt_number += 1;
724                    }
725                    Err(err) => {
726                        this.log_store
727                            .abort_commit_entry(version, commit_or_bytes, this.operation_id)
728                            .await?;
729                        return Err(err.into());
730                    }
731                }
732            }
733
734            Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
735        })
736    }
737}
738
739/// Represents items for the post commit hook
740pub struct PostCommit {
741    /// The winning version number of the commit
742    pub version: i64,
743    /// The data that was committed to the log store
744    pub data: CommitData,
745    create_checkpoint: bool,
746    cleanup_expired_logs: Option<bool>,
747    log_store: LogStoreRef,
748    table_data: Option<Box<dyn TableReference>>,
749    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
750    metrics: CommitMetrics,
751}
752
753impl PostCommit {
754    /// Runs the post commit activities
755    async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
756        if let Some(table) = &self.table_data {
757            let post_commit_operation_id = Uuid::new_v4();
758            let mut snapshot = table.eager_snapshot().clone();
759            if self.version - snapshot.version() > 1 {
760                // This may only occur during concurrent write actions. We need to update the state first to - 1
761                // then we can advance.
762                snapshot
763                    .update(self.log_store.clone(), Some(self.version - 1))
764                    .await?;
765                snapshot.advance(vec![&self.data])?;
766            } else {
767                snapshot.advance(vec![&self.data])?;
768            }
769            let mut state = DeltaTableState { snapshot };
770
771            let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
772                cleanup_logs
773            } else {
774                state.table_config().enable_expired_log_cleanup()
775            };
776
777            // Run arbitrary before_post_commit_hook code
778            if let Some(custom_execute_handler) = &self.custom_execute_handler {
779                custom_execute_handler
780                    .before_post_commit_hook(
781                        &self.log_store,
782                        cleanup_logs || self.create_checkpoint,
783                        post_commit_operation_id,
784                    )
785                    .await?
786            }
787
788            let mut new_checkpoint_created = false;
789            if self.create_checkpoint {
790                // Execute create checkpoint hook
791                new_checkpoint_created = self
792                    .create_checkpoint(
793                        &state,
794                        &self.log_store,
795                        self.version,
796                        post_commit_operation_id,
797                    )
798                    .await?;
799            }
800
801            let mut num_log_files_cleaned_up: u64 = 0;
802            if cleanup_logs {
803                // Execute clean up logs hook
804                num_log_files_cleaned_up = cleanup_expired_logs_for(
805                    self.version,
806                    self.log_store.as_ref(),
807                    Utc::now().timestamp_millis()
808                        - state.table_config().log_retention_duration().as_millis() as i64,
809                    Some(post_commit_operation_id),
810                )
811                .await? as u64;
812                if num_log_files_cleaned_up > 0 {
813                    state = DeltaTableState::try_new(
814                        &state.snapshot().table_root(),
815                        self.log_store.object_store(None),
816                        state.load_config().clone(),
817                        Some(self.version),
818                    )
819                    .await?;
820                }
821            }
822
823            // Run arbitrary after_post_commit_hook code
824            if let Some(custom_execute_handler) = &self.custom_execute_handler {
825                custom_execute_handler
826                    .after_post_commit_hook(
827                        &self.log_store,
828                        cleanup_logs || self.create_checkpoint,
829                        post_commit_operation_id,
830                    )
831                    .await?
832            }
833            Ok((
834                state,
835                PostCommitMetrics {
836                    new_checkpoint_created,
837                    num_log_files_cleaned_up,
838                },
839            ))
840        } else {
841            let state = DeltaTableState::try_new(
842                &Path::default(),
843                self.log_store.object_store(None),
844                Default::default(),
845                Some(self.version),
846            )
847            .await?;
848            Ok((
849                state,
850                PostCommitMetrics {
851                    new_checkpoint_created: false,
852                    num_log_files_cleaned_up: 0,
853                },
854            ))
855        }
856    }
857    async fn create_checkpoint(
858        &self,
859        table_state: &DeltaTableState,
860        log_store: &LogStoreRef,
861        version: i64,
862        operation_id: Uuid,
863    ) -> DeltaResult<bool> {
864        if !table_state.load_config().require_files {
865            warn!("Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files.");
866            return Ok(false);
867        }
868
869        let checkpoint_interval = table_state.config().checkpoint_interval() as i64;
870        if ((version + 1) % checkpoint_interval) == 0 {
871            create_checkpoint_for(version, table_state, log_store.as_ref(), Some(operation_id))
872                .await?;
873            Ok(true)
874        } else {
875            Ok(false)
876        }
877    }
878}
879
/// A commit that successfully completed
///
/// This is the success value produced by awaiting a [`PostCommit`]: it bundles the
/// table state returned by the post-commit hook with the committed version and the
/// metrics collected for the commit.
pub struct FinalizedCommit {
    /// The new table state after a commit
    pub snapshot: DeltaTableState,

    /// Version of the finalized commit
    pub version: i64,

    /// Metrics associated with the commit operation (number of retries, whether a new
    /// checkpoint was created, and how many log files were cleaned up)
    pub metrics: Metrics,
}
891
892impl FinalizedCommit {
893    /// The new table state after a commit
894    pub fn snapshot(&self) -> DeltaTableState {
895        self.snapshot.clone()
896    }
897    /// Version of the finalized commit
898    pub fn version(&self) -> i64 {
899        self.version
900    }
901}
902
903impl std::future::IntoFuture for PostCommit {
904    type Output = DeltaResult<FinalizedCommit>;
905    type IntoFuture = BoxFuture<'static, Self::Output>;
906
907    fn into_future(self) -> Self::IntoFuture {
908        let this = self;
909
910        Box::pin(async move {
911            match this.run_post_commit_hook().await {
912                Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
913                    snapshot,
914                    version: this.version,
915                    metrics: Metrics {
916                        num_retries: this.metrics.num_retries,
917                        new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
918                        num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
919                    },
920                }),
921                Err(err) => Err(err),
922            }
923        })
924    }
925}
926
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::logstore::{commit_uri_from_version, default_logstore::DefaultLogStore, LogStore};
    use object_store::{memory::InMemory, ObjectStore, PutPayload};
    use url::Url;

    /// Commit file names are the version number zero-padded to 20 digits under `_delta_log/`.
    #[test]
    fn test_commit_uri_from_version() {
        let cases = [
            (0, "_delta_log/00000000000000000000.json"),
            (123, "_delta_log/00000000000000000123.json"),
        ];
        for (version, expected) in cases {
            assert_eq!(commit_uri_from_version(version), Path::from(expected));
        }
    }

    /// Writing a commit entry must fail for a version whose file already exists and
    /// succeed for the following version.
    #[tokio::test]
    async fn test_try_commit_transaction() {
        let memory_store = Arc::new(InMemory::new());
        let config = crate::logstore::LogStoreConfig {
            location: Url::parse("mem://what/is/this").unwrap(),
            options: Default::default(),
        };
        let log_store = DefaultLogStore::new(memory_store.clone(), config);

        // Pre-create the commit file for version 0 directly in the object store.
        let existing_commit = Path::from("_delta_log/00000000000000000000.json");
        memory_store
            .put(&existing_commit, PutPayload::new())
            .await
            .unwrap();

        let empty_commit = || CommitOrBytes::LogBytes(PutPayload::new().into());

        // fails if file version already exists
        let conflicting = log_store
            .write_commit_entry(0, empty_commit(), Uuid::new_v4())
            .await;
        assert!(conflicting.is_err());

        // succeeds for next version
        log_store
            .write_commit_entry(1, empty_commit(), Uuid::new_v4())
            .await
            .unwrap();
    }
}