Skip to main content

deltalake_core/kernel/transaction/
mod.rs

1//! Add a commit entry to the Delta Table.
2//! This module provides a unified interface for modifying commit behavior and attributes
3//!
//! [`CommitProperties`] provides a unified client interface for all Delta operations.
5//! Internally this is used to initialize a [`CommitBuilder`].
6//!
7//! For advanced use cases [`CommitBuilder`] can be used which allows
8//! finer control over the commit process. The builder can be converted
//! into a future that yields either a [`PreparedCommit`] or a [`FinalizedCommit`].
10//!
11//! A [`PreparedCommit`] represents a temporary commit marker written to storage.
12//! To convert to a [`FinalizedCommit`] an atomic rename is attempted. If the rename fails
13//! then conflict resolution is performed and the atomic rename is tried for the latest version.
14//!
15//!<pre>
16//!                                          Client Interface
17//!        ┌─────────────────────────────┐
18//!        │      Commit Properties      │
19//!        │                             │
20//!        │ Public commit interface for │
21//!        │     all Delta Operations    │
22//!        │                             │
23//!        └─────────────┬───────────────┘
24//!                      │
25//! ─────────────────────┼────────────────────────────────────
26//!                      │
27//!                      ▼                  Advanced Interface
28//!        ┌─────────────────────────────┐
29//!        │       Commit Builder        │
30//!        │                             │
31//!        │   Advanced entry point for  │
32//!        │     creating a commit       │
33//!        └─────────────┬───────────────┘
34//!                      │
35//!                      ▼
36//!     ┌───────────────────────────────────┐
37//!     │                                   │
38//!     │ ┌───────────────────────────────┐ │
39//!     │ │        Prepared Commit        │ │
40//!     │ │                               │ │
41//!     │ │     Represents a temporary    │ │
42//!     │ │   commit marker written to    │ │
43//!     │ │           storage             │ │
44//!     │ └──────────────┬────────────────┘ │
45//!     │                │                  │
46//!     │                ▼                  │
47//!     │ ┌───────────────────────────────┐ │
48//!     │ │       Finalize Commit         │ │
49//!     │ │                               │ │
50//!     │ │   Convert the commit marker   │ │
51//!     │ │   to a commit using atomic    │ │
52//!     │ │         operations            │ │
53//!     │ │                               │ │
54//!     │ └───────────────────────────────┘ │
55//!     │                                   │
56//!     └────────────────┬──────────────────┘
57//!                      │
58//!                      ▼
59//!       ┌───────────────────────────────┐
60//!       │          Post Commit          │
61//!       │                               │
62//!       │ Commit that was materialized  │
63//!       │ to storage with post commit   │
64//!       │      hooks to be executed     │
65//!       └──────────────┬────────────────┘
66//!                      │
67//!                      ▼
68//!       ┌───────────────────────────────┐
69//!       │        Finalized Commit       │
70//!       │                               │
71//!       │ Commit that was materialized  │
72//!       │         to storage            │
73//!       │                               │
74//!       └───────────────────────────────┘
75//!</pre>
76use std::collections::HashMap;
77use std::str::FromStr;
78use std::sync::Arc;
79
80use bytes::Bytes;
81use chrono::Utc;
82use conflict_checker::ConflictChecker;
83use delta_kernel::table_properties::TableProperties;
84use futures::future::BoxFuture;
85use object_store::Error as ObjectStoreError;
86use object_store::ObjectStoreExt as _;
87use object_store::path::Path;
88use serde_json::Value;
89use tracing::*;
90use uuid::Uuid;
91
92use delta_kernel::table_features::TableFeature;
93use serde::{Deserialize, Serialize};
94
95use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
96use crate::errors::DeltaTableError;
97use crate::kernel::{
98    Action, CommitInfo, EagerSnapshot, IsolationLevel, Metadata, Protocol, Transaction, Version,
99};
100use crate::logstore::ObjectStoreRef;
101use crate::logstore::{CommitOrBytes, LogStoreRef};
102use crate::operations::CustomExecuteHandler;
103use crate::protocol::{DeltaOperation, operation_parameter_value};
104use crate::protocol::{cleanup_expired_logs_for, create_checkpoint_for};
105use crate::table::config::TablePropertiesExt as _;
106use crate::table::state::DeltaTableState;
107use crate::{DeltaResult, crate_version};
108
109pub use self::conflict_checker::CommitConflictError;
110pub use self::protocol::INSTANCE as PROTOCOL;
111
112#[cfg(test)]
113pub(crate) mod application;
114mod conflict_checker;
115mod protocol;
116#[cfg(feature = "datafusion")]
117mod state;
118
/// Name of the log directory used when building temporary commit paths.
const DELTA_LOG_FOLDER: &str = "_delta_log";

/// Retry budget used when callers do not configure one explicitly.
pub(crate) const DEFAULT_RETRIES: usize = 15;

