deltalake_core/kernel/transaction/
conflict_checker.rs

1//! Helper module to check if a transaction can be committed in case of conflicting commits.
2use std::collections::HashSet;
3
4use super::CommitInfo;
5#[cfg(feature = "datafusion")]
6use crate::delta_datafusion::DataFusionMixins;
7use crate::errors::DeltaResult;
8use crate::kernel::EagerSnapshot;
9use crate::kernel::Transaction;
10use crate::kernel::{Action, Add, Metadata, Protocol, Remove};
11use crate::logstore::{get_actions, LogStore};
12use crate::protocol::DeltaOperation;
13use crate::table::config::IsolationLevel;
14use crate::DeltaTableError;
15
16#[cfg(feature = "datafusion")]
17use super::state::AddContainer;
18#[cfg(feature = "datafusion")]
19use datafusion_expr::Expr;
20#[cfg(feature = "datafusion")]
21use itertools::Either;
22
23/// Exceptions raised during commit conflict resolution
24#[derive(thiserror::Error, Debug)]
25pub enum CommitConflictError {
26    /// This exception occurs when a concurrent operation adds files in the same partition
27    /// (or anywhere in an un-partitioned table) that your operation reads. The file additions
28    /// can be caused by INSERT, DELETE, UPDATE, or MERGE operations.
29    #[error("Commit failed: a concurrent transactions added new data.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation.")]
30    ConcurrentAppend,
31
32    /// This exception occurs when a concurrent operation deleted a file that your operation read.
33    /// Common causes are a DELETE, UPDATE, or MERGE operation that rewrites files.
34    #[error("Commit failed: a concurrent transaction deleted data this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation.")]
35    ConcurrentDeleteRead,
36
37    /// This exception occurs when a concurrent operation deleted a file that your operation also deletes.
38    /// This could be caused by two concurrent compaction operations rewriting the same files.
39    #[error("Commit failed: a concurrent transaction deleted the same data your transaction deletes.\nHelp: you should retry this write operation. If it was based on data contained in the table, you should rerun the query generating the data.")]
40    ConcurrentDeleteDelete,
41
42    /// This exception occurs when a concurrent transaction updates the metadata of a Delta table.
43    /// Common causes are ALTER TABLE operations or writes to your Delta table that update the schema of the table.
44    #[error("Metadata changed since last commit.")]
45    MetadataChanged,
46
47    /// If a streaming query using the same checkpoint location is started multiple times concurrently
48    /// and tries to write to the Delta table at the same time. You should never have two streaming
49    /// queries use the same checkpoint location and run at the same time.
50    #[error("Concurrent transaction failed.")]
51    ConcurrentTransaction,
52
53    /// This exception can occur in the following cases:
54    /// - When your Delta table is upgraded to a new version. For future operations to succeed
55    ///   you may need to upgrade your Delta Lake version.
56    /// - When multiple writers are creating or replacing a table at the same time.
57    /// - When multiple writers are writing to an empty path at the same time.
58    #[error("Protocol changed since last commit: {0}")]
59    ProtocolChanged(String),
60
61    /// Error returned when the table requires an unsupported writer version
62    #[error("Delta-rs does not support writer version {0}")]
63    UnsupportedWriterVersion(i32),
64
65    /// Error returned when the table requires an unsupported writer version
66    #[error("Delta-rs does not support reader version {0}")]
67    UnsupportedReaderVersion(i32),
68
69    /// Error returned when the snapshot has missing or corrupted data
70    #[error("Snapshot is corrupted: {source}")]
71    CorruptedState {
72        /// Source error
73        source: Box<dyn std::error::Error + Send + Sync + 'static>,
74    },
75
76    /// Error returned when evaluating predicate
77    #[error("Error evaluating predicate: {source}")]
78    Predicate {
79        /// Source error
80        source: Box<dyn std::error::Error + Send + Sync + 'static>,
81    },
82
83    /// Error returned when no metadata was found in the DeltaTable.
84    #[error("No metadata found, please make sure table is loaded.")]
85    NoMetadata,
86}
87
/// A struct representing different attributes of current transaction needed for conflict detection.
#[allow(unused)]
pub(crate) struct TransactionInfo<'a> {
    // Transaction identifier; the constructors currently always set this to
    // the empty string.
    txn_id: String,
    /// partition predicates by which files have been queried by the transaction
    ///
    /// If any new data files or removed data files match this predicate, the
    /// transaction should fail.
    #[cfg(not(feature = "datafusion"))]
    read_predicates: Option<String>,
    /// partition predicates by which files have been queried by the transaction
    #[cfg(feature = "datafusion")]
    read_predicates: Option<Expr>,
    /// appIds that have been seen by the transaction
    pub(crate) read_app_ids: HashSet<String>,
    /// delta log actions that the transaction wants to commit
    actions: &'a [Action],
    /// read [`EagerSnapshot`] used for the transaction
    pub(crate) read_snapshot: &'a EagerSnapshot,
    /// Whether the transaction tainted the whole table
    read_whole_table: bool,
}
110
111impl<'a> TransactionInfo<'a> {
112    #[cfg(feature = "datafusion")]
113    pub fn try_new(
114        read_snapshot: &'a EagerSnapshot,
115        read_predicates: Option<String>,
116        actions: &'a [Action],
117        read_whole_table: bool,
118    ) -> DeltaResult<Self> {
119        use datafusion::prelude::SessionContext;
120
121        let session = SessionContext::new();
122        let read_predicates = read_predicates
123            .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
124            .transpose()?;
125
126        let mut read_app_ids = HashSet::<String>::new();
127        for action in actions.iter() {
128            if let Action::Txn(Transaction { app_id, .. }) = action {
129                read_app_ids.insert(app_id.clone());
130            }
131        }
132
133        Ok(Self {
134            txn_id: "".into(),
135            read_predicates,
136            read_app_ids,
137            actions,
138            read_snapshot,
139            read_whole_table,
140        })
141    }
142
143    #[cfg(feature = "datafusion")]
144    #[allow(unused)]
145    pub fn new(
146        read_snapshot: &'a EagerSnapshot,
147        read_predicates: Option<Expr>,
148        actions: &'a Vec<Action>,
149        read_whole_table: bool,
150    ) -> Self {
151        let mut read_app_ids = HashSet::<String>::new();
152        for action in actions.iter() {
153            if let Action::Txn(Transaction { app_id, .. }) = action {
154                read_app_ids.insert(app_id.clone());
155            }
156        }
157        Self {
158            txn_id: "".into(),
159            read_predicates,
160            read_app_ids,
161            actions,
162            read_snapshot,
163            read_whole_table,
164        }
165    }
166
167    #[cfg(not(feature = "datafusion"))]
168    pub fn try_new(
169        read_snapshot: &'a EagerSnapshot,
170        read_predicates: Option<String>,
171        actions: &'a Vec<Action>,
172        read_whole_table: bool,
173    ) -> DeltaResult<Self> {
174        let mut read_app_ids = HashSet::<String>::new();
175        for action in actions.iter() {
176            if let Action::Txn(Transaction { app_id, .. }) = action {
177                read_app_ids.insert(app_id.clone());
178            }
179        }
180        Ok(Self {
181            txn_id: "".into(),
182            read_predicates,
183            read_app_ids,
184            actions,
185            read_snapshot,
186            read_whole_table,
187        })
188    }
189
190    /// Whether the transaction changed the tables metadatas
191    pub fn metadata_changed(&self) -> bool {
192        self.actions
193            .iter()
194            .any(|a| matches!(a, Action::Metadata(_)))
195    }
196
197    #[cfg(feature = "datafusion")]
198    /// Files read by the transaction
199    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
200        use crate::delta_datafusion::files_matching_predicate;
201
202        if let Some(predicate) = &self.read_predicates {
203            Ok(Either::Left(
204                files_matching_predicate(self.read_snapshot, &[predicate.clone()]).map_err(
205                    |err| CommitConflictError::Predicate {
206                        source: Box::new(err),
207                    },
208                )?,
209            ))
210        } else {
211            Ok(Either::Right(std::iter::empty()))
212        }
213    }
214
215    #[cfg(not(feature = "datafusion"))]
216    /// Files read by the transaction
217    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
218        Ok(self.read_snapshot.file_actions().unwrap())
219    }
220
221    /// Whether the whole table was read during the transaction
222    pub fn read_whole_table(&self) -> bool {
223        self.read_whole_table
224    }
225}
226
/// Summary of the Winning commit against which we want to check the conflict
#[derive(Debug)]
pub(crate) struct WinningCommitSummary {
    // All actions contained in the winning commit.
    pub actions: Vec<Action>,
    // The commit info action of the winning commit, if one was present.
    pub commit_info: Option<CommitInfo>,
}
233
234impl WinningCommitSummary {
235    pub async fn try_new(
236        log_store: &dyn LogStore,
237        read_version: i64,
238        winning_commit_version: i64,
239    ) -> DeltaResult<Self> {
240        // NOTE using assert, since a wrong version would right now mean a bug in our code.
241        assert_eq!(winning_commit_version, read_version + 1);
242
243        let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
244        match commit_log_bytes {
245            Some(bytes) => {
246                let actions = get_actions(winning_commit_version, bytes).await?;
247                let commit_info = actions
248                    .iter()
249                    .find(|action| matches!(action, Action::CommitInfo(_)))
250                    .map(|action| match action {
251                        Action::CommitInfo(info) => info.clone(),
252                        _ => unreachable!(),
253                    });
254
255                Ok(Self {
256                    actions,
257                    commit_info,
258                })
259            }
260            None => Err(DeltaTableError::InvalidVersion(winning_commit_version)),
261        }
262    }
263
264    pub fn metadata_updates(&self) -> Vec<Metadata> {
265        self.actions
266            .iter()
267            .cloned()
268            .filter_map(|action| match action {
269                Action::Metadata(metadata) => Some(metadata),
270                _ => None,
271            })
272            .collect()
273    }
274
275    pub fn app_level_transactions(&self) -> HashSet<String> {
276        self.actions
277            .iter()
278            .cloned()
279            .filter_map(|action| match action {
280                Action::Txn(txn) => Some(txn.app_id),
281                _ => None,
282            })
283            .collect()
284    }
285
286    pub fn protocol(&self) -> Vec<Protocol> {
287        self.actions
288            .iter()
289            .cloned()
290            .filter_map(|action| match action {
291                Action::Protocol(protocol) => Some(protocol),
292                _ => None,
293            })
294            .collect()
295    }
296
297    pub fn removed_files(&self) -> Vec<Remove> {
298        self.actions
299            .iter()
300            .cloned()
301            .filter_map(|action| match action {
302                Action::Remove(remove) => Some(remove),
303                _ => None,
304            })
305            .collect()
306    }
307
308    pub fn added_files(&self) -> Vec<Add> {
309        self.actions
310            .iter()
311            .cloned()
312            .filter_map(|action| match action {
313                Action::Add(add) => Some(add),
314                _ => None,
315            })
316            .collect()
317    }
318
319    pub fn blind_append_added_files(&self) -> Vec<Add> {
320        if self.is_blind_append().unwrap_or(false) {
321            self.added_files()
322        } else {
323            vec![]
324        }
325    }
326
327    pub fn changed_data_added_files(&self) -> Vec<Add> {
328        if self.is_blind_append().unwrap_or(false) {
329            vec![]
330        } else {
331            self.added_files()
332        }
333    }
334
335    pub fn is_blind_append(&self) -> Option<bool> {
336        self.commit_info
337            .as_ref()
338            .map(|opt| opt.is_blind_append.unwrap_or(false))
339    }
340}
341
/// Checks if a failed commit may be committed after a conflicting winning commit
pub(crate) struct ConflictChecker<'a> {
    /// transaction information for current transaction at start of check
    txn_info: TransactionInfo<'a>,
    /// Summary of the transaction, that has been committed ahead of the current transaction
    winning_commit_summary: WinningCommitSummary,
    /// Isolation level for the current transaction
    /// (possibly downgraded to snapshot isolation in [`ConflictChecker::new`])
    isolation_level: IsolationLevel,
}
351
352impl<'a> ConflictChecker<'a> {
353    pub fn new(
354        transaction_info: TransactionInfo<'a>,
355        winning_commit_summary: WinningCommitSummary,
356        operation: Option<&DeltaOperation>,
357    ) -> ConflictChecker<'a> {
358        let isolation_level = operation
359            .and_then(|op| {
360                if can_downgrade_to_snapshot_isolation(
361                    &winning_commit_summary.actions,
362                    op,
363                    &transaction_info
364                        .read_snapshot
365                        .table_config()
366                        .isolation_level(),
367                ) {
368                    Some(IsolationLevel::SnapshotIsolation)
369                } else {
370                    None
371                }
372            })
373            .unwrap_or_else(|| {
374                transaction_info
375                    .read_snapshot
376                    .table_config()
377                    .isolation_level()
378            });
379
380        Self {
381            txn_info: transaction_info,
382            winning_commit_summary,
383            isolation_level,
384        }
385    }
386
387    /// This function checks conflict of the `initial_current_transaction_info` against the
388    /// `winning_commit_version` and returns an updated [`TransactionInfo`] that represents
389    /// the transaction as if it had started while reading the `winning_commit_version`.
390    pub fn check_conflicts(&self) -> Result<(), CommitConflictError> {
391        self.check_protocol_compatibility()?;
392        self.check_no_metadata_updates()?;
393        self.check_for_added_files_that_should_have_been_read_by_current_txn()?;
394        self.check_for_deleted_files_against_current_txn_read_files()?;
395        self.check_for_deleted_files_against_current_txn_deleted_files()?;
396        self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?;
397        Ok(())
398    }
399
400    /// Asserts that the client is up to date with the protocol and is allowed
401    /// to read and write against the protocol set by the committed transaction.
402    fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
403        for p in self.winning_commit_summary.protocol() {
404            let (win_read, curr_read) = (
405                p.min_reader_version,
406                self.txn_info.read_snapshot.protocol().min_reader_version,
407            );
408            let (win_write, curr_write) = (
409                p.min_writer_version,
410                self.txn_info.read_snapshot.protocol().min_writer_version,
411            );
412            if curr_read < win_read || win_write < curr_write {
413                return Err(CommitConflictError::ProtocolChanged(
414                    format!("required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"),
415                ));
416            };
417        }
418        if !self.winning_commit_summary.protocol().is_empty()
419            && self
420                .txn_info
421                .actions
422                .iter()
423                .any(|a| matches!(a, Action::Protocol(_)))
424        {
425            return Err(CommitConflictError::ProtocolChanged(
426                "protocol changed".into(),
427            ));
428        };
429        Ok(())
430    }
431
432    /// Check if the committed transaction has changed metadata.
433    fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> {
434        // Fail if the metadata is different than what the txn read.
435        if !self.winning_commit_summary.metadata_updates().is_empty() {
436            Err(CommitConflictError::MetadataChanged)
437        } else {
438            Ok(())
439        }
440    }
441
442    /// Check if the new files added by the already committed transactions
443    /// should have been read by the current transaction.
444    fn check_for_added_files_that_should_have_been_read_by_current_txn(
445        &self,
446    ) -> Result<(), CommitConflictError> {
447        // Skip check, if the operation can be downgraded to snapshot isolation
448        if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) {
449            return Ok(());
450        }
451
452        // Fail if new files have been added that the txn should have read.
453        let added_files_to_check = match self.isolation_level {
454            IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => {
455                // don't conflict with blind appends
456                self.winning_commit_summary.changed_data_added_files()
457            }
458            IsolationLevel::Serializable | IsolationLevel::WriteSerializable => {
459                let mut files = self.winning_commit_summary.changed_data_added_files();
460                files.extend(self.winning_commit_summary.blind_append_added_files());
461                files
462            }
463            IsolationLevel::SnapshotIsolation => vec![],
464        };
465
466        // Here we need to check if the current transaction would have read the
467        // added files. for this we need to be able to evaluate predicates. Err on the safe side is
468        // to assume all files match
469        cfg_if::cfg_if! {
470            if #[cfg(feature = "datafusion")] {
471                let added_files_matching_predicates = if let (Some(predicate), false) = (
472                    &self.txn_info.read_predicates,
473                    self.txn_info.read_whole_table(),
474                ) {
475                    let arrow_schema = self.txn_info.read_snapshot.arrow_schema().map_err(|err| {
476                        CommitConflictError::CorruptedState {
477                            source: Box::new(err),
478                        }
479                    })?;
480                    let partition_columns = &self
481                        .txn_info
482                        .read_snapshot
483                        .metadata()
484                        .partition_columns;
485                    AddContainer::new(&added_files_to_check, partition_columns, arrow_schema)
486                        .predicate_matches(predicate.clone())
487                        .map_err(|err| CommitConflictError::Predicate {
488                            source: Box::new(err),
489                        })?
490                        .cloned()
491                        .collect::<Vec<_>>()
492                } else if self.txn_info.read_whole_table() {
493                    added_files_to_check
494                } else {
495                    vec![]
496                };
497            } else {
498                let added_files_matching_predicates = if self.txn_info.read_whole_table()
499                {
500                    added_files_to_check
501                } else {
502                    vec![]
503                };
504            }
505        }
506
507        if !added_files_matching_predicates.is_empty() {
508            Err(CommitConflictError::ConcurrentAppend)
509        } else {
510            Ok(())
511        }
512    }
513
514    /// Check if [Remove] actions added by already committed transactions
515    /// conflicts with files read by the current transaction.
516    fn check_for_deleted_files_against_current_txn_read_files(
517        &self,
518    ) -> Result<(), CommitConflictError> {
519        // Fail if files have been deleted that the txn read.
520        let read_file_path: HashSet<String> = self
521            .txn_info
522            .read_files()?
523            .map(|f| f.path.clone())
524            .collect();
525        let deleted_read_overlap = self
526            .winning_commit_summary
527            .removed_files()
528            .iter()
529            .find(|&f| read_file_path.contains(&f.path))
530            .cloned();
531        if deleted_read_overlap.is_some()
532            || (!self.winning_commit_summary.removed_files().is_empty()
533                && self.txn_info.read_whole_table())
534        {
535            Err(CommitConflictError::ConcurrentDeleteRead)
536        } else {
537            Ok(())
538        }
539    }
540
541    /// Check if [Remove] actions added by already committed transactions conflicts
542    /// with [Remove] actions this transaction is trying to add.
543    fn check_for_deleted_files_against_current_txn_deleted_files(
544        &self,
545    ) -> Result<(), CommitConflictError> {
546        // Fail if a file is deleted twice.
547        let txn_deleted_files: HashSet<String> = self
548            .txn_info
549            .actions
550            .iter()
551            .cloned()
552            .filter_map(|action| match action {
553                Action::Remove(remove) => Some(remove.path),
554                _ => None,
555            })
556            .collect();
557        let winning_deleted_files: HashSet<String> = self
558            .winning_commit_summary
559            .removed_files()
560            .iter()
561            .cloned()
562            .map(|r| r.path)
563            .collect();
564        let intersection: HashSet<&String> = txn_deleted_files
565            .intersection(&winning_deleted_files)
566            .collect();
567
568        if !intersection.is_empty() {
569            Err(CommitConflictError::ConcurrentDeleteDelete)
570        } else {
571            Ok(())
572        }
573    }
574
575    /// Checks if the winning transaction corresponds to some AppId on which
576    /// current transaction also depends.
577    fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
578        &self,
579    ) -> Result<(), CommitConflictError> {
580        // Fail if the appIds seen by the current transaction has been updated by the winning
581        // transaction i.e. the winning transaction have [Txn] corresponding to
582        // some appId on which current transaction depends on. Example - This can happen when
583        // multiple instances of the same streaming query are running at the same time.
584        let winning_txns = self.winning_commit_summary.app_level_transactions();
585        let txn_overlap: HashSet<&String> = winning_txns
586            .intersection(&self.txn_info.read_app_ids)
587            .collect();
588        if !txn_overlap.is_empty() {
589            Err(CommitConflictError::ConcurrentTransaction)
590        } else {
591            Ok(())
592        }
593    }
594}
595
596// implementation and comments adopted from
597// https://github.com/delta-io/delta/blob/1c18c1d972e37d314711b3a485e6fb7c98fce96d/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala#L1268
598//
599// For no-data-change transactions such as OPTIMIZE/Auto Compaction/ZorderBY, we can
600// change the isolation level to SnapshotIsolation. SnapshotIsolation allows reduced conflict
601// detection by skipping the
602// [ConflictChecker::check_for_added_files_that_should_have_been_read_by_current_txn] check i.e.
603// don't worry about concurrent appends.
604//
605// We can also use SnapshotIsolation for empty transactions. e.g. consider a commit:
606// t0 - Initial state of table
607// t1 - Q1, Q2 starts
608// t2 - Q1 commits
609// t3 - Q2 is empty and wants to commit.
610// In this scenario, we can always allow Q2 to commit without worrying about new files
611// generated by Q1.
612//
613// The final order which satisfies both Serializability and WriteSerializability is: Q2, Q1
614// Note that Metadata only update transactions shouldn't be considered empty. If Q2 above has
615// a Metadata update (say schema change/identity column high watermark update), then Q2 can't
616// be moved above Q1 in the final SERIALIZABLE order. This is because if Q2 is moved above Q1,
617// then Q1 should see the updates from Q2 - which actually didn't happen.
618pub(super) fn can_downgrade_to_snapshot_isolation<'a>(
619    actions: impl IntoIterator<Item = &'a Action>,
620    operation: &DeltaOperation,
621    isolation_level: &IsolationLevel,
622) -> bool {
623    let mut data_changed = false;
624    let mut has_non_file_actions = false;
625    for action in actions {
626        match action {
627            Action::Add(act) if act.data_change => data_changed = true,
628            Action::Remove(rem) if rem.data_change => data_changed = true,
629            _ => has_non_file_actions = true,
630        }
631    }
632
633    if has_non_file_actions {
634        // if Non-file-actions are present (e.g. METADATA etc.), then don't downgrade the isolation level.
635        return false;
636    }
637
638    match isolation_level {
639        IsolationLevel::Serializable => !data_changed,
640        IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(),
641        IsolationLevel::SnapshotIsolation => false, // this case should never happen, since spanpshot isolation cannot be configured on table
642    }
643}
644
645#[cfg(test)]
646#[allow(unused)]
647mod tests {
648    use std::collections::HashMap;
649
650    #[cfg(feature = "datafusion")]
651    use datafusion_expr::{col, lit};
652    use serde_json::json;
653
654    use super::*;
655    use crate::kernel::Action;
656    use crate::test_utils::{ActionFactory, TestSchemas};
657
658    fn simple_add(data_change: bool, min: &str, max: &str) -> Add {
659        ActionFactory::add(
660            TestSchemas::simple(),
661            HashMap::from_iter([("value", (min, max))]),
662            Default::default(),
663            true,
664        )
665    }
666
667    fn init_table_actions() -> Vec<Action> {
668        vec![
669            ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into(),
670            ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into(),
671        ]
672    }
673
674    #[test]
675    fn test_can_downgrade_to_snapshot_isolation() {
676        let isolation = IsolationLevel::WriteSerializable;
677        let operation = DeltaOperation::Optimize {
678            predicate: None,
679            target_size: 0,
680        };
681        let add =
682            ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into();
683        let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation);
684        assert!(!res)
685    }
686
687    // Check whether the test transaction conflict with the concurrent writes by executing the
688    // given params in the following order:
689    // - setup (including setting table isolation level
690    // - reads
691    // - concurrentWrites
692    // - actions
693    #[cfg(feature = "datafusion")]
694    fn execute_test(
695        setup: Option<Vec<Action>>,
696        reads: Option<Expr>,
697        concurrent: Vec<Action>,
698        actions: Vec<Action>,
699        read_whole_table: bool,
700    ) -> Result<(), CommitConflictError> {
701        use crate::table::state::DeltaTableState;
702
703        let setup_actions = setup.unwrap_or_else(init_table_actions);
704        let state = DeltaTableState::from_actions(setup_actions).unwrap();
705        let snapshot = state.snapshot();
706        let transaction_info = TransactionInfo::new(snapshot, reads, &actions, read_whole_table);
707        let summary = WinningCommitSummary {
708            actions: concurrent,
709            commit_info: None,
710        };
711        let checker = ConflictChecker::new(transaction_info, summary, None);
712        checker.check_conflicts()
713    }
714
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    // tests adopted from https://github.com/delta-io/delta/blob/24c025128612a4ae02d0ad958621f928cda9a3ec/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala#L40-L94
    //
    // Each scenario below must pass conflict checking, i.e. the concurrent
    // (winning) commit does not conflict with the current transaction.
    async fn test_allowed_concurrent_actions() {
        // append - append
        // append file to table while a concurrent writer also appends a file
        let file1 = simple_add(true, "1", "10").into();
        let file2 = simple_add(true, "1", "10").into();

        let result = execute_test(None, None, vec![file1], vec![file2], false);
        assert!(result.is_ok());

        // disjoint delete - read
        // the concurrent transaction deletes a file that the current transaction did NOT read
        // (read predicate `value > 10` excludes the removed file's range [1, 10])
        let file_not_read = simple_add(true, "1", "10");
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_not_read.clone().into());
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_not_read, true).into()],
            vec![],
            false,
        );
        assert!(result.is_ok());

        // disjoint add - read
        // concurrently add file, that the current transaction would not have read
        // (read predicate `value > 10` excludes the added file's range [1, 10])
        let file_added = simple_add(true, "1", "10").into();
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![file_added],
            vec![],
            false,
        );
        assert!(result.is_ok());

        // TODO enable test once we have isolation level downcast
        // add / read + no write
        // transaction would have read file added by concurrent txn, but does not write data,
        // so no real conflicting change even though data was added
        // let file_added = tu::create_add_action("file_added", true, get_stats(1, 10));
        // let result = execute_test(
        //     None,
        //     Some(col("value").gt(lit::<i32>(5))),
        //     vec![file_added],
        //     vec![],
        //     false,
        // );
        // assert!(result.is_ok());

        // TODO disjoint transactions
    }
