Skip to main content

deltalake_core/kernel/transaction/
conflict_checker.rs

1//! Helper module to check if a transaction can be committed in case of conflicting commits.
2use std::collections::HashSet;
3
4use delta_kernel::table_properties::IsolationLevel;
5
6use super::CommitInfo;
7use crate::DeltaTableError;
8#[cfg(feature = "datafusion")]
9use crate::delta_datafusion::DataFusionMixins;
10use crate::errors::DeltaResult;
11use crate::kernel::{
12    Action, Add, LogDataHandler, Metadata, Protocol, Remove, Transaction, Version,
13};
14use crate::logstore::{LogStore, get_actions};
15use crate::protocol::DeltaOperation;
16use crate::table::config::TablePropertiesExt as _;
17
18#[cfg(feature = "datafusion")]
19use super::state::AddContainer;
20#[cfg(feature = "datafusion")]
21use datafusion::logical_expr::Expr;
22#[cfg(feature = "datafusion")]
23use itertools::Either;
24
25/// Exceptions raised during commit conflict resolution
26#[derive(thiserror::Error, Debug)]
27pub enum CommitConflictError {
28    /// This exception occurs when a concurrent operation adds files in the same partition
29    /// (or anywhere in an un-partitioned table) that your operation reads. The file additions
30    /// can be caused by INSERT, DELETE, UPDATE, or MERGE operations.
31    #[error(
32        "Commit failed: a concurrent transactions added new data.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
33    )]
34    ConcurrentAppend,
35
36    /// This exception occurs when a concurrent operation deleted a file that your operation read.
37    /// Common causes are a DELETE, UPDATE, or MERGE operation that rewrites files.
38    #[error(
39        "Commit failed: a concurrent transaction deleted data this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
40    )]
41    ConcurrentDeleteRead,
42
43    /// This exception occurs when a concurrent operation deleted a file that your operation also deletes.
44    /// This could be caused by two concurrent compaction operations rewriting the same files.
45    #[error(
46        "Commit failed: a concurrent transaction deleted the same data your transaction deletes.\nHelp: you should retry this write operation. If it was based on data contained in the table, you should rerun the query generating the data."
47    )]
48    ConcurrentDeleteDelete,
49
50    /// This exception occurs when a concurrent transaction updates the metadata of a Delta table.
51    /// Common causes are ALTER TABLE operations or writes to your Delta table that update the schema of the table.
52    #[error("Metadata changed since last commit.")]
53    MetadataChanged,
54
55    /// If a streaming query using the same checkpoint location is started multiple times concurrently
56    /// and tries to write to the Delta table at the same time. You should never have two streaming
57    /// queries use the same checkpoint location and run at the same time.
58    #[error("Concurrent transaction failed.")]
59    ConcurrentTransaction,
60
61    /// This exception can occur in the following cases:
62    /// - When your Delta table is upgraded to a new version. For future operations to succeed
63    ///   you may need to upgrade your Delta Lake version.
64    /// - When multiple writers are creating or replacing a table at the same time.
65    /// - When multiple writers are writing to an empty path at the same time.
66    #[error("Protocol changed since last commit: {0}")]
67    ProtocolChanged(String),
68
69    /// Error returned when the table requires an unsupported writer version
70    #[error("Delta-rs does not support writer version {0}")]
71    UnsupportedWriterVersion(i32),
72
73    /// Error returned when the table requires an unsupported writer version
74    #[error("Delta-rs does not support reader version {0}")]
75    UnsupportedReaderVersion(i32),
76
77    /// Error returned when the snapshot has missing or corrupted data
78    #[error("Snapshot is corrupted: {source}")]
79    CorruptedState {
80        /// Source error
81        source: Box<dyn std::error::Error + Send + Sync + 'static>,
82    },
83
84    /// Error returned when evaluating predicate
85    #[error("Error evaluating predicate: {source}")]
86    Predicate {
87        /// Source error
88        source: Box<dyn std::error::Error + Send + Sync + 'static>,
89    },
90
91    /// Error returned when no metadata was found in the DeltaTable.
92    #[error("No metadata found, please make sure table is loaded.")]
93    NoMetadata,
94}
95
96/// A struct representing different attributes of current transaction needed for conflict detection.
97#[allow(unused)]
98pub(crate) struct TransactionInfo<'a> {
99    txn_id: String,
100    /// partition predicates by which files have been queried by the transaction
101    ///
102    /// If any new data files or removed data files match this predicate, the
103    /// transaction should fail.
104    #[cfg(not(feature = "datafusion"))]
105    read_predicates: Option<String>,
106    /// partition predicates by which files have been queried by the transaction
107    #[cfg(feature = "datafusion")]
108    read_predicates: Option<Expr>,
109    /// appIds that have been seen by the transaction
110    read_app_ids: HashSet<String>,
111    /// delta log actions that the transaction wants to commit
112    actions: &'a [Action],
113    /// read [`DeltaTableState`] used for the transaction
114    read_snapshot: LogDataHandler<'a>,
115    /// Whether the transaction tainted the whole table
116    read_whole_table: bool,
117}
118
119impl<'a> TransactionInfo<'a> {
120    #[cfg(feature = "datafusion")]
121    pub fn try_new(
122        read_snapshot: LogDataHandler<'a>,
123        read_predicates: Option<String>,
124        actions: &'a [Action],
125        read_whole_table: bool,
126    ) -> DeltaResult<Self> {
127        use datafusion::prelude::SessionContext;
128
129        let session = SessionContext::new();
130        let read_predicates = read_predicates
131            .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
132            .transpose()?;
133
134        let mut read_app_ids = HashSet::<String>::new();
135        for action in actions.iter() {
136            if let Action::Txn(Transaction { app_id, .. }) = action {
137                read_app_ids.insert(app_id.clone());
138            }
139        }
140
141        Ok(Self::new(
142            read_snapshot,
143            read_predicates,
144            actions,
145            read_whole_table,
146        ))
147    }
148
149    #[cfg(feature = "datafusion")]
150    pub fn new(
151        read_snapshot: LogDataHandler<'a>,
152        read_predicates: Option<Expr>,
153        actions: &'a [Action],
154        read_whole_table: bool,
155    ) -> Self {
156        let mut read_app_ids = HashSet::<String>::new();
157        for action in actions.iter() {
158            if let Action::Txn(Transaction { app_id, .. }) = action {
159                read_app_ids.insert(app_id.clone());
160            }
161        }
162        Self {
163            txn_id: "".into(),
164            read_predicates,
165            read_app_ids,
166            actions,
167            read_snapshot,
168            read_whole_table,
169        }
170    }
171
172    #[cfg(not(feature = "datafusion"))]
173    pub fn try_new(
174        read_snapshot: LogDataHandler<'a>,
175        read_predicates: Option<String>,
176        actions: &'a Vec<Action>,
177        read_whole_table: bool,
178    ) -> DeltaResult<Self> {
179        let mut read_app_ids = HashSet::<String>::new();
180        for action in actions.iter() {
181            if let Action::Txn(Transaction { app_id, .. }) = action {
182                read_app_ids.insert(app_id.clone());
183            }
184        }
185        Ok(Self {
186            txn_id: "".into(),
187            read_predicates,
188            read_app_ids,
189            actions,
190            read_snapshot,
191            read_whole_table,
192        })
193    }
194
195    /// Whether the transaction changed the tables metadatas
196    pub fn metadata_changed(&self) -> bool {
197        self.actions
198            .iter()
199            .any(|a| matches!(a, Action::Metadata(_)))
200    }
201
202    #[cfg(feature = "datafusion")]
203    /// Files read by the transaction
204    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
205        use crate::delta_datafusion::files_matching_predicate;
206
207        if let Some(predicate) = &self.read_predicates {
208            Ok(Either::Left(
209                files_matching_predicate(
210                    self.read_snapshot.clone(),
211                    std::slice::from_ref(predicate),
212                )
213                .map_err(|err| CommitConflictError::Predicate {
214                    source: Box::new(err),
215                })?,
216            ))
217        } else {
218            Ok(Either::Right(self.read_snapshot.iter().map(|f| f.to_add())))
219        }
220    }
221
222    #[cfg(not(feature = "datafusion"))]
223    /// Files read by the transaction
224    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
225        Ok(self.read_snapshot.iter().map(|f| f.to_add()))
226    }
227
228    /// Whether the whole table was read during the transaction
229    pub fn read_whole_table(&self) -> bool {
230        self.read_whole_table
231    }
232}
233
234/// Summary of the Winning commit against which we want to check the conflict
235#[derive(Debug)]
236pub(crate) struct WinningCommitSummary {
237    pub actions: Vec<Action>,
238    pub commit_info: Option<CommitInfo>,
239}
240
241impl WinningCommitSummary {
242    pub async fn try_new(
243        log_store: &dyn LogStore,
244        read_version: Version,
245        winning_commit_version: Version,
246    ) -> DeltaResult<Self> {
247        // NOTE using assert, since a wrong version would right now mean a bug in our code.
248        assert_eq!(winning_commit_version, read_version + 1);
249
250        let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
251        match commit_log_bytes {
252            Some(bytes) => {
253                let actions = get_actions(winning_commit_version, &bytes)?; // ← ADD ? HERE
254                let commit_info = actions
255                    .iter()
256                    .find(|action| matches!(action, Action::CommitInfo(_)))
257                    .map(|action| match action {
258                        Action::CommitInfo(info) => info.clone(),
259                        _ => unreachable!(),
260                    });
261
262                Ok(Self {
263                    actions,
264                    commit_info,
265                })
266            }
267            None => Err(DeltaTableError::InvalidVersion(winning_commit_version)),
268        }
269    }
270
271    pub fn metadata_updates(&self) -> Vec<Metadata> {
272        self.actions
273            .iter()
274            .cloned()
275            .filter_map(|action| match action {
276                Action::Metadata(metadata) => Some(metadata),
277                _ => None,
278            })
279            .collect()
280    }
281
282    pub fn app_level_transactions(&self) -> HashSet<String> {
283        self.actions
284            .iter()
285            .cloned()
286            .filter_map(|action| match action {
287                Action::Txn(txn) => Some(txn.app_id),
288                _ => None,
289            })
290            .collect()
291    }
292
293    pub fn protocol(&self) -> Vec<Protocol> {
294        self.actions
295            .iter()
296            .cloned()
297            .filter_map(|action| match action {
298                Action::Protocol(protocol) => Some(protocol),
299                _ => None,
300            })
301            .collect()
302    }
303
304    pub fn removed_files(&self) -> Vec<Remove> {
305        self.actions
306            .iter()
307            .cloned()
308            .filter_map(|action| match action {
309                Action::Remove(remove) => Some(remove),
310                _ => None,
311            })
312            .collect()
313    }
314
315    pub fn added_files(&self) -> Vec<Add> {
316        self.actions
317            .iter()
318            .cloned()
319            .filter_map(|action| match action {
320                Action::Add(add) => Some(add),
321                _ => None,
322            })
323            .collect()
324    }
325
326    pub fn blind_append_added_files(&self) -> Vec<Add> {
327        if self.is_blind_append().unwrap_or(false) {
328            self.added_files()
329        } else {
330            vec![]
331        }
332    }
333
334    pub fn changed_data_added_files(&self) -> Vec<Add> {
335        if self.is_blind_append().unwrap_or(false) {
336            vec![]
337        } else {
338            self.added_files()
339        }
340    }
341
342    pub fn is_blind_append(&self) -> Option<bool> {
343        self.commit_info
344            .as_ref()
345            .map(|opt| opt.is_blind_append.unwrap_or(false))
346    }
347}
348
349/// Checks if a failed commit may be committed after a conflicting winning commit
350pub(crate) struct ConflictChecker<'a> {
351    /// transaction information for current transaction at start of check
352    txn_info: TransactionInfo<'a>,
353    /// Summary of the transaction, that has been committed ahead of the current transaction
354    winning_commit_summary: WinningCommitSummary,
355    /// Isolation level for the current transaction
356    isolation_level: IsolationLevel,
357}
358
359impl<'a> ConflictChecker<'a> {
360    pub fn new(
361        transaction_info: TransactionInfo<'a>,
362        winning_commit_summary: WinningCommitSummary,
363        operation: Option<&DeltaOperation>,
364    ) -> ConflictChecker<'a> {
365        let isolation_level = operation
366            .and_then(|op| {
367                if can_downgrade_to_snapshot_isolation(
368                    &winning_commit_summary.actions,
369                    op,
370                    &transaction_info
371                        .read_snapshot
372                        .table_properties()
373                        .isolation_level(),
374                ) {
375                    Some(IsolationLevel::SnapshotIsolation)
376                } else {
377                    None
378                }
379            })
380            .unwrap_or_else(|| {
381                transaction_info
382                    .read_snapshot
383                    .table_properties()
384                    .isolation_level()
385            });
386
387        Self {
388            txn_info: transaction_info,
389            winning_commit_summary,
390            isolation_level,
391        }
392    }
393
394    /// This function checks conflict of the `initial_current_transaction_info` against the
395    /// `winning_commit_version` and returns an updated [`TransactionInfo`] that represents
396    /// the transaction as if it had started while reading the `winning_commit_version`.
397    pub fn check_conflicts(&self) -> Result<(), CommitConflictError> {
398        self.check_protocol_compatibility()?;
399        self.check_no_metadata_updates()?;
400        self.check_for_added_files_that_should_have_been_read_by_current_txn()?;
401        self.check_for_deleted_files_against_current_txn_read_files()?;
402        self.check_for_deleted_files_against_current_txn_deleted_files()?;
403        self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?;
404        Ok(())
405    }
406
407    /// Asserts that the client is up to date with the protocol and is allowed
408    /// to read and write against the protocol set by the committed transaction.
409    fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
410        for p in self.winning_commit_summary.protocol() {
411            let (win_read, curr_read) = (
412                p.min_reader_version(),
413                self.txn_info.read_snapshot.protocol().min_reader_version(),
414            );
415            let (win_write, curr_write) = (
416                p.min_writer_version(),
417                self.txn_info.read_snapshot.protocol().min_writer_version(),
418            );
419            if curr_read < win_read || win_write < curr_write {
420                return Err(CommitConflictError::ProtocolChanged(format!(
421                    "required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"
422                )));
423            };
424        }
425        if !self.winning_commit_summary.protocol().is_empty()
426            && self
427                .txn_info
428                .actions
429                .iter()
430                .any(|a| matches!(a, Action::Protocol(_)))
431        {
432            return Err(CommitConflictError::ProtocolChanged(
433                "protocol changed".into(),
434            ));
435        };
436        Ok(())
437    }
438
439    /// Check if the committed transaction has changed metadata.
440    fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> {
441        // Fail if the metadata is different than what the txn read.
442        if !self.winning_commit_summary.metadata_updates().is_empty() {
443            Err(CommitConflictError::MetadataChanged)
444        } else {
445            Ok(())
446        }
447    }
448
449    /// Check if the new files added by the already committed transactions
450    /// should have been read by the current transaction.
451    fn check_for_added_files_that_should_have_been_read_by_current_txn(
452        &self,
453    ) -> Result<(), CommitConflictError> {
454        // Skip check, if the operation can be downgraded to snapshot isolation
455        if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) {
456            return Ok(());
457        }
458
459        // Fail if new files have been added that the txn should have read.
460        let added_files_to_check = match self.isolation_level {
461            IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => {
462                // don't conflict with blind appends
463                self.winning_commit_summary.changed_data_added_files()
464            }
465            IsolationLevel::Serializable | IsolationLevel::WriteSerializable => {
466                let mut files = self.winning_commit_summary.changed_data_added_files();
467                files.extend(self.winning_commit_summary.blind_append_added_files());
468                files
469            }
470            IsolationLevel::SnapshotIsolation => vec![],
471        };
472
473        // Here we need to check if the current transaction would have read the
474        // added files. for this we need to be able to evaluate predicates. Err on the safe side is
475        // to assume all files match
476        cfg_if::cfg_if! {
477            if #[cfg(feature = "datafusion")] {
478                let added_files_matching_predicates = if let (Some(predicate), false) = (
479                    &self.txn_info.read_predicates,
480                    self.txn_info.read_whole_table(),
481                ) {
482                    let arrow_schema = self.txn_info.read_snapshot.read_schema();
483                    let partition_columns = self
484                        .txn_info
485                        .read_snapshot
486                        .metadata()
487                        .partition_columns()
488                        .to_vec();
489                    AddContainer::new(&added_files_to_check, &partition_columns, arrow_schema)
490                        .predicate_matches(predicate.clone())
491                        .map_err(|err| CommitConflictError::Predicate {
492                            source: Box::new(err),
493                        })?
494                        .cloned()
495                        .collect::<Vec<_>>()
496                } else if self.txn_info.read_whole_table() {
497                    added_files_to_check
498                } else {
499                    vec![]
500                };
501            } else {
502                let added_files_matching_predicates = if self.txn_info.read_whole_table()
503                {
504                    added_files_to_check
505                } else {
506                    vec![]
507                };
508            }
509        }
510
511        if !added_files_matching_predicates.is_empty() {
512            Err(CommitConflictError::ConcurrentAppend)
513        } else {
514            Ok(())
515        }
516    }
517
518    /// Check if [Remove] actions added by already committed transactions
519    /// conflicts with files read by the current transaction.
520    fn check_for_deleted_files_against_current_txn_read_files(
521        &self,
522    ) -> Result<(), CommitConflictError> {
523        // Fail if files have been deleted that the txn read.
524        let read_file_path: HashSet<String> = self
525            .txn_info
526            .read_files()?
527            .map(|f| f.path.clone())
528            .collect();
529
530        // Only consider removals with data_change = true as conflicts.
531        // Removals with data_change = false (e.g., from OPTIMIZE/compaction)
532        // don't change the logical data, only the physical layout, so they
533        // shouldn't conflict with concurrent read operations.
534        let removed_files_with_data_change: Vec<Remove> = self
535            .winning_commit_summary
536            .removed_files()
537            .into_iter()
538            .filter(|r| r.data_change)
539            .collect();
540
541        let deleted_read_overlap = removed_files_with_data_change
542            .iter()
543            .find(|f| read_file_path.contains(&f.path));
544
545        if deleted_read_overlap.is_some()
546            || (!removed_files_with_data_change.is_empty() && self.txn_info.read_whole_table())
547        {
548            Err(CommitConflictError::ConcurrentDeleteRead)
549        } else {
550            Ok(())
551        }
552    }
553
554    /// Check if [Remove] actions added by already committed transactions conflicts
555    /// with [Remove] actions this transaction is trying to add.
556    fn check_for_deleted_files_against_current_txn_deleted_files(
557        &self,
558    ) -> Result<(), CommitConflictError> {
559        // Fail if a file is deleted twice.
560        let txn_deleted_files: HashSet<String> = self
561            .txn_info
562            .actions
563            .iter()
564            .cloned()
565            .filter_map(|action| match action {
566                Action::Remove(remove) => Some(remove.path),
567                _ => None,
568            })
569            .collect();
570        let winning_deleted_files: HashSet<String> = self
571            .winning_commit_summary
572            .removed_files()
573            .iter()
574            .cloned()
575            .map(|r| r.path)
576            .collect();
577        let intersection: HashSet<&String> = txn_deleted_files
578            .intersection(&winning_deleted_files)
579            .collect();
580
581        if !intersection.is_empty() {
582            Err(CommitConflictError::ConcurrentDeleteDelete)
583        } else {
584            Ok(())
585        }
586    }
587
588    /// Checks if the winning transaction corresponds to some AppId on which
589    /// current transaction also depends.
590    fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
591        &self,
592    ) -> Result<(), CommitConflictError> {
593        // Fail if the appIds seen by the current transaction has been updated by the winning
594        // transaction i.e. the winning transaction have [Txn] corresponding to
595        // some appId on which current transaction depends on. Example - This can happen when
596        // multiple instances of the same streaming query are running at the same time.
597        let winning_txns = self.winning_commit_summary.app_level_transactions();
598        let txn_overlap: HashSet<&String> = winning_txns
599            .intersection(&self.txn_info.read_app_ids)
600            .collect();
601        if !txn_overlap.is_empty() {
602            Err(CommitConflictError::ConcurrentTransaction)
603        } else {
604            Ok(())
605        }
606    }
607}
608
609// implementation and comments adopted from
610// https://github.com/delta-io/delta/blob/1c18c1d972e37d314711b3a485e6fb7c98fce96d/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala#L1268
611//
612// For no-data-change transactions such as OPTIMIZE/Auto Compaction/ZorderBY, we can
613// change the isolation level to SnapshotIsolation. SnapshotIsolation allows reduced conflict
614// detection by skipping the
615// [ConflictChecker::check_for_added_files_that_should_have_been_read_by_current_txn] check i.e.
616// don't worry about concurrent appends.
617//
618// We can also use SnapshotIsolation for empty transactions. e.g. consider a commit:
619// t0 - Initial state of table
620// t1 - Q1, Q2 starts
621// t2 - Q1 commits
622// t3 - Q2 is empty and wants to commit.
623// In this scenario, we can always allow Q2 to commit without worrying about new files
624// generated by Q1.
625//
626// The final order which satisfies both Serializability and WriteSerializability is: Q2, Q1
627// Note that Metadata only update transactions shouldn't be considered empty. If Q2 above has
628// a Metadata update (say schema change/identity column high watermark update), then Q2 can't
629// be moved above Q1 in the final SERIALIZABLE order. This is because if Q2 is moved above Q1,
630// then Q1 should see the updates from Q2 - which actually didn't happen.
631pub(super) fn can_downgrade_to_snapshot_isolation<'a>(
632    actions: impl IntoIterator<Item = &'a Action>,
633    operation: &DeltaOperation,
634    isolation_level: &IsolationLevel,
635) -> bool {
636    let mut data_changed = false;
637    let mut has_non_file_actions = false;
638    for action in actions {
639        match action {
640            Action::Add(act) if act.data_change => data_changed = true,
641            Action::Remove(rem) if rem.data_change => data_changed = true,
642            _ => has_non_file_actions = true,
643        }
644    }
645
646    if has_non_file_actions {
647        // if Non-file-actions are present (e.g. METADATA etc.), then don't downgrade the isolation level.
648        return false;
649    }
650
651    match isolation_level {
652        IsolationLevel::Serializable => !data_changed,
653        IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(),
654        IsolationLevel::SnapshotIsolation => false, // this case should never happen, since spanpshot isolation cannot be configured on table
655    }
656}
657
658#[cfg(test)]
659#[allow(unused)]
660mod tests {
661    use std::collections::HashMap;
662
663    #[cfg(feature = "datafusion")]
664    use datafusion::logical_expr::{col, lit};
665    use serde_json::json;
666
667    use super::*;
668    use crate::kernel::Action;
669    use crate::test_utils::{ActionFactory, TestSchemas};
670
671    fn simple_add(data_change: bool, min: &str, max: &str) -> Add {
672        ActionFactory::add(
673            TestSchemas::simple(),
674            HashMap::from_iter([("value", (min, max))]),
675            Default::default(),
676            true,
677        )
678    }
679
680    fn init_table_actions() -> Vec<Action> {
681        vec![
682            ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into(),
683            ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into(),
684        ]
685    }
686
687    #[test]
688    fn test_can_downgrade_to_snapshot_isolation() {
689        let isolation = IsolationLevel::WriteSerializable;
690        let operation = DeltaOperation::Optimize {
691            predicate: None,
692            target_size: 0,
693        };
694        let add =
695            ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into();
696        let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation);
697        assert!(!res)
698    }
699
700    // Check whether the test transaction conflict with the concurrent writes by executing the
701    // given params in the following order:
702    // - setup (including setting table isolation level
703    // - reads
704    // - concurrentWrites
705    // - actions
706    #[cfg(feature = "datafusion")]
707    async fn execute_test(
708        setup: Option<Vec<Action>>,
709        reads: Option<Expr>,
710        concurrent: Vec<Action>,
711        actions: Vec<Action>,
712        read_whole_table: bool,
713    ) -> Result<(), CommitConflictError> {
714        use crate::table::state::DeltaTableState;
715        use object_store::path::Path;
716
717        let setup_actions = setup.unwrap_or_else(init_table_actions);
718        let state = DeltaTableState::from_actions(setup_actions).await.unwrap();
719        let snapshot = state.snapshot();
720        let transaction_info =
721            TransactionInfo::new(snapshot.log_data(), reads, &actions, read_whole_table);
722        let summary = WinningCommitSummary {
723            actions: concurrent,
724            commit_info: None,
725        };
726        let checker = ConflictChecker::new(transaction_info, summary, None);
727        checker.check_conflicts()
728    }
729
730    // tests adopted from https://github.com/delta-io/delta/blob/24c025128612a4ae02d0ad958621f928cda9a3ec/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala#L40-L94
731    #[tokio::test]
732    #[cfg(feature = "datafusion")]
733    async fn test_concurrent_append_append() {
734        // append file to table while a concurrent writer also appends a file
735        let file1 = simple_add(true, "1", "10").into();
736        let file2 = simple_add(true, "1", "10").into();
737
738        let result = execute_test(None, None, vec![file1], vec![file2], false).await;
739        assert!(result.is_ok());
740    }
741
742    #[tokio::test]
743    #[cfg(feature = "datafusion")]
744    async fn test_disjoint_delete_read() {
745        // the concurrent transaction deletes a file that the current transaction did NOT read
746        let file_not_read = simple_add(true, "1", "10");
747        let file_read = simple_add(true, "100", "10000").into();
748        let mut setup_actions = init_table_actions();
749        setup_actions.push(file_not_read.clone().into());
750        setup_actions.push(file_read);
751        let result = execute_test(
752            Some(setup_actions),
753            Some(col("value").gt(lit::<i32>(10))),
754            vec![ActionFactory::remove(&file_not_read, true).into()],
755            vec![],
756            false,
757        )
758        .await;
759        assert!(result.is_ok());
760    }
761
762    #[tokio::test]
763    #[cfg(feature = "datafusion")]
764    async fn test_disjoint_add_read() {
765        // concurrently add file, that the current transaction would not have read
766        let file_added = simple_add(true, "1", "10").into();
767        let file_read = simple_add(true, "100", "10000").into();
768        let mut setup_actions = init_table_actions();
769        setup_actions.push(file_read);
770        let result = execute_test(
771            Some(setup_actions),
772            Some(col("value").gt(lit::<i32>(10))),
773            vec![file_added],
774            vec![],
775            false,
776        )
777        .await;
778        assert!(result.is_ok());
779    }
780
781    #[tokio::test]
782    #[cfg(feature = "datafusion")]
783    async fn test_concurrent_delete_delete() {
784        // remove file from table that has previously been removed
785        let removed_file = simple_add(true, "1", "10");
786        let removed_file: Action = ActionFactory::remove(&removed_file, true).into();
787        let result = execute_test(
788            None,
789            None,
790            vec![removed_file.clone()],
791            vec![removed_file],
792            false,
793        )
794        .await;
795        assert!(matches!(
796            result,
797            Err(CommitConflictError::ConcurrentDeleteDelete)
798        ));
799    }
800
801    #[tokio::test]
802    #[cfg(feature = "datafusion")]
803    async fn test_concurrent_add_conflicts_with_read_and_write() {
804        // a file is concurrently added that should have been read by the current transaction
805        let file_added = simple_add(true, "1", "10").into();
806        let file_should_have_read = simple_add(true, "1", "10").into();
807        let result = execute_test(
808            None,
809            Some(col("value").lt_eq(lit::<i32>(10))),
810            vec![file_should_have_read],
811            vec![file_added],
812            false,
813        )
814        .await;
815        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
816    }
817
818    #[tokio::test]
819    #[cfg(feature = "datafusion")]
820    async fn test_concurrent_delete_conflicts_with_read() {
821        // transaction reads a file that is removed by concurrent transaction
822        let file_read = simple_add(true, "1", "10");
823        let mut setup_actions = init_table_actions();
824        setup_actions.push(file_read.clone().into());
825        let result = execute_test(
826            Some(setup_actions),
827            Some(col("value").lt_eq(lit::<i32>(10))),
828            vec![ActionFactory::remove(&file_read, true).into()],
829            vec![],
830            false,
831        )
832        .await;
833        assert!(matches!(
834            result,
835            Err(CommitConflictError::ConcurrentDeleteRead)
836        ));
837    }
838
839    #[tokio::test]
840    #[cfg(feature = "datafusion")]
841    async fn test_concurrent_metadata_change() {
842        // concurrent transactions changes table metadata
843        let result = execute_test(
844            None,
845            None,
846            vec![ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into()],
847            vec![],
848            false,
849        )
850        .await;
851        assert!(matches!(result, Err(CommitConflictError::MetadataChanged)));
852    }
853
854    #[tokio::test]
855    #[cfg(feature = "datafusion")]
856    async fn test_concurrent_protocol_upgrade() {
857        // current and concurrent transactions change the protocol version
858        let result = execute_test(
859            None,
860            None,
861            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
862            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
863            false,
864        )
865        .await;
866        assert!(matches!(
867            result,
868            Err(CommitConflictError::ProtocolChanged(_))
869        ));
870    }
871
872    #[tokio::test]
873    #[cfg(feature = "datafusion")]
874    async fn test_read_whole_table_disallows_concurrent_append() {
875        // `read_whole_table` should disallow any concurrent change, even if the change
876        // is disjoint with the earlier filter
877        let file_part1 = simple_add(true, "1", "10").into();
878        let file_part2 = simple_add(true, "11", "100").into();
879        let file_part3 = simple_add(true, "101", "1000").into();
880        let mut setup_actions = init_table_actions();
881        setup_actions.push(file_part1);
882        let result = execute_test(
883            Some(setup_actions),
884            // filter matches neither existing nor added files
885            Some(col("value").lt(lit::<i32>(0))),
886            vec![file_part2],
887            vec![file_part3],
888            true,
889        )
890        .await;
891        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
892    }
893
894    #[tokio::test]
895    #[cfg(feature = "datafusion")]
896    async fn test_read_whole_table_disallows_concurrent_remove() {
897        // `read_whole_table` should disallow any concurrent remove actions
898        let file_part1 = simple_add(true, "1", "10");
899        let file_part2 = simple_add(true, "11", "100").into();
900        let mut setup_actions = init_table_actions();
901        setup_actions.push(file_part1.clone().into());
902        let result = execute_test(
903            Some(setup_actions),
904            None,
905            vec![ActionFactory::remove(&file_part1, true).into()],
906            vec![file_part2],
907            true,
908        )
909        .await;
910        assert!(matches!(
911            result,
912            Err(CommitConflictError::ConcurrentDeleteRead)
913        ));
914    }
915
916    #[tokio::test]
917    #[cfg(feature = "datafusion")]
918    async fn test_compaction_remove_does_not_conflict_with_read() {
919        // Concurrent compaction (data_change = false) should NOT conflict with reads
920        // A file is removed by a compaction operation (data_change = false) while being read
921        let file_read = simple_add(true, "1", "10");
922        let mut setup_actions = init_table_actions();
923        setup_actions.push(file_read.clone().into());
924
925        // Create a remove action with data_change = false (simulating OPTIMIZE/compaction)
926        let mut compaction_remove = ActionFactory::remove(&file_read, false);
927        compaction_remove.data_change = false;
928
929        let result = execute_test(
930            Some(setup_actions),
931            Some(col("value").lt_eq(lit::<i32>(10))),
932            vec![compaction_remove.into()],
933            vec![],
934            false,
935        )
936        .await;
937        // Should succeed because data_change = false means it's just a physical reorganization
938        assert!(
939            result.is_ok(),
940            "Compaction with data_change=false should not conflict with reads"
941        );
942    }
943
944    #[tokio::test]
945    #[cfg(feature = "datafusion")]
946    async fn test_data_delete_conflicts_with_read() {
947        // Delete with data_change = true should still conflict with reads
948        let file_read = simple_add(true, "1", "10");
949        let mut setup_actions = init_table_actions();
950        setup_actions.push(file_read.clone().into());
951
952        let result = execute_test(
953            Some(setup_actions),
954            Some(col("value").lt_eq(lit::<i32>(10))),
955            vec![ActionFactory::remove(&file_read, true).into()],
956            vec![],
957            false,
958        )
959        .await;
960        // Should fail because data_change = true means logical data was removed
961        assert!(
962            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
963            "Delete with data_change=true should conflict with reads"
964        );
965    }
966
967    #[tokio::test]
968    #[cfg(feature = "datafusion")]
969    async fn test_compaction_does_not_conflict_with_whole_table_read() {
970        // Concurrent compaction with read_whole_table and data_change = false
971        let file_part1 = simple_add(true, "1", "10");
972        let file_part2 = simple_add(true, "11", "100").into();
973        let mut setup_actions = init_table_actions();
974        setup_actions.push(file_part1.clone().into());
975
976        let compaction_remove = ActionFactory::remove(&file_part1, false);
977
978        let result = execute_test(
979            Some(setup_actions),
980            None,
981            vec![compaction_remove.into()],
982            vec![file_part2],
983            true, // read_whole_table
984        )
985        .await;
986        // Should succeed because data_change = false, even with read_whole_table
987        assert!(
988            result.is_ok(),
989            "Compaction with data_change=false should not conflict even with read_whole_table"
990        );
991    }
992
993    #[tokio::test]
994    #[cfg(feature = "datafusion")]
995    async fn test_multiple_compaction_removes_do_not_conflict() {
996        // Multiple files removed with data_change = false
997        let file1 = simple_add(true, "1", "10");
998        let file2 = simple_add(true, "11", "20");
999        let mut setup_actions = init_table_actions();
1000        setup_actions.push(file1.clone().into());
1001        setup_actions.push(file2.clone().into());
1002
1003        let mut remove1 = ActionFactory::remove(&file1, false);
1004        remove1.data_change = false;
1005        let mut remove2 = ActionFactory::remove(&file2, false);
1006        remove2.data_change = false;
1007
1008        let result = execute_test(
1009            Some(setup_actions),
1010            Some(col("value").lt_eq(lit::<i32>(20))),
1011            vec![remove1.into(), remove2.into()],
1012            vec![],
1013            false,
1014        )
1015        .await;
1016        // Should succeed because both removes have data_change = false
1017        assert!(
1018            result.is_ok(),
1019            "Multiple compaction removes with data_change=false should not conflict"
1020        );
1021    }
1022
1023    #[tokio::test]
1024    #[cfg(feature = "datafusion")]
1025    async fn test_mixed_removes_conflict_if_any_data_change() {
1026        // Mixed removes - one with data_change = false, one with data_change = true
1027        let file1 = simple_add(true, "1", "10");
1028        let file2 = simple_add(true, "11", "20");
1029        let mut setup_actions = init_table_actions();
1030        setup_actions.push(file1.clone().into());
1031        setup_actions.push(file2.clone().into());
1032
1033        let mut compaction_remove = ActionFactory::remove(&file1, false);
1034        compaction_remove.data_change = false;
1035        let data_remove = ActionFactory::remove(&file2, true); // data_change = true
1036
1037        let result = execute_test(
1038            Some(setup_actions),
1039            Some(col("value").lt_eq(lit::<i32>(20))),
1040            vec![compaction_remove.into(), data_remove.into()],
1041            vec![],
1042            false,
1043        )
1044        .await;
1045        // Should fail because one of the removes has data_change = true
1046        assert!(
1047            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
1048            "Mixed removes should conflict if any have data_change=true"
1049        );
1050    }
1051
1052    #[tokio::test]
1053    #[cfg(feature = "datafusion")]
1054    async fn test_concurrent_compaction_double_delete_still_conflicts() {
1055        // Concurrent double delete with data_change = false
1056        // Both transactions try to remove the same file with data_change = false
1057        let removed_file = simple_add(true, "1", "10");
1058        let mut setup_actions = init_table_actions();
1059        setup_actions.push(removed_file.clone().into());
1060
1061        let mut remove1 = ActionFactory::remove(&removed_file, false);
1062        remove1.data_change = false;
1063        let mut remove2 = ActionFactory::remove(&removed_file, false);
1064        remove2.data_change = false;
1065
1066        let result = execute_test(
1067            Some(setup_actions),
1068            None,
1069            vec![remove1.into()],
1070            vec![remove2.into()],
1071            false,
1072        )
1073        .await;
1074        // Should still fail - even with data_change = false, can't delete the same file twice
1075        assert!(
1076            matches!(result, Err(CommitConflictError::ConcurrentDeleteDelete)),
1077            "Concurrent double delete should conflict even with data_change=false"
1078        );
1079    }
1080
1081    #[tokio::test]
1082    #[cfg(feature = "datafusion")]
1083    async fn test_disjoint_partitions_add_and_write() {
1084        // Test for: "add in part=2 / read from part=1,2 and write to part=1"
1085        // Transaction reads from partitions 1 and 2, concurrent txn adds to partition 2,
1086        // and current txn writes to partition 1.
1087        // This should succeed because the write is disjoint from the concurrent add.
1088
1089        let file_part1_existing = simple_add(true, "1", "10");
1090        let file_part2_added = simple_add(true, "100", "200").into();
1091        let file_part1_new = simple_add(true, "5", "15").into();
1092
1093        let mut setup_actions = init_table_actions();
1094        setup_actions.push(file_part1_existing.into());
1095
1096        // Read from both partitions (value <= 200 covers both ranges)
1097        // Concurrent adds file with value 100-200
1098        // Current transaction adds file with value 5-15
1099        let result = execute_test(
1100            Some(setup_actions),
1101            Some(col("value").lt_eq(lit::<i32>(200))),
1102            vec![file_part2_added],
1103            vec![file_part1_new],
1104            false,
1105        )
1106        .await;
1107
1108        // This should fail with ConcurrentAppend because the predicate matches the added file
1109        assert!(
1110            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
1111            "Adding file matching read predicate should conflict"
1112        );
1113    }
1114
1115    #[tokio::test]
1116    #[cfg(feature = "datafusion")]
1117    async fn test_disjoint_partitions_read_write_different_ranges() {
1118        // Test disjoint partitions: read from one range, concurrent write to different range
1119        let file_part1 = simple_add(true, "1", "10");
1120        let file_part2_added = simple_add(true, "100", "200").into();
1121        let file_part1_new = simple_add(true, "5", "15").into();
1122
1123        let mut setup_actions = init_table_actions();
1124        setup_actions.push(file_part1.into());
1125
1126        // Read only from partition 1 (value <= 50)
1127        // Concurrent adds to partition 2 (value 100-200)
1128        // Current transaction adds to partition 1
1129        let result = execute_test(
1130            Some(setup_actions),
1131            Some(col("value").lt_eq(lit::<i32>(50))),
1132            vec![file_part2_added],
1133            vec![file_part1_new],
1134            false,
1135        )
1136        .await;
1137
1138        // This should succeed because the concurrent add is outside the read predicate
1139        assert!(
1140            result.is_ok(),
1141            "Disjoint partition writes should not conflict"
1142        );
1143    }
1144
1145    #[tokio::test]
1146    #[cfg(feature = "datafusion")]
1147    async fn test_conflicting_app_transactions() {
1148        // Test for conflicting application transactions (e.g., duplicate streaming queries)
1149        let file1 = simple_add(true, "1", "10").into();
1150
1151        let app_id = "streaming_query_1".to_string();
1152        let txn_action = Action::Txn(Transaction {
1153            app_id: app_id.clone(),
1154            version: 1,
1155            last_updated: None,
1156        });
1157
1158        // Current transaction depends on app_id
1159        let current_actions = vec![txn_action.clone()];
1160
1161        // Concurrent transaction also updates the same app_id
1162        let concurrent_actions = vec![txn_action, file1];
1163
1164        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;
1165
1166        // Should fail because both transactions use the same app_id
1167        assert!(
1168            matches!(result, Err(CommitConflictError::ConcurrentTransaction)),
1169            "Conflicting app transactions should fail"
1170        );
1171    }
1172
1173    #[tokio::test]
1174    #[cfg(feature = "datafusion")]
1175    async fn test_non_conflicting_different_app_transactions() {
1176        // Test non-conflicting application transactions with different app_ids
1177        let file1 = simple_add(true, "1", "10").into();
1178
1179        let app_id1 = "streaming_query_1".to_string();
1180        let app_id2 = "streaming_query_2".to_string();
1181
1182        let txn_action1 = Action::Txn(Transaction {
1183            app_id: app_id1,
1184            version: 1,
1185            last_updated: None,
1186        });
1187
1188        let txn_action2 = Action::Txn(Transaction {
1189            app_id: app_id2,
1190            version: 1,
1191            last_updated: None,
1192        });
1193
1194        // Current transaction depends on app_id1
1195        let current_actions = vec![txn_action1];
1196
1197        // Concurrent transaction updates app_id2
1198        let concurrent_actions = vec![txn_action2, file1];
1199
1200        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;
1201
1202        // Should succeed because different app_ids don't conflict
1203        assert!(
1204            result.is_ok(),
1205            "Non-conflicting app transactions should succeed"
1206        );
1207    }
1208
1209    #[tokio::test]
1210    #[cfg(feature = "datafusion")]
1211    async fn test_replace_where_initial_empty_conflicts_on_concurrent_add() {
1212        // Empty predicate read, concurrent add within predicate, then current write -> conflicts
1213        let mut setup_actions = init_table_actions();
1214        setup_actions.push(simple_add(true, "1", "1").into());
1215
1216        let result = execute_test(
1217            Some(setup_actions),
1218            Some(col("value").gt_eq(lit::<i32>(2))), // no files read
1219            vec![simple_add(true, "3", "3").into()], // concurrent add matches predicate
1220            vec![simple_add(true, "2", "2").into()],
1221            false,
1222        )
1223        .await;
1224
1225        assert!(
1226            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
1227            "ReplaceWhere-style empty read should conflict when a matching row is concurrently added"
1228        );
1229    }
1230
1231    #[tokio::test]
1232    #[cfg(feature = "datafusion")]
1233    async fn test_replace_where_disjoint_empty_allows_commit() {
1234        // Empty predicate read, concurrent add outside predicate, then current write -> allowed
1235        let mut setup_actions = init_table_actions();
1236        setup_actions.push(simple_add(true, "1", "1").into());
1237
1238        let result = execute_test(
1239            Some(setup_actions),
1240            Some(
1241                col("value")
1242                    .gt(lit::<i32>(1))
1243                    .and(col("value").lt_eq(lit::<i32>(3))),
1244            ), // empty read
1245            vec![simple_add(true, "5", "5").into()], // disjoint from read predicate
1246            vec![simple_add(true, "2", "2").into()],
1247            false,
1248        )
1249        .await;
1250
1251        assert!(
1252            result.is_ok(),
1253            "Disjoint replaceWhere-style transactions with empty reads should succeed"
1254        );
1255    }
1256}