/// Flattened commit metadata keys that correspond to typed `CommitInfo` fields.
///
/// These keys map to typed CommitInfo fields used by common Spark compatible writers. They are
/// not required by the protocol, but leaving them in flattened metadata can produce duplicate
/// JSON keys. Keep this list aligned with `validate_reserved_commit_metadata` in
/// `python/src/lib.rs`.
const RESERVED_COMMIT_INFO_KEYS: &[&str] = &[
    // Populated by the writer itself.
    "timestamp",
    // Identity of the committer.
    "userId",
    "userName",
    // Description of the operation.
    "operation",
    "operationParameters",
    // Transaction semantics.
    "readVersion",
    "isolationLevel",
    "isBlindAppend",
    // Provenance and free-form user data.
    "engineInfo",
    "userMetadata",
];
136
137#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
138#[serde(rename_all = "camelCase")]
139pub struct CommitMetrics {
140    /// Number of retries before a successful commit
141    pub num_retries: u64,
142}
143
144#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
145#[serde(rename_all = "camelCase")]
146pub struct PostCommitMetrics {
147    /// Whether a new checkpoint was created as part of this commit
148    pub new_checkpoint_created: bool,
149
150    /// Number of log files cleaned up
151    pub num_log_files_cleaned_up: u64,
152}
153
154#[derive(Default, Debug, PartialEq, Clone, Serialize, Deserialize)]
155#[serde(rename_all = "camelCase")]
156pub struct Metrics {
157    /// Number of retries before a successful commit
158    pub num_retries: u64,
159
160    /// Whether a new checkpoint was created as part of this commit
161    pub new_checkpoint_created: bool,
162
163    /// Number of log files cleaned up
164    pub num_log_files_cleaned_up: u64,
165}
166
167/// Error raised while commititng transaction
168#[derive(thiserror::Error, Debug)]
169pub enum TransactionError {
170    /// Version already exists
171    #[error("Tried committing existing table version: {0}")]
172    VersionAlreadyExists(Version),
173
174    /// Error returned when reading the delta log object failed.
175    #[error("Error serializing commit log to json: {json_err}")]
176    SerializeLogJson {
177        /// Commit log record JSON serialization error.
178        json_err: serde_json::error::Error,
179    },
180
181    /// Error returned when reading the delta log object failed.
182    #[error("Log storage error: {}", .source)]
183    ObjectStore {
184        /// Storage error details when reading the delta log object failed.
185        #[from]
186        source: ObjectStoreError,
187    },
188
189    /// Error returned when a commit conflict occurred
190    #[error("Failed to commit transaction: {0}")]
191    CommitConflict(#[from] CommitConflictError),
192
193    /// Error returned when maximum number of commit trioals is exceeded
194    #[error("Failed to commit transaction: {0}")]
195    MaxCommitAttempts(i32),
196
197    /// The transaction includes Remove action with data change but Delta table is append-only
198    #[error(
199        "The transaction includes Remove action with data change but Delta table is append-only"
200    )]
201    DeltaTableAppendOnly,
202
203    /// Error returned when unsupported table features are required
204    #[error("Unsupported table features required: {0:?}")]
205    UnsupportedTableFeatures(Vec<TableFeature>),
206
207    /// Error returned when table features are required but not specified
208    #[error("Table features must be specified, please specify: {0:?}")]
209    TableFeaturesRequired(TableFeature),
210
211    /// The transaction failed to commit due to an error in an implementation-specific layer.
212    /// Currently used by DynamoDb-backed S3 log store when database operations fail.
213    #[error("Transaction failed: {msg}")]
214    LogStoreError {
215        /// Detailed message for the commit failure.
216        msg: String,
217        /// underlying error in the log store transactional layer.
218        source: Box<dyn std::error::Error + Send + Sync + 'static>,
219    },
220}
221
222impl From<TransactionError> for DeltaTableError {
223    fn from(err: TransactionError) -> Self {
224        match err {
225            TransactionError::VersionAlreadyExists(version) => {
226                DeltaTableError::VersionAlreadyExists(version)
227            }
228            TransactionError::SerializeLogJson { json_err } => {
229                DeltaTableError::SerializeLogJson { json_err }
230            }
231            TransactionError::ObjectStore { source } => DeltaTableError::ObjectStore { source },
232            other => DeltaTableError::Transaction { source: other },
233        }
234    }
235}
236
237/// Error raised while commititng transaction
238#[derive(thiserror::Error, Debug)]
239pub enum CommitBuilderError {}
240
241impl From<CommitBuilderError> for DeltaTableError {
242    fn from(err: CommitBuilderError) -> Self {
243        DeltaTableError::CommitValidation { source: err }
244    }
245}
246
247/// Reference to some structure that contains mandatory attributes for performing a commit.
248pub trait TableReference: Send + Sync {
249    /// Well known table configuration
250    fn config(&self) -> &TableProperties;
251
252    /// Get the table protocol of the snapshot
253    fn protocol(&self) -> &Protocol;
254
255    /// Get the table metadata of the snapshot
256    fn metadata(&self) -> &Metadata;
257
258    /// Try to cast this table reference to a `EagerSnapshot`
259    fn eager_snapshot(&self) -> &EagerSnapshot;
260}
261
262impl TableReference for EagerSnapshot {
263    fn protocol(&self) -> &Protocol {
264        EagerSnapshot::protocol(self)
265    }
266
267    fn metadata(&self) -> &Metadata {
268        EagerSnapshot::metadata(self)
269    }
270
271    fn config(&self) -> &TableProperties {
272        self.table_properties()
273    }
274
275    fn eager_snapshot(&self) -> &EagerSnapshot {
276        self
277    }
278}
279
280impl TableReference for DeltaTableState {
281    fn config(&self) -> &TableProperties {
282        self.snapshot.config()
283    }
284
285    fn protocol(&self) -> &Protocol {
286        self.snapshot.protocol()
287    }
288
289    fn metadata(&self) -> &Metadata {
290        self.snapshot.metadata()
291    }
292
293    fn eager_snapshot(&self) -> &EagerSnapshot {
294        &self.snapshot
295    }
296}
297
298/// Data that was actually written to the log store.
299#[derive(Debug)]
300pub struct CommitData {
301    /// The actions
302    pub actions: Vec<Action>,
303    /// The Operation
304    pub operation: DeltaOperation,
305    /// The Metadata
306    pub app_metadata: HashMap<String, Value>,
307    /// Application specific transaction
308    pub app_transactions: Vec<Transaction>,
309}
310
311/// Moves reserved commit metadata into typed CommitInfo fields.
312///
313/// This prevents serde flatten from serializing duplicate commitInfo keys when callers pass fields
314/// such as readVersion or operationParameters through custom metadata. Rust callers keep the
315/// existing tolerant behavior: invalid reserved metadata is logged and dropped. Python validates
316/// the same key set early in validate_reserved_commit_metadata and raises ValueError instead.
317/// See issue 4443 for the Python custom metadata request that introduced this normalization path.
318fn normalize_reserved_commit_metadata(
319    commit_info: &mut CommitInfo,
320    app_metadata: &mut HashMap<String, Value>,
321) {
322    match app_metadata.remove("operationParameters") {
323        Some(Value::Object(operation_parameters)) => {
324            let generated_parameters = commit_info
325                .operation_parameters
326                .get_or_insert_with(HashMap::new);
327            for (key, value) in operation_parameters {
328                generated_parameters
329                    .entry(key)
330                    .or_insert_with(|| operation_parameter_value(value));
331            }
332        }
333        Some(value) => log_unexpected_reserved_metadata_type(
334            "operationParameters",
335            &value,
336            "object with string-compatible values",
337        ),
338        None => {}
339    }
340
341    if let Some(value) = app_metadata.remove("readVersion") {
342        if let Some(value) = value.as_u64() {
343            if commit_info.read_version.is_none() {
344                commit_info.read_version = Some(value);
345            }
346        } else {
347            log_unexpected_reserved_metadata_type("readVersion", &value, "non-negative integer");
348        }
349    }
350
351    promote_string_reserved_metadata(app_metadata, "userId", &mut commit_info.user_id);
352    promote_string_reserved_metadata(app_metadata, "userName", &mut commit_info.user_name);
353    promote_string_reserved_metadata(app_metadata, "userMetadata", &mut commit_info.user_metadata);
354
355    if let Some(value) = app_metadata.remove("isolationLevel") {
356        if let Some(value) = value
357            .as_str()
358            .and_then(|value| IsolationLevel::from_str(value).ok())
359        {
360            if commit_info.isolation_level.is_none() {
361                commit_info.isolation_level = Some(value);
362            }
363        } else {
364            log_unexpected_reserved_metadata_type(
365                "isolationLevel",
366                &value,
367                "valid IsolationLevel string",
368            );
369        }
370    }
371
372    if let Some(value) = app_metadata.remove("isBlindAppend") {
373        if let Some(value) = value.as_bool() {
374            if commit_info.is_blind_append.is_none() {
375                commit_info.is_blind_append = Some(value);
376            }
377        } else {
378            log_unexpected_reserved_metadata_type("isBlindAppend", &value, "boolean");
379        }
380    }
381
382    for key in RESERVED_COMMIT_INFO_KEYS {
383        app_metadata.remove(*key);
384    }
385}
386
387fn promote_string_reserved_metadata(
388    metadata: &mut HashMap<String, Value>,
389    key: &'static str,
390    target: &mut Option<String>,
391) {
392    let Some(value) = metadata.remove(key) else {
393        return;
394    };
395
396    if let Value::String(value) = value {
397        if target.is_none() {
398            *target = Some(value);
399        }
400    } else {
401        log_unexpected_reserved_metadata_type(key, &value, "string");
402    }
403}
404
405fn log_unexpected_reserved_metadata_type(key: &str, value: &Value, expected: &str) {
406    debug!(
407        key,
408        expected,
409        actual = json_value_kind(value),
410        "Ignoring reserved commit metadata key with unexpected type"
411    );
412}
413
414fn json_value_kind(value: &Value) -> &'static str {
415    match value {
416        Value::Null => "null",
417        Value::Bool(_) => "boolean",
418        Value::Number(_) => "number",
419        Value::String(_) => "string",
420        Value::Array(_) => "array",
421        Value::Object(_) => "object",
422    }
423}
424
425fn assign_commit_info_metadata(
426    commit_info: &mut CommitInfo,
427    app_metadata: &mut HashMap<String, Value>,
428) {
429    normalize_reserved_commit_metadata(commit_info, app_metadata);
430
431    let mut existing_info = std::mem::take(&mut commit_info.info);
432    normalize_reserved_commit_metadata(commit_info, &mut existing_info);
433    // app_metadata is the override layer for flattened commit info. It fills missing typed fields
434    // above, and wins over provided CommitInfo.info keys for custom fields.
435    for (key, value) in existing_info {
436        app_metadata.entry(key).or_insert(value);
437    }
438    debug_assert!(
439        RESERVED_COMMIT_INFO_KEYS
440            .iter()
441            .all(|key| !app_metadata.contains_key(*key))
442    );
443
444    commit_info.info = app_metadata.clone();
445}
446
447impl CommitData {
448    /// Create new data to be committed
449    pub fn new(
450        mut actions: Vec<Action>,
451        operation: DeltaOperation,
452        mut app_metadata: HashMap<String, Value>,
453        app_transactions: Vec<Transaction>,
454    ) -> Self {
455        if let Some(Action::CommitInfo(commit_info)) = actions
456            .iter_mut()
457            .find(|action| matches!(action, Action::CommitInfo(..)))
458        {
459            // Callers that provide a CommitInfo action own generated typed fields such as
460            // timestamp and engineInfo. Flattened app metadata overrides custom keys, and callers
461            // may provide clientVersion for compatibility.
462            assign_commit_info_metadata(commit_info, &mut app_metadata);
463        } else {
464            let mut commit_info = operation.get_commit_info();
465            commit_info.timestamp = Some(Utc::now().timestamp_millis());
466            // clientVersion is provenance metadata, but it is not reserved. Callers can override it
467            // while engineInfo remains a generated typed CommitInfo field.
468            app_metadata
469                .entry("clientVersion".to_string())
470                .or_insert(Value::String(format!("delta-rs.{}", crate_version())));
471            assign_commit_info_metadata(&mut commit_info, &mut app_metadata);
472            // commit info should be the first action to support in-commit timestamps.
473            actions.insert(0, Action::CommitInfo(commit_info));
474        }
475
476        for txn in &app_transactions {
477            actions.push(Action::Txn(txn.clone()))
478        }
479
480        CommitData {
481            actions,
482            operation,
483            app_metadata,
484            app_transactions,
485        }
486    }
487
488    /// Obtain the byte representation of the commit.
489    pub fn get_bytes(&self) -> Result<bytes::Bytes, TransactionError> {
490        let mut jsons = Vec::<String>::new();
491        for action in &self.actions {
492            let json = serde_json::to_string(action)
493                .map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
494            jsons.push(json);
495        }
496        Ok(bytes::Bytes::from(jsons.join("\n")))
497    }
498}
499
/// Properties for post commit hook.
///
/// Controls the maintenance steps that may run after a commit has been written.
#[derive(Clone, Debug, Copy)]
pub struct PostCommitHookProperties {
    /// Whether a checkpoint may be created once the checkpoint interval condition is met
    create_checkpoint: bool,
    /// Override the EnableExpiredLogCleanUp setting; `None` means the table config decides
    cleanup_expired_logs: Option<bool>,
}
507
508#[derive(Clone, Debug)]
509/// End user facing interface to be used by operations on the table.
510/// Enable controlling commit behaviour and modifying metadata that is written during a commit.
511pub struct CommitProperties {
512    pub(crate) app_metadata: HashMap<String, Value>,
513    pub(crate) app_transaction: Vec<Transaction>,
514    max_retries: usize,
515    create_checkpoint: bool,
516    cleanup_expired_logs: Option<bool>,
517}
518
519impl Default for CommitProperties {
520    fn default() -> Self {
521        Self {
522            app_metadata: Default::default(),
523            app_transaction: Vec::new(),
524            max_retries: DEFAULT_RETRIES,
525            create_checkpoint: true,
526            cleanup_expired_logs: None,
527        }
528    }
529}
530
531impl CommitProperties {
532    /// Specify metadata the be committed
533    pub fn with_metadata(
534        mut self,
535        metadata: impl IntoIterator<Item = (String, serde_json::Value)>,
536    ) -> Self {
537        self.app_metadata = HashMap::from_iter(metadata);
538        self
539    }
540
541    /// Specify maximum number of times to retry the transaction before failing to commit
542    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
543        self.max_retries = max_retries;
544        self
545    }
546
547    /// Specify if it should create a checkpoint when the commit interval condition is met
548    pub fn with_create_checkpoint(mut self, create_checkpoint: bool) -> Self {
549        self.create_checkpoint = create_checkpoint;
550        self
551    }
552
553    /// Add an additional application transaction to the commit
554    pub fn with_application_transaction(mut self, txn: Transaction) -> Self {
555        self.app_transaction.push(txn);
556        self
557    }
558
559    /// Override application transactions for the commit
560    pub fn with_application_transactions(mut self, txn: Vec<Transaction>) -> Self {
561        self.app_transaction = txn;
562        self
563    }
564
565    /// Specify if it should clean up the logs when the logRetentionDuration interval is met
566    pub fn with_cleanup_expired_logs(mut self, cleanup_expired_logs: Option<bool>) -> Self {
567        self.cleanup_expired_logs = cleanup_expired_logs;
568        self
569    }
570}
571
572impl From<CommitProperties> for CommitBuilder {
573    fn from(value: CommitProperties) -> Self {
574        CommitBuilder {
575            max_retries: value.max_retries,
576            app_metadata: value.app_metadata,
577            post_commit_hook: Some(PostCommitHookProperties {
578                create_checkpoint: value.create_checkpoint,
579                cleanup_expired_logs: value.cleanup_expired_logs,
580            }),
581            app_transaction: value.app_transaction,
582            ..Default::default()
583        }
584    }
585}
586
587/// Prepare data to be committed to the Delta log and control how the commit is performed
588pub struct CommitBuilder {
589    actions: Vec<Action>,
590    app_metadata: HashMap<String, Value>,
591    app_transaction: Vec<Transaction>,
592    max_retries: usize,
593    post_commit_hook: Option<PostCommitHookProperties>,
594    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
595    operation_id: Uuid,
596}
597
598impl Default for CommitBuilder {
599    fn default() -> Self {
600        CommitBuilder {
601            actions: Vec::new(),
602            app_metadata: HashMap::new(),
603            app_transaction: Vec::new(),
604            max_retries: DEFAULT_RETRIES,
605            post_commit_hook: None,
606            post_commit_hook_handler: None,
607            operation_id: Uuid::new_v4(),
608        }
609    }
610}
611
612impl<'a> CommitBuilder {
613    /// Actions to be included in the commit
614    pub fn with_actions(mut self, actions: Vec<Action>) -> Self {
615        self.actions = actions;
616        self
617    }
618
619    /// Metadata for the operation performed like metrics, user, and notebook
620    pub fn with_app_metadata(mut self, app_metadata: HashMap<String, Value>) -> Self {
621        self.app_metadata = app_metadata;
622        self
623    }
624
625    /// Maximum number of times to retry the transaction before failing to commit
626    pub fn with_max_retries(mut self, max_retries: usize) -> Self {
627        self.max_retries = max_retries;
628        self
629    }
630
631    /// Specify all the post commit hook properties
632    pub fn with_post_commit_hook(mut self, post_commit_hook: PostCommitHookProperties) -> Self {
633        self.post_commit_hook = Some(post_commit_hook);
634        self
635    }
636
637    /// Propagate operation id to log store
638    pub fn with_operation_id(mut self, operation_id: Uuid) -> Self {
639        self.operation_id = operation_id;
640        self
641    }
642
643    /// Set a custom execute handler, for pre and post execution
644    pub fn with_post_commit_hook_handler(
645        mut self,
646        handler: Option<Arc<dyn CustomExecuteHandler>>,
647    ) -> Self {
648        self.post_commit_hook_handler = handler;
649        self
650    }
651
652    /// Prepare a Commit operation using the configured builder
653    pub fn build(
654        self,
655        table_data: Option<&'a dyn TableReference>,
656        log_store: LogStoreRef,
657        operation: DeltaOperation,
658    ) -> PreCommit<'a> {
659        let data = CommitData::new(
660            self.actions,
661            operation,
662            self.app_metadata,
663            self.app_transaction,
664        );
665        PreCommit {
666            log_store,
667            table_data,
668            max_retries: self.max_retries,
669            data,
670            post_commit_hook: self.post_commit_hook,
671            post_commit_hook_handler: self.post_commit_hook_handler,
672            operation_id: self.operation_id,
673        }
674    }
675}
676
677/// Represents a commit that has not yet started but all details are finalized
678pub struct PreCommit<'a> {
679    log_store: LogStoreRef,
680    table_data: Option<&'a dyn TableReference>,
681    data: CommitData,
682    max_retries: usize,
683    post_commit_hook: Option<PostCommitHookProperties>,
684    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
685    operation_id: Uuid,
686}
687
688impl<'a> std::future::IntoFuture for PreCommit<'a> {
689    type Output = DeltaResult<FinalizedCommit>;
690    type IntoFuture = BoxFuture<'a, Self::Output>;
691
692    fn into_future(self) -> Self::IntoFuture {
693        Box::pin(async move { self.into_prepared_commit_future().await?.await?.await })
694    }
695}
696
697impl<'a> PreCommit<'a> {
698    /// Prepare the commit but do not finalize it
699    pub fn into_prepared_commit_future(self) -> BoxFuture<'a, DeltaResult<PreparedCommit<'a>>> {
700        let this = self;
701
702        // Write delta log entry as temporary file to storage. For the actual commit,
703        // the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
704        async fn write_tmp_commit(
705            log_entry: Bytes,
706            store: ObjectStoreRef,
707        ) -> DeltaResult<CommitOrBytes> {
708            let token = uuid::Uuid::new_v4().to_string();
709            let path = Path::from_iter([DELTA_LOG_FOLDER, &format!("_commit_{token}.json.tmp")]);
710            store.put(&path, log_entry.into()).await?;
711            Ok(CommitOrBytes::TmpCommit(path))
712        }
713
714        Box::pin(async move {
715            if let Some(table_reference) = this.table_data {
716                PROTOCOL.can_commit(table_reference, &this.data.actions, &this.data.operation)?;
717            }
718            let log_entry = this.data.get_bytes()?;
719
720            // With the DefaultLogStore & LakeFSLogstore, we just pass the bytes around, since we use conditionalPuts
721            // Other stores will use tmp_commits
722            let commit_or_bytes = if ["LakeFSLogStore", "DefaultLogStore"]
723                .contains(&this.log_store.name().as_str())
724            {
725                CommitOrBytes::LogBytes(log_entry)
726            } else {
727                write_tmp_commit(
728                    log_entry,
729                    this.log_store.object_store(Some(this.operation_id)),
730                )
731                .await?
732            };
733
734            Ok(PreparedCommit {
735                commit_or_bytes,
736                log_store: this.log_store,
737                table_data: this.table_data,
738                max_retries: this.max_retries,
739                data: this.data,
740                post_commit: this.post_commit_hook,
741                post_commit_hook_handler: this.post_commit_hook_handler,
742                operation_id: this.operation_id,
743            })
744        })
745    }
746}
747
748/// Represents a inflight commit
749pub struct PreparedCommit<'a> {
750    commit_or_bytes: CommitOrBytes,
751    log_store: LogStoreRef,
752    data: CommitData,
753    table_data: Option<&'a dyn TableReference>,
754    max_retries: usize,
755    post_commit: Option<PostCommitHookProperties>,
756    post_commit_hook_handler: Option<Arc<dyn CustomExecuteHandler>>,
757    operation_id: Uuid,
758}
759
760impl PreparedCommit<'_> {
761    /// The temporary commit file created
762    pub fn commit_or_bytes(&self) -> &CommitOrBytes {
763        &self.commit_or_bytes
764    }
765}
766
767impl<'a> std::future::IntoFuture for PreparedCommit<'a> {
768    type Output = DeltaResult<PostCommit>;
769    type IntoFuture = BoxFuture<'a, Self::Output>;
770
771    fn into_future(self) -> Self::IntoFuture {
772        let this = self;
773
774        Box::pin(async move {
775            let commit_or_bytes = this.commit_or_bytes;
776
777            let mut attempt_number: usize = 1;
778
779            // Handle the case where table doesn't exist yet (initial table creation)
780            let read_snapshot: EagerSnapshot = if let Some(table_data) = this.table_data {
781                table_data.eager_snapshot().clone()
782            } else {
783                debug!("committing initial table version 0");
784                match this
785                    .log_store
786                    .write_commit_entry(0, commit_or_bytes.clone(), this.operation_id)
787                    .await
788                {
789                    Ok(_) => {
790                        return Ok(PostCommit {
791                            version: 0,
792                            data: this.data,
793                            create_checkpoint: false,
794                            cleanup_expired_logs: None,
795                            log_store: this.log_store,
796                            table_data: None,
797                            custom_execute_handler: this.post_commit_hook_handler,
798                            metrics: CommitMetrics { num_retries: 0 },
799                        });
800                    }
801                    Err(TransactionError::VersionAlreadyExists(0)) => {
802                        // Table was created by another writer since the `this.table_data.is_none()` check.
803                        // Load the current table state and continue with the retry loop.
804                        debug!("version 0 already exists, loading table state for retry");
805                        attempt_number = 2;
806                        let latest_version: Version = this.log_store.get_latest_version(0).await?;
807                        EagerSnapshot::try_new(
808                            this.log_store.as_ref(),
809                            Default::default(),
810                            Some(latest_version),
811                        )
812                        .await?
813                    }
814                    Err(e) => return Err(e.into()),
815                }
816            };
817
818            let mut read_snapshot = read_snapshot;
819
820            let commit_span = info_span!(
821                "commit_with_retries",
822                base_version = read_snapshot.version(),
823                max_retries = this.max_retries,
824                attempt = field::Empty,
825                target_version = field::Empty,
826                conflicts_checked = 0
827            );
828
829            async move {
830                let total_retries = this.max_retries + 1;
831                while attempt_number <= total_retries {
832                    Span::current().record("attempt", attempt_number);
833                    let latest_version = this
834                        .log_store
835                        .get_latest_version(read_snapshot.version())
836                        .await?;
837
838                    if latest_version > read_snapshot.version() {
839                        // If max_retries are set to 0, do not try to use the conflict checker to resolve the conflict
840                        // and throw immediately
841                        if this.max_retries == 0 {
842                            warn!(
843                                base_version = read_snapshot.version(),
844                                latest_version = latest_version,
845                                "table updated but max_retries is 0, failing immediately"
846                            );
847                            return Err(TransactionError::MaxCommitAttempts(
848                                this.max_retries as i32,
849                            )
850                            .into());
851                        }
852                        warn!(
853                            base_version = read_snapshot.version(),
854                            latest_version = latest_version,
855                            versions_behind = latest_version - read_snapshot.version(),
856                            "table updated during transaction, checking for conflicts"
857                        );
858                        let mut steps = latest_version - read_snapshot.version();
859                        let mut conflicts_checked = 0;
860
861                        // Need to check for conflicts with each version between the read_snapshot and
862                        // the latest!
863                        while steps != 0 {
864                            conflicts_checked += 1;
865                            let summary = WinningCommitSummary::try_new(
866                                this.log_store.as_ref(),
867                                latest_version - steps,
868                                (latest_version - steps) + 1,
869                            )
870                            .await?;
871                            let transaction_info = TransactionInfo::try_new(
872                                read_snapshot.log_data(),
873                                this.data.operation.read_predicate(),
874                                &this.data.actions,
875                                this.data.operation.read_whole_table(),
876                            )?;
877                            let conflict_checker = ConflictChecker::new(
878                                transaction_info,
879                                summary,
880                                Some(&this.data.operation),
881                            );
882
883                            match conflict_checker.check_conflicts() {
884                                Ok(_) => {}
885                                Err(err) => {
886                                    error!(
887                                        conflicts_checked = conflicts_checked,
888                                        error = %err,
889                                        "conflict detected, aborting transaction"
890                                    );
891                                    return Err(TransactionError::CommitConflict(err).into());
892                                }
893                            }
894                            steps -= 1;
895                        }
896                        Span::current().record("conflicts_checked", conflicts_checked);
897                        debug!(
898                            conflicts_checked = conflicts_checked,
899                            "all conflicts resolved, updating snapshot"
900                        );
901                        // Update snapshot to latest version after successful conflict check
902                        read_snapshot
903                            .update(&this.log_store, Some(latest_version))
904                            .await?;
905                    }
906                    let version: Version = latest_version + 1;
907                    Span::current().record("target_version", version);
908
909                    match this
910                        .log_store
911                        .write_commit_entry(version, commit_or_bytes.clone(), this.operation_id)
912                        .await
913                    {
914                        Ok(()) => {
915                            info!(
916                                version = version,
917                                num_retries = attempt_number - 1,
918                                "transaction committed successfully"
919                            );
920                            return Ok(PostCommit {
921                                version,
922                                data: this.data,
923                                create_checkpoint: this
924                                    .post_commit
925                                    .map(|v| v.create_checkpoint)
926                                    .unwrap_or_default(),
927                                cleanup_expired_logs: this
928                                    .post_commit
929                                    .map(|v| v.cleanup_expired_logs)
930                                    .unwrap_or_default(),
931                                log_store: this.log_store,
932                                table_data: Some(Box::new(read_snapshot)),
933                                custom_execute_handler: this.post_commit_hook_handler,
934                                metrics: CommitMetrics {
935                                    num_retries: (attempt_number - 1) as u64,
936                                },
937                            });
938                        }
939                        Err(TransactionError::VersionAlreadyExists(version)) => {
940                            warn!(
941                                version = version,
942                                attempt = attempt_number,
943                                "version already exists, will retry"
944                            );
945                            // If the version already exists, loop through again and re-check
946                            // conflicts
947                            attempt_number += 1;
948                        }
949                        Err(err) => {
950                            error!(
951                                version = version,
952                                error = %err,
953                                "commit failed, aborting"
954                            );
955                            this.log_store
956                                .abort_commit_entry(version, commit_or_bytes, this.operation_id)
957                                .await?;
958                            return Err(err.into());
959                        }
960                    }
961                }
962
963                error!(
964                    max_retries = this.max_retries,
965                    "exceeded maximum commit attempts"
966                );
967                Err(TransactionError::MaxCommitAttempts(this.max_retries as i32).into())
968            }
969            .instrument(commit_span)
970            .await
971        })
972    }
973}
974
/// Represents items for the post commit hook
///
/// Produced once the commit entry has been written to the log. Awaiting it (it implements
/// `IntoFuture`) runs the post-commit work (checkpointing, expired-log cleanup, custom
/// handler hooks) and yields a [`FinalizedCommit`].
pub struct PostCommit {
    /// The winning version number of the commit
    pub version: Version,
    /// The data that was committed to the log store
    pub data: CommitData,
    /// Whether a checkpoint should be attempted after the commit. Even when `true`, a
    /// checkpoint is only written if the table's checkpoint interval is reached.
    create_checkpoint: bool,
    /// Explicit override for expired-log cleanup; `None` defers to the table's
    /// `enable_expired_log_cleanup` setting.
    cleanup_expired_logs: Option<bool>,
    /// Log store the commit was written to; used by all post-commit activities.
    log_store: LogStoreRef,
    /// Table snapshot the commit was based on, if one was available. `None` on the
    /// table-creation path; the state is then loaded fresh at `version` and no
    /// post-commit hooks are run.
    table_data: Option<Box<dyn TableReference>>,
    /// Optional handler whose `before_post_commit_hook` / `after_post_commit_hook`
    /// are invoked around the post-commit work.
    custom_execute_handler: Option<Arc<dyn CustomExecuteHandler>>,
    /// Metrics gathered while committing (the number of retries).
    metrics: CommitMetrics,
}
988
989impl PostCommit {
990    /// Runs the post commit activities
991    async fn run_post_commit_hook(&self) -> DeltaResult<(DeltaTableState, PostCommitMetrics)> {
992        if let Some(table) = &self.table_data {
993            let post_commit_operation_id = Uuid::new_v4();
994            let mut snapshot = table.eager_snapshot().clone();
995            if self.version != snapshot.version() {
996                snapshot.update(&self.log_store, Some(self.version)).await?;
997            }
998
999            let mut state = DeltaTableState { snapshot };
1000
1001            let cleanup_logs = if let Some(cleanup_logs) = self.cleanup_expired_logs {
1002                cleanup_logs
1003            } else {
1004                state.table_config().enable_expired_log_cleanup()
1005            };
1006
1007            // Run arbitrary before_post_commit_hook code
1008            if let Some(custom_execute_handler) = &self.custom_execute_handler {
1009                custom_execute_handler
1010                    .before_post_commit_hook(
1011                        &self.log_store,
1012                        cleanup_logs || self.create_checkpoint,
1013                        post_commit_operation_id,
1014                    )
1015                    .await?
1016            }
1017
1018            let mut new_checkpoint_created = false;
1019            if self.create_checkpoint {
1020                // Execute create checkpoint hook
1021                new_checkpoint_created = self
1022                    .create_checkpoint(
1023                        &state,
1024                        &self.log_store,
1025                        self.version,
1026                        post_commit_operation_id,
1027                    )
1028                    .await?;
1029            }
1030
1031            let mut num_log_files_cleaned_up: u64 = 0;
1032            if cleanup_logs {
1033                // Execute clean up logs hook
1034                num_log_files_cleaned_up = cleanup_expired_logs_for(
1035                    self.version,
1036                    self.log_store.as_ref(),
1037                    Utc::now().timestamp_millis()
1038                        - state.table_config().log_retention_duration().as_millis() as i64,
1039                    Some(post_commit_operation_id),
1040                )
1041                .await? as u64;
1042                if num_log_files_cleaned_up > 0 {
1043                    state = DeltaTableState::try_new(
1044                        &self.log_store,
1045                        state.load_config().clone(),
1046                        Some(self.version),
1047                    )
1048                    .await?;
1049                }
1050            }
1051
1052            // Run arbitrary after_post_commit_hook code
1053            if let Some(custom_execute_handler) = &self.custom_execute_handler {
1054                custom_execute_handler
1055                    .after_post_commit_hook(
1056                        &self.log_store,
1057                        cleanup_logs || self.create_checkpoint,
1058                        post_commit_operation_id,
1059                    )
1060                    .await?
1061            }
1062            Ok((
1063                state,
1064                PostCommitMetrics {
1065                    new_checkpoint_created,
1066                    num_log_files_cleaned_up,
1067                },
1068            ))
1069        } else {
1070            let state =
1071                DeltaTableState::try_new(&self.log_store, Default::default(), Some(self.version))
1072                    .await?;
1073            Ok((
1074                state,
1075                PostCommitMetrics {
1076                    new_checkpoint_created: false,
1077                    num_log_files_cleaned_up: 0,
1078                },
1079            ))
1080        }
1081    }
1082    async fn create_checkpoint(
1083        &self,
1084        table_state: &DeltaTableState,
1085        log_store: &LogStoreRef,
1086        version: Version,
1087        operation_id: Uuid,
1088    ) -> DeltaResult<bool> {
1089        if !table_state.load_config().require_files {
1090            warn!(
1091                "Checkpoint creation in post_commit_hook has been skipped due to table being initialized without files."
1092            );
1093            return Ok(false);
1094        }
1095
1096        let checkpoint_interval = table_state.config().checkpoint_interval().get();
1097        if (version + 1).is_multiple_of(checkpoint_interval) {
1098            create_checkpoint_for(version, log_store.as_ref(), Some(operation_id)).await?;
1099            Ok(true)
1100        } else {
1101            Ok(false)
1102        }
1103    }
1104}
1105
/// A commit that successfully completed
///
/// Produced by awaiting a [`PostCommit`], i.e. after the commit entry was written and the
/// post-commit activities have run.
pub struct FinalizedCommit {
    /// The new table state after a commit
    pub snapshot: DeltaTableState,