774
775    #[tokio::test]
776    #[cfg(feature = "datafusion")]
777    // tests adopted from https://github.com/delta-io/delta/blob/24c025128612a4ae02d0ad958621f928cda9a3ec/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala#L40-L94
778    async fn test_disallowed_concurrent_actions() {
779        // delete - delete
780        // remove file from table that has previously been removed
781        let removed_file = simple_add(true, "1", "10");
782        let removed_file: Action = ActionFactory::remove(&removed_file, true).into();
783        let result = execute_test(
784            None,
785            None,
786            vec![removed_file.clone()],
787            vec![removed_file],
788            false,
789        );
790        assert!(matches!(
791            result,
792            Err(CommitConflictError::ConcurrentDeleteDelete)
793        ));
794
795        // add / read + write
796        // a file is concurrently added that should have been read by the current transaction
797        let file_added = simple_add(true, "1", "10").into();
798        let file_should_have_read = simple_add(true, "1", "10").into();
799        let result = execute_test(
800            None,
801            Some(col("value").lt_eq(lit::<i32>(10))),
802            vec![file_should_have_read],
803            vec![file_added],
804            false,
805        );
806        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
807
808        // delete / read
809        // transaction reads a file that is removed by concurrent transaction
810        let file_read = simple_add(true, "1", "10");
811        let mut setup_actions = init_table_actions();
812        setup_actions.push(file_read.clone().into());
813        let result = execute_test(
814            Some(setup_actions),
815            Some(col("value").lt_eq(lit::<i32>(10))),
816            vec![ActionFactory::remove(&file_read, true).into()],
817            vec![],
818            false,
819        );
820        assert!(matches!(
821            result,
822            Err(CommitConflictError::ConcurrentDeleteRead)
823        ));
824
825        // schema change
826        // concurrent transactions changes table metadata
827        let result = execute_test(
828            None,
829            None,
830            vec![ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into()],
831            vec![],
832            false,
833        );
834        assert!(matches!(result, Err(CommitConflictError::MetadataChanged)));
835
836        // upgrade / upgrade
837        // current and concurrent transactions change the protocol version
838        let result = execute_test(
839            None,
840            None,
841            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
842            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
843            false,
844        );
845        assert!(matches!(
846            result,
847            Err(CommitConflictError::ProtocolChanged(_))
848        ));
849
850        // taint whole table
851        // `read_whole_table` should disallow any concurrent change, even if the change
852        // is disjoint with the earlier filter
853        let file_part1 = simple_add(true, "1", "10").into();
854        let file_part2 = simple_add(true, "11", "100").into();
855        let file_part3 = simple_add(true, "101", "1000").into();
856        let mut setup_actions = init_table_actions();
857        setup_actions.push(file_part1);
858        let result = execute_test(
859            Some(setup_actions),
860            // filter matches neither existing nor added files
861            Some(col("value").lt(lit::<i32>(0))),
862            vec![file_part2],
863            vec![file_part3],
864            true,
865        );
866        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
867
868        // taint whole table + concurrent remove
869        // `read_whole_table` should disallow any concurrent remove actions
870        let file_part1 = simple_add(true, "1", "10");
871        let file_part2 = simple_add(true, "11", "100").into();
872        let mut setup_actions = init_table_actions();
873        setup_actions.push(file_part1.clone().into());
874        let result = execute_test(
875            Some(setup_actions),
876            None,
877            vec![ActionFactory::remove(&file_part1, true).into()],
878            vec![file_part2],
879            true,
880        );
881        assert!(matches!(
882            result,
883            Err(CommitConflictError::ConcurrentDeleteRead)
884        ));
885
886        // TODO "add in part=2 / read from part=1,2 and write to part=1"
887
888        // TODO conflicting txns
889    }
890}