    /// Version of the finalized commit
    pub version: Version,

    /// Metrics associated with the commit operation
    pub metrics: Metrics,
}
1117
1118impl std::fmt::Debug for FinalizedCommit {
1119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1120        f.debug_struct("FinalizedCommit")
1121            .field("version", &self.version)
1122            .field("metrics", &self.metrics)
1123            .finish()
1124    }
1125}
1126
1127impl FinalizedCommit {
1128    /// The new table state after a commit
1129    pub fn snapshot(&self) -> DeltaTableState {
1130        self.snapshot.clone()
1131    }
1132    /// Version of the finalized commit
1133    pub fn version(&self) -> Version {
1134        self.version
1135    }
1136}
1137
1138impl std::future::IntoFuture for PostCommit {
1139    type Output = DeltaResult<FinalizedCommit>;
1140    type IntoFuture = BoxFuture<'static, Self::Output>;
1141
1142    fn into_future(self) -> Self::IntoFuture {
1143        let this = self;
1144
1145        Box::pin(async move {
1146            match this.run_post_commit_hook().await {
1147                Ok((snapshot, post_commit_metrics)) => Ok(FinalizedCommit {
1148                    snapshot,
1149                    version: this.version,
1150                    metrics: Metrics {
1151                        num_retries: this.metrics.num_retries,
1152                        new_checkpoint_created: post_commit_metrics.new_checkpoint_created,
1153                        num_log_files_cleaned_up: post_commit_metrics.num_log_files_cleaned_up,
1154                    },
1155                }),
1156                Err(err) => Err(err),
1157            }
1158        })
1159    }
1160}
1161
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use crate::kernel::IsolationLevel;
    use crate::logstore::{LogStore, StorageConfig, default_logstore::DefaultLogStore};
    use crate::protocol::SaveMode;
    use object_store::{PutPayload, memory::InMemory};
    use serde_json::json;
    use url::Url;

    /// `write_commit_entry` must fail when the target version file already exists and
    /// succeed for the next free version.
    #[tokio::test]
    async fn test_try_commit_transaction() {
        let store = Arc::new(InMemory::new());
        let url = Url::parse("mem://what/is/this").unwrap();
        let log_store = DefaultLogStore::new(
            store.clone(),
            store.clone(),
            crate::logstore::LogStoreConfig::new(&url, StorageConfig::default()),
        );
        // Pre-create version 0 so the first write below collides with it.
        let version_path = Path::from("_delta_log/00000000000000000000.json");
        store.put(&version_path, PutPayload::new()).await.unwrap();

        let res = log_store
            .write_commit_entry(
                0,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await;
        // fails if file version already exists
        assert!(res.is_err());

        // succeeds for next version
        log_store
            .write_commit_entry(
                1,
                CommitOrBytes::LogBytes(PutPayload::new().into()),
                Uuid::new_v4(),
            )
            .await
            .unwrap();
    }

    /// The `commit_with_retries` span declared with the same fields as the commit loop is
    /// an INFO-level span whose initially-empty fields can be recorded afterwards.
    #[test]
    fn test_commit_with_retries_tracing_span() {
        let span = info_span!(
            "commit_with_retries",
            base_version = 5,
            max_retries = 10,
            attempt = field::Empty,
            target_version = field::Empty,
            conflicts_checked = 0
        );

        let metadata = span.metadata().expect("span should have metadata");
        assert_eq!(metadata.name(), "commit_with_retries");
        assert_eq!(metadata.level(), &Level::INFO);
        assert!(metadata.is_span());

        span.record("attempt", 1);
        span.record("target_version", 6);
        span.record("conflicts_checked", 2);
    }

    /// Builder setters on `CommitProperties` are reflected in its fields.
    #[test]
    fn test_commit_properties_with_retries() {
        let props = CommitProperties::default()
            .with_max_retries(5)
            .with_create_checkpoint(false);

        assert_eq!(props.max_retries, 5);
        assert!(!props.create_checkpoint);
    }

    /// `CommitMetrics` keeps the retry count it was constructed with.
    #[test]
    fn test_commit_metrics() {
        let metrics = CommitMetrics { num_retries: 3 };
        assert_eq!(metrics.num_retries, 3);
    }

    /// `CommitData::new` adds a default `clientVersion` of `delta-rs.<crate version>`
    /// unless the caller already supplied one in the app metadata.
    #[test]
    fn test_commit_data_client_version() {
        let no_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::new(),
            vec![],
        );
        assert_eq!(
            *no_metadata.app_metadata.get("clientVersion").unwrap(),
            json!(format!("delta-rs.{}", crate_version()))
        );

        let with_metadata = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([("clientVersion".to_owned(), json!("test-client.0.0.1"))]),
            vec![],
        );
        assert_eq!(
            *with_metadata.app_metadata.get("clientVersion").unwrap(),
            json!("test-client.0.0.1")
        );
    }

    /// Returns the `CommitInfo` expected as the first action of `data`; panics if the
    /// first action is anything else.
    fn commit_info(data: &CommitData) -> &CommitInfo {
        match &data.actions[0] {
            Action::CommitInfo(info) => info,
            action => panic!("expected first action to be commitInfo, got {action:?}"),
        }
    }

    /// Reserved commit-info keys passed via app metadata are handled as follows:
    /// - most are promoted into the typed `CommitInfo` fields;
    /// - `operation` and `engineInfo` do not override the generated values;
    /// - none of them remain in the flattened `info` map;
    /// - non-reserved keys are kept.
    #[test]
    fn test_commit_data_strips_and_promotes_reserved_metadata() {
        let data = CommitData::new(
            vec![],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([
                ("readVersion".to_owned(), json!(7)),
                ("userId".to_owned(), json!("user-1")),
                ("userName".to_owned(), json!("Jane Doe")),
                ("userMetadata".to_owned(), json!("metadata")),
                ("isolationLevel".to_owned(), json!("SnapshotIsolation")),
                ("isBlindAppend".to_owned(), json!(true)),
                ("timestamp".to_owned(), json!(1)),
                ("operation".to_owned(), json!("CUSTOM OPERATION")),
                ("engineInfo".to_owned(), json!("custom-engine")),
                ("custom".to_owned(), json!({"kept": true})),
            ]),
            vec![],
        );

        let info = commit_info(&data);
        assert_eq!(info.read_version, Some(7));
        assert_eq!(info.user_id.as_deref(), Some("user-1"));
        assert_eq!(info.user_name.as_deref(), Some("Jane Doe"));
        assert_eq!(info.user_metadata.as_deref(), Some("metadata"));
        assert_eq!(
            info.isolation_level,
            Some(IsolationLevel::SnapshotIsolation)
        );
        assert_eq!(info.is_blind_append, Some(true));
        assert_eq!(info.operation.as_deref(), Some("FSCK"));
        assert_ne!(info.engine_info.as_deref(), Some("custom-engine"));
        assert_eq!(info.info.get("custom"), Some(&json!({"kept": true})));

        for key in [
            "timestamp",
            "userId",
            "userName",
            "operation",
            "operationParameters",
            "readVersion",
            "isolationLevel",
            "isBlindAppend",
            "engineInfo",
            "userMetadata",
        ] {
            assert!(
                !info.info.contains_key(key),
                "reserved key {key} must not remain in flattened commit info"
            );
        }
    }

    /// User-supplied `operationParameters` are merged with the operation's generated
    /// parameters. Generated keys win on conflict, and custom non-string values end up
    /// stored as JSON-encoded strings.
    #[test]
    fn test_commit_data_merges_operation_parameters_generated_keys_win() {
        let data = CommitData::new(
            vec![],
            DeltaOperation::Write {
                mode: SaveMode::Overwrite,
                partition_by: Some(vec!["id".to_owned()]),
                predicate: None,
            },
            HashMap::from([(
                "operationParameters".to_owned(),
                json!({
                    "mode": "custom-mode",
                    "partitionBy": "custom-partitioning",
                    "customParameter": {"kept": true},
                    "customBoolean": true,
                    "customNumber": 7,
                }),
            )]),
            vec![],
        );

        let info = commit_info(&data);
        let operation_parameters = info
            .operation_parameters
            .as_ref()
            .expect("operation parameters should be present");
        assert_eq!(operation_parameters.get("mode"), Some(&json!("Overwrite")));
        assert_eq!(
            operation_parameters.get("partitionBy"),
            Some(&json!("[\"id\"]"))
        );
        assert_eq!(
            operation_parameters.get("customParameter"),
            Some(&json!("{\"kept\":true}"))
        );
        assert_eq!(
            operation_parameters.get("customBoolean"),
            Some(&json!("true"))
        );
        assert_eq!(operation_parameters.get("customNumber"), Some(&json!("7")));
        assert!(!info.info.contains_key("operationParameters"));
    }

    /// Reserved keys already present in an existing `CommitInfo.info` map are promoted
    /// just like reserved app-metadata keys. App metadata still overrides non-reserved
    /// keys from that map.
    #[test]
    fn test_commit_data_normalizes_reserved_keys_from_existing_commit_info_info() {
        let data = CommitData::new(
            vec![Action::CommitInfo(CommitInfo {
                info: HashMap::from([
                    ("userName".to_owned(), json!("shadow-user")),
                    (
                        "operationParameters".to_owned(),
                        json!({"custom": {"kept": true}}),
                    ),
                    ("custom".to_owned(), json!("kept")),
                ]),
                ..Default::default()
            })],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([("custom".to_owned(), json!("metadata-wins"))]),
            vec![],
        );

        let info = commit_info(&data);
        assert_eq!(info.user_name.as_deref(), Some("shadow-user"));
        let operation_parameters = info
            .operation_parameters
            .as_ref()
            .expect("operation parameters should be promoted");
        assert_eq!(
            operation_parameters.get("custom"),
            Some(&json!("{\"kept\":true}"))
        );
        assert_eq!(info.info.get("custom"), Some(&json!("metadata-wins")));
        assert!(!info.info.contains_key("userName"));
        assert!(!info.info.contains_key("operationParameters"));
    }

    /// Typed fields already set on an existing `CommitInfo` take precedence over the
    /// same reserved keys from app metadata. Those keys are dropped rather than kept
    /// in `info`.
    #[test]
    fn test_commit_data_does_not_promote_reserved_metadata_over_typed_fields() {
        let data = CommitData::new(
            vec![Action::CommitInfo(CommitInfo {
                user_name: Some("typed-user".to_owned()),
                read_version: Some(10),
                ..Default::default()
            })],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([
                ("userName".to_owned(), json!("metadata-user")),
                ("readVersion".to_owned(), json!(11)),
                ("custom".to_owned(), json!("kept")),
            ]),
            vec![],
        );

        let info = commit_info(&data);
        assert_eq!(info.user_name.as_deref(), Some("typed-user"));
        assert_eq!(info.read_version, Some(10));
        assert!(!info.info.contains_key("userName"));
        assert!(!info.info.contains_key("readVersion"));
        assert_eq!(info.info.get("custom"), Some(&json!("kept")));
    }

    /// Reserved keys whose JSON values have the wrong type are dropped. They neither
    /// overwrite typed fields nor remain in the flattened `info` map.
    #[test]
    fn test_commit_data_strips_wrong_typed_reserved_metadata_without_clobbering_typed_fields() {
        let data = CommitData::new(
            vec![Action::CommitInfo(CommitInfo {
                user_name: Some("typed-user".to_owned()),
                read_version: Some(10),
                ..Default::default()
            })],
            DeltaOperation::FileSystemCheck {},
            HashMap::from([
                ("userName".to_owned(), json!(123)),
                ("readVersion".to_owned(), json!("abc")),
                ("isBlindAppend".to_owned(), json!("not-a-bool")),
                ("custom".to_owned(), json!("kept")),
            ]),
            vec![],
        );

        let info = commit_info(&data);
        assert_eq!(info.user_name.as_deref(), Some("typed-user"));
        assert_eq!(info.read_version, Some(10));
        assert_eq!(info.info.get("custom"), Some(&json!("kept")));
        assert!(!info.info.contains_key("userName"));
        assert!(!info.info.contains_key("readVersion"));
        assert!(!info.info.contains_key("isBlindAppend"));
    }
}