// deltalake_core/kernel/transaction/conflict_checker.rs
1//! Helper module to check if a transaction can be committed in case of conflicting commits.
2use std::collections::HashSet;
3
4use delta_kernel::table_properties::IsolationLevel;
5
6use super::CommitInfo;
7use crate::DeltaTableError;
8#[cfg(feature = "datafusion")]
9use crate::delta_datafusion::DataFusionMixins;
10use crate::errors::DeltaResult;
11use crate::kernel::{Action, Add, LogDataHandler, Metadata, Protocol, Remove, Transaction};
12use crate::logstore::{LogStore, get_actions};
13use crate::protocol::DeltaOperation;
14use crate::table::config::TablePropertiesExt as _;
15
16#[cfg(feature = "datafusion")]
17use super::state::AddContainer;
18#[cfg(feature = "datafusion")]
19use datafusion::logical_expr::Expr;
20#[cfg(feature = "datafusion")]
21use itertools::Either;
22
23/// Exceptions raised during commit conflict resolution
24#[derive(thiserror::Error, Debug)]
25pub enum CommitConflictError {
26    /// This exception occurs when a concurrent operation adds files in the same partition
27    /// (or anywhere in an un-partitioned table) that your operation reads. The file additions
28    /// can be caused by INSERT, DELETE, UPDATE, or MERGE operations.
29    #[error(
30        "Commit failed: a concurrent transactions added new data.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
31    )]
32    ConcurrentAppend,
33
34    /// This exception occurs when a concurrent operation deleted a file that your operation read.
35    /// Common causes are a DELETE, UPDATE, or MERGE operation that rewrites files.
36    #[error(
37        "Commit failed: a concurrent transaction deleted data this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
38    )]
39    ConcurrentDeleteRead,
40
41    /// This exception occurs when a concurrent operation deleted a file that your operation also deletes.
42    /// This could be caused by two concurrent compaction operations rewriting the same files.
43    #[error(
44        "Commit failed: a concurrent transaction deleted the same data your transaction deletes.\nHelp: you should retry this write operation. If it was based on data contained in the table, you should rerun the query generating the data."
45    )]
46    ConcurrentDeleteDelete,
47
48    /// This exception occurs when a concurrent transaction updates the metadata of a Delta table.
49    /// Common causes are ALTER TABLE operations or writes to your Delta table that update the schema of the table.
50    #[error("Metadata changed since last commit.")]
51    MetadataChanged,
52
53    /// If a streaming query using the same checkpoint location is started multiple times concurrently
54    /// and tries to write to the Delta table at the same time. You should never have two streaming
55    /// queries use the same checkpoint location and run at the same time.
56    #[error("Concurrent transaction failed.")]
57    ConcurrentTransaction,
58
59    /// This exception can occur in the following cases:
60    /// - When your Delta table is upgraded to a new version. For future operations to succeed
61    ///   you may need to upgrade your Delta Lake version.
62    /// - When multiple writers are creating or replacing a table at the same time.
63    /// - When multiple writers are writing to an empty path at the same time.
64    #[error("Protocol changed since last commit: {0}")]
65    ProtocolChanged(String),
66
67    /// Error returned when the table requires an unsupported writer version
68    #[error("Delta-rs does not support writer version {0}")]
69    UnsupportedWriterVersion(i32),
70
71    /// Error returned when the table requires an unsupported writer version
72    #[error("Delta-rs does not support reader version {0}")]
73    UnsupportedReaderVersion(i32),
74
75    /// Error returned when the snapshot has missing or corrupted data
76    #[error("Snapshot is corrupted: {source}")]
77    CorruptedState {
78        /// Source error
79        source: Box<dyn std::error::Error + Send + Sync + 'static>,
80    },
81
82    /// Error returned when evaluating predicate
83    #[error("Error evaluating predicate: {source}")]
84    Predicate {
85        /// Source error
86        source: Box<dyn std::error::Error + Send + Sync + 'static>,
87    },
88
89    /// Error returned when no metadata was found in the DeltaTable.
90    #[error("No metadata found, please make sure table is loaded.")]
91    NoMetadata,
92}
93
/// A struct representing different attributes of current transaction needed for conflict detection.
#[allow(unused)]
pub(crate) struct TransactionInfo<'a> {
    /// Identifier of the transaction (currently always set to the empty string by the constructors).
    txn_id: String,
    /// partition predicates by which files have been queried by the transaction
    ///
    /// If any new data files or removed data files match this predicate, the
    /// transaction should fail.
    #[cfg(not(feature = "datafusion"))]
    read_predicates: Option<String>,
    /// partition predicates by which files have been queried by the transaction
    #[cfg(feature = "datafusion")]
    read_predicates: Option<Expr>,
    /// appIds that have been seen by the transaction
    read_app_ids: HashSet<String>,
    /// delta log actions that the transaction wants to commit
    actions: &'a [Action],
    /// read [`DeltaTableState`] used for the transaction
    read_snapshot: LogDataHandler<'a>,
    /// Whether the transaction tainted the whole table
    /// (i.e. any concurrent file change may conflict with it)
    read_whole_table: bool,
}
116
117impl<'a> TransactionInfo<'a> {
118    #[cfg(feature = "datafusion")]
119    pub fn try_new(
120        read_snapshot: LogDataHandler<'a>,
121        read_predicates: Option<String>,
122        actions: &'a [Action],
123        read_whole_table: bool,
124    ) -> DeltaResult<Self> {
125        use datafusion::prelude::SessionContext;
126
127        let session = SessionContext::new();
128        let read_predicates = read_predicates
129            .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
130            .transpose()?;
131
132        let mut read_app_ids = HashSet::<String>::new();
133        for action in actions.iter() {
134            if let Action::Txn(Transaction { app_id, .. }) = action {
135                read_app_ids.insert(app_id.clone());
136            }
137        }
138
139        Ok(Self::new(
140            read_snapshot,
141            read_predicates,
142            actions,
143            read_whole_table,
144        ))
145    }
146
147    #[cfg(feature = "datafusion")]
148    pub fn new(
149        read_snapshot: LogDataHandler<'a>,
150        read_predicates: Option<Expr>,
151        actions: &'a [Action],
152        read_whole_table: bool,
153    ) -> Self {
154        let mut read_app_ids = HashSet::<String>::new();
155        for action in actions.iter() {
156            if let Action::Txn(Transaction { app_id, .. }) = action {
157                read_app_ids.insert(app_id.clone());
158            }
159        }
160        Self {
161            txn_id: "".into(),
162            read_predicates,
163            read_app_ids,
164            actions,
165            read_snapshot,
166            read_whole_table,
167        }
168    }
169
170    #[cfg(not(feature = "datafusion"))]
171    pub fn try_new(
172        read_snapshot: LogDataHandler<'a>,
173        read_predicates: Option<String>,
174        actions: &'a Vec<Action>,
175        read_whole_table: bool,
176    ) -> DeltaResult<Self> {
177        let mut read_app_ids = HashSet::<String>::new();
178        for action in actions.iter() {
179            if let Action::Txn(Transaction { app_id, .. }) = action {
180                read_app_ids.insert(app_id.clone());
181            }
182        }
183        Ok(Self {
184            txn_id: "".into(),
185            read_predicates,
186            read_app_ids,
187            actions,
188            read_snapshot,
189            read_whole_table,
190        })
191    }
192
193    /// Whether the transaction changed the tables metadatas
194    pub fn metadata_changed(&self) -> bool {
195        self.actions
196            .iter()
197            .any(|a| matches!(a, Action::Metadata(_)))
198    }
199
200    #[cfg(feature = "datafusion")]
201    /// Files read by the transaction
202    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
203        use crate::delta_datafusion::files_matching_predicate;
204
205        if let Some(predicate) = &self.read_predicates {
206            Ok(Either::Left(
207                files_matching_predicate(
208                    self.read_snapshot.clone(),
209                    std::slice::from_ref(predicate),
210                )
211                .map_err(|err| CommitConflictError::Predicate {
212                    source: Box::new(err),
213                })?,
214            ))
215        } else {
216            Ok(Either::Right(
217                self.read_snapshot.iter().map(|f| f.add_action()),
218            ))
219        }
220    }
221
222    #[cfg(not(feature = "datafusion"))]
223    /// Files read by the transaction
224    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
225        Ok(self.read_snapshot.iter().map(|f| f.add_action()))
226    }
227
228    /// Whether the whole table was read during the transaction
229    pub fn read_whole_table(&self) -> bool {
230        self.read_whole_table
231    }
232}
233
/// Summary of the Winning commit against which we want to check the conflict
#[derive(Debug)]
pub(crate) struct WinningCommitSummary {
    /// All actions contained in the winning commit.
    pub actions: Vec<Action>,
    /// The commit info action of the winning commit, if one was recorded.
    pub commit_info: Option<CommitInfo>,
}
240
241impl WinningCommitSummary {
242    pub async fn try_new(
243        log_store: &dyn LogStore,
244        read_version: i64,
245        winning_commit_version: i64,
246    ) -> DeltaResult<Self> {
247        // NOTE using assert, since a wrong version would right now mean a bug in our code.
248        assert_eq!(winning_commit_version, read_version + 1);
249
250        let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
251        match commit_log_bytes {
252            Some(bytes) => {
253                let actions = get_actions(winning_commit_version, &bytes)?; // ← ADD ? HERE
254                let commit_info = actions
255                    .iter()
256                    .find(|action| matches!(action, Action::CommitInfo(_)))
257                    .map(|action| match action {
258                        Action::CommitInfo(info) => info.clone(),
259                        _ => unreachable!(),
260                    });
261
262                Ok(Self {
263                    actions,
264                    commit_info,
265                })
266            }
267            None => Err(DeltaTableError::InvalidVersion(winning_commit_version)),
268        }
269    }
270
271    pub fn metadata_updates(&self) -> Vec<Metadata> {
272        self.actions
273            .iter()
274            .cloned()
275            .filter_map(|action| match action {
276                Action::Metadata(metadata) => Some(metadata),
277                _ => None,
278            })
279            .collect()
280    }
281
282    pub fn app_level_transactions(&self) -> HashSet<String> {
283        self.actions
284            .iter()
285            .cloned()
286            .filter_map(|action| match action {
287                Action::Txn(txn) => Some(txn.app_id),
288                _ => None,
289            })
290            .collect()
291    }
292
293    pub fn protocol(&self) -> Vec<Protocol> {
294        self.actions
295            .iter()
296            .cloned()
297            .filter_map(|action| match action {
298                Action::Protocol(protocol) => Some(protocol),
299                _ => None,
300            })
301            .collect()
302    }
303
304    pub fn removed_files(&self) -> Vec<Remove> {
305        self.actions
306            .iter()
307            .cloned()
308            .filter_map(|action| match action {
309                Action::Remove(remove) => Some(remove),
310                _ => None,
311            })
312            .collect()
313    }
314
315    pub fn added_files(&self) -> Vec<Add> {
316        self.actions
317            .iter()
318            .cloned()
319            .filter_map(|action| match action {
320                Action::Add(add) => Some(add),
321                _ => None,
322            })
323            .collect()
324    }
325
326    pub fn blind_append_added_files(&self) -> Vec<Add> {
327        if self.is_blind_append().unwrap_or(false) {
328            self.added_files()
329        } else {
330            vec![]
331        }
332    }
333
334    pub fn changed_data_added_files(&self) -> Vec<Add> {
335        if self.is_blind_append().unwrap_or(false) {
336            vec![]
337        } else {
338            self.added_files()
339        }
340    }
341
342    pub fn is_blind_append(&self) -> Option<bool> {
343        self.commit_info
344            .as_ref()
345            .map(|opt| opt.is_blind_append.unwrap_or(false))
346    }
347}
348
/// Checks if a failed commit may be committed after a conflicting winning commit
pub(crate) struct ConflictChecker<'a> {
    /// transaction information for current transaction at start of check
    txn_info: TransactionInfo<'a>,
    /// Summary of the transaction, that has been committed ahead of the current transaction
    winning_commit_summary: WinningCommitSummary,
    /// Isolation level for the current transaction
    /// (possibly downgraded to snapshot isolation for no-data-change operations)
    isolation_level: IsolationLevel,
}
358
impl<'a> ConflictChecker<'a> {
    /// Create a checker for `transaction_info` against `winning_commit_summary`.
    ///
    /// When the winning commit and the given `operation` allow it, the table's
    /// configured isolation level is downgraded to snapshot isolation (see
    /// [`can_downgrade_to_snapshot_isolation`]); otherwise the configured level
    /// is used as-is.
    pub fn new(
        transaction_info: TransactionInfo<'a>,
        winning_commit_summary: WinningCommitSummary,
        operation: Option<&DeltaOperation>,
    ) -> ConflictChecker<'a> {
        let isolation_level = operation
            .and_then(|op| {
                if can_downgrade_to_snapshot_isolation(
                    &winning_commit_summary.actions,
                    op,
                    &transaction_info
                        .read_snapshot
                        .table_properties()
                        .isolation_level(),
                ) {
                    Some(IsolationLevel::SnapshotIsolation)
                } else {
                    None
                }
            })
            .unwrap_or_else(|| {
                // No operation given or no downgrade possible: use the table's
                // configured isolation level.
                transaction_info
                    .read_snapshot
                    .table_properties()
                    .isolation_level()
            });

        Self {
            txn_info: transaction_info,
            winning_commit_summary,
            isolation_level,
        }
    }

    /// Check the current transaction against the winning commit.
    ///
    /// Returns `Ok(())` when the transaction can be committed on top of the
    /// winning commit, or the first detected [`CommitConflictError`] otherwise.
    pub fn check_conflicts(&self) -> Result<(), CommitConflictError> {
        self.check_protocol_compatibility()?;
        self.check_no_metadata_updates()?;
        self.check_for_added_files_that_should_have_been_read_by_current_txn()?;
        self.check_for_deleted_files_against_current_txn_read_files()?;
        self.check_for_deleted_files_against_current_txn_deleted_files()?;
        self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?;
        Ok(())
    }

    /// Asserts that the client is up to date with the protocol and is allowed
    /// to read and write against the protocol set by the committed transaction.
    fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
        for p in self.winning_commit_summary.protocol() {
            let (win_read, curr_read) = (
                p.min_reader_version(),
                self.txn_info.read_snapshot.protocol().min_reader_version(),
            );
            let (win_write, curr_write) = (
                p.min_writer_version(),
                self.txn_info.read_snapshot.protocol().min_writer_version(),
            );
            // NOTE(review): the two comparisons are asymmetric — the read check
            // fires when the winning commit *raised* the reader version, the
            // write check when it *lowered* the writer version below ours.
            // Confirm this is intended rather than `curr_write < win_write`.
            if curr_read < win_read || win_write < curr_write {
                return Err(CommitConflictError::ProtocolChanged(format!(
                    "required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"
                )));
            };
        }
        // If the winning commit changed the protocol and our transaction also
        // carries a protocol action, the two protocol changes conflict.
        if !self.winning_commit_summary.protocol().is_empty()
            && self
                .txn_info
                .actions
                .iter()
                .any(|a| matches!(a, Action::Protocol(_)))
        {
            return Err(CommitConflictError::ProtocolChanged(
                "protocol changed".into(),
            ));
        };
        Ok(())
    }

    /// Check if the committed transaction has changed metadata.
    fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> {
        // Fail if the metadata is different than what the txn read.
        if !self.winning_commit_summary.metadata_updates().is_empty() {
            Err(CommitConflictError::MetadataChanged)
        } else {
            Ok(())
        }
    }

    /// Check if the new files added by the already committed transactions
    /// should have been read by the current transaction.
    fn check_for_added_files_that_should_have_been_read_by_current_txn(
        &self,
    ) -> Result<(), CommitConflictError> {
        // Skip check, if the operation can be downgraded to snapshot isolation
        if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) {
            return Ok(());
        }

        // Fail if new files have been added that the txn should have read.
        let added_files_to_check = match self.isolation_level {
            IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => {
                // don't conflict with blind appends
                self.winning_commit_summary.changed_data_added_files()
            }
            IsolationLevel::Serializable | IsolationLevel::WriteSerializable => {
                // under serializable (or write-serializable with a metadata
                // change) even blind appends count as conflicting additions
                let mut files = self.winning_commit_summary.changed_data_added_files();
                files.extend(self.winning_commit_summary.blind_append_added_files());
                files
            }
            // unreachable: handled by the early return above
            IsolationLevel::SnapshotIsolation => vec![],
        };

        // Here we need to check if the current transaction would have read the
        // added files. for this we need to be able to evaluate predicates. Err on the safe side is
        // to assume all files match
        cfg_if::cfg_if! {
            if #[cfg(feature = "datafusion")] {
                let added_files_matching_predicates = if let (Some(predicate), false) = (
                    &self.txn_info.read_predicates,
                    self.txn_info.read_whole_table(),
                ) {
                    // evaluate the recorded read predicate against the added files
                    let arrow_schema = self.txn_info.read_snapshot.read_schema();
                    let partition_columns = &self
                        .txn_info
                        .read_snapshot
                        .metadata()
                        .partition_columns();
                    AddContainer::new(&added_files_to_check, partition_columns, arrow_schema)
                        .predicate_matches(predicate.clone())
                        .map_err(|err| CommitConflictError::Predicate {
                            source: Box::new(err),
                        })?
                        .cloned()
                        .collect::<Vec<_>>()
                } else if self.txn_info.read_whole_table() {
                    // whole table read: every added file conflicts
                    added_files_to_check
                } else {
                    vec![]
                };
            } else {
                // without datafusion we cannot evaluate predicates; only a
                // whole-table read is treated as conflicting
                let added_files_matching_predicates = if self.txn_info.read_whole_table()
                {
                    added_files_to_check
                } else {
                    vec![]
                };
            }
        }

        if !added_files_matching_predicates.is_empty() {
            Err(CommitConflictError::ConcurrentAppend)
        } else {
            Ok(())
        }
    }

    /// Check if [Remove] actions added by already committed transactions
    /// conflicts with files read by the current transaction.
    fn check_for_deleted_files_against_current_txn_read_files(
        &self,
    ) -> Result<(), CommitConflictError> {
        // Fail if files have been deleted that the txn read.
        let read_file_path: HashSet<String> = self
            .txn_info
            .read_files()?
            .map(|f| f.path.clone())
            .collect();

        // Only consider removals with data_change = true as conflicts.
        // Removals with data_change = false (e.g., from OPTIMIZE/compaction)
        // don't change the logical data, only the physical layout, so they
        // shouldn't conflict with concurrent read operations.
        let removed_files_with_data_change: Vec<Remove> = self
            .winning_commit_summary
            .removed_files()
            .into_iter()
            .filter(|r| r.data_change)
            .collect();

        let deleted_read_overlap = removed_files_with_data_change
            .iter()
            .find(|f| read_file_path.contains(&f.path));

        if deleted_read_overlap.is_some()
            || (!removed_files_with_data_change.is_empty() && self.txn_info.read_whole_table())
        {
            Err(CommitConflictError::ConcurrentDeleteRead)
        } else {
            Ok(())
        }
    }

    /// Check if [Remove] actions added by already committed transactions conflicts
    /// with [Remove] actions this transaction is trying to add.
    fn check_for_deleted_files_against_current_txn_deleted_files(
        &self,
    ) -> Result<(), CommitConflictError> {
        // Fail if a file is deleted twice.
        let txn_deleted_files: HashSet<String> = self
            .txn_info
            .actions
            .iter()
            .cloned()
            .filter_map(|action| match action {
                Action::Remove(remove) => Some(remove.path),
                _ => None,
            })
            .collect();
        let winning_deleted_files: HashSet<String> = self
            .winning_commit_summary
            .removed_files()
            .iter()
            .cloned()
            .map(|r| r.path)
            .collect();
        let intersection: HashSet<&String> = txn_deleted_files
            .intersection(&winning_deleted_files)
            .collect();

        if !intersection.is_empty() {
            Err(CommitConflictError::ConcurrentDeleteDelete)
        } else {
            Ok(())
        }
    }

    /// Checks if the winning transaction corresponds to some AppId on which
    /// current transaction also depends.
    fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
        &self,
    ) -> Result<(), CommitConflictError> {
        // Fail if the appIds seen by the current transaction has been updated by the winning
        // transaction i.e. the winning transaction have [Txn] corresponding to
        // some appId on which current transaction depends on. Example - This can happen when
        // multiple instances of the same streaming query are running at the same time.
        let winning_txns = self.winning_commit_summary.app_level_transactions();
        let txn_overlap: HashSet<&String> = winning_txns
            .intersection(&self.txn_info.read_app_ids)
            .collect();
        if !txn_overlap.is_empty() {
            Err(CommitConflictError::ConcurrentTransaction)
        } else {
            Ok(())
        }
    }
}
607
608// implementation and comments adopted from
609// https://github.com/delta-io/delta/blob/1c18c1d972e37d314711b3a485e6fb7c98fce96d/core/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala#L1268
610//
611// For no-data-change transactions such as OPTIMIZE/Auto Compaction/ZorderBY, we can
612// change the isolation level to SnapshotIsolation. SnapshotIsolation allows reduced conflict
613// detection by skipping the
614// [ConflictChecker::check_for_added_files_that_should_have_been_read_by_current_txn] check i.e.
615// don't worry about concurrent appends.
616//
617// We can also use SnapshotIsolation for empty transactions. e.g. consider a commit:
618// t0 - Initial state of table
619// t1 - Q1, Q2 starts
620// t2 - Q1 commits
621// t3 - Q2 is empty and wants to commit.
622// In this scenario, we can always allow Q2 to commit without worrying about new files
623// generated by Q1.
624//
625// The final order which satisfies both Serializability and WriteSerializability is: Q2, Q1
626// Note that Metadata only update transactions shouldn't be considered empty. If Q2 above has
627// a Metadata update (say schema change/identity column high watermark update), then Q2 can't
628// be moved above Q1 in the final SERIALIZABLE order. This is because if Q2 is moved above Q1,
629// then Q1 should see the updates from Q2 - which actually didn't happen.
630pub(super) fn can_downgrade_to_snapshot_isolation<'a>(
631    actions: impl IntoIterator<Item = &'a Action>,
632    operation: &DeltaOperation,
633    isolation_level: &IsolationLevel,
634) -> bool {
635    let mut data_changed = false;
636    let mut has_non_file_actions = false;
637    for action in actions {
638        match action {
639            Action::Add(act) if act.data_change => data_changed = true,
640            Action::Remove(rem) if rem.data_change => data_changed = true,
641            _ => has_non_file_actions = true,
642        }
643    }
644
645    if has_non_file_actions {
646        // if Non-file-actions are present (e.g. METADATA etc.), then don't downgrade the isolation level.
647        return false;
648    }
649
650    match isolation_level {
651        IsolationLevel::Serializable => !data_changed,
652        IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(),
653        IsolationLevel::SnapshotIsolation => false, // this case should never happen, since spanpshot isolation cannot be configured on table
654    }
655}
656
657#[cfg(test)]
658#[allow(unused)]
659mod tests {
660    use std::collections::HashMap;
661
662    #[cfg(feature = "datafusion")]
663    use datafusion::logical_expr::{col, lit};
664    use serde_json::json;
665
666    use super::*;
667    use crate::kernel::Action;
668    use crate::test_utils::{ActionFactory, TestSchemas};
669
670    fn simple_add(data_change: bool, min: &str, max: &str) -> Add {
671        ActionFactory::add(
672            TestSchemas::simple(),
673            HashMap::from_iter([("value", (min, max))]),
674            Default::default(),
675            true,
676        )
677    }
678
679    fn init_table_actions() -> Vec<Action> {
680        vec![
681            ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into(),
682            ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into(),
683        ]
684    }
685
686    #[test]
687    fn test_can_downgrade_to_snapshot_isolation() {
688        let isolation = IsolationLevel::WriteSerializable;
689        let operation = DeltaOperation::Optimize {
690            predicate: None,
691            target_size: 0,
692        };
693        let add =
694            ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into();
695        let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation);
696        assert!(!res)
697    }
698
699    // Check whether the test transaction conflict with the concurrent writes by executing the
700    // given params in the following order:
701    // - setup (including setting table isolation level
702    // - reads
703    // - concurrentWrites
704    // - actions
705    #[cfg(feature = "datafusion")]
706    async fn execute_test(
707        setup: Option<Vec<Action>>,
708        reads: Option<Expr>,
709        concurrent: Vec<Action>,
710        actions: Vec<Action>,
711        read_whole_table: bool,
712    ) -> Result<(), CommitConflictError> {
713        use crate::table::state::DeltaTableState;
714        use object_store::path::Path;
715
716        let setup_actions = setup.unwrap_or_else(init_table_actions);
717        let state = DeltaTableState::from_actions(setup_actions).await.unwrap();
718        let snapshot = state.snapshot();
719        let transaction_info =
720            TransactionInfo::new(snapshot.log_data(), reads, &actions, read_whole_table);
721        let summary = WinningCommitSummary {
722            actions: concurrent,
723            commit_info: None,
724        };
725        let checker = ConflictChecker::new(transaction_info, summary, None);
726        checker.check_conflicts()
727    }
728
729    // tests adopted from https://github.com/delta-io/delta/blob/24c025128612a4ae02d0ad958621f928cda9a3ec/core/src/test/scala/org/apache/spark/sql/delta/OptimisticTransactionSuite.scala#L40-L94
730    #[tokio::test]
731    #[cfg(feature = "datafusion")]
732    async fn test_concurrent_append_append() {
733        // append file to table while a concurrent writer also appends a file
734        let file1 = simple_add(true, "1", "10").into();
735        let file2 = simple_add(true, "1", "10").into();
736
737        let result = execute_test(None, None, vec![file1], vec![file2], false).await;
738        assert!(result.is_ok());
739    }
740
741    #[tokio::test]
742    #[cfg(feature = "datafusion")]
743    async fn test_disjoint_delete_read() {
744        // the concurrent transaction deletes a file that the current transaction did NOT read
745        let file_not_read = simple_add(true, "1", "10");
746        let file_read = simple_add(true, "100", "10000").into();
747        let mut setup_actions = init_table_actions();
748        setup_actions.push(file_not_read.clone().into());
749        setup_actions.push(file_read);
750        let result = execute_test(
751            Some(setup_actions),
752            Some(col("value").gt(lit::<i32>(10))),
753            vec![ActionFactory::remove(&file_not_read, true).into()],
754            vec![],
755            false,
756        )
757        .await;
758        assert!(result.is_ok());
759    }
760
761    #[tokio::test]
762    #[cfg(feature = "datafusion")]
763    async fn test_disjoint_add_read() {
764        // concurrently add file, that the current transaction would not have read
765        let file_added = simple_add(true, "1", "10").into();
766        let file_read = simple_add(true, "100", "10000").into();
767        let mut setup_actions = init_table_actions();
768        setup_actions.push(file_read);
769        let result = execute_test(
770            Some(setup_actions),
771            Some(col("value").gt(lit::<i32>(10))),
772            vec![file_added],
773            vec![],
774            false,
775        )
776        .await;
777        assert!(result.is_ok());
778    }
779
780    #[tokio::test]
781    #[cfg(feature = "datafusion")]
782    async fn test_concurrent_delete_delete() {
783        // remove file from table that has previously been removed
784        let removed_file = simple_add(true, "1", "10");
785        let removed_file: Action = ActionFactory::remove(&removed_file, true).into();
786        let result = execute_test(
787            None,
788            None,
789            vec![removed_file.clone()],
790            vec![removed_file],
791            false,
792        )
793        .await;
794        assert!(matches!(
795            result,
796            Err(CommitConflictError::ConcurrentDeleteDelete)
797        ));
798    }
799
800    #[tokio::test]
801    #[cfg(feature = "datafusion")]
802    async fn test_concurrent_add_conflicts_with_read_and_write() {
803        // a file is concurrently added that should have been read by the current transaction
804        let file_added = simple_add(true, "1", "10").into();
805        let file_should_have_read = simple_add(true, "1", "10").into();
806        let result = execute_test(
807            None,
808            Some(col("value").lt_eq(lit::<i32>(10))),
809            vec![file_should_have_read],
810            vec![file_added],
811            false,
812        )
813        .await;
814        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
815    }
816
817    #[tokio::test]
818    #[cfg(feature = "datafusion")]
819    async fn test_concurrent_delete_conflicts_with_read() {
820        // transaction reads a file that is removed by concurrent transaction
821        let file_read = simple_add(true, "1", "10");
822        let mut setup_actions = init_table_actions();
823        setup_actions.push(file_read.clone().into());
824        let result = execute_test(
825            Some(setup_actions),
826            Some(col("value").lt_eq(lit::<i32>(10))),
827            vec![ActionFactory::remove(&file_read, true).into()],
828            vec![],
829            false,
830        )
831        .await;
832        assert!(matches!(
833            result,
834            Err(CommitConflictError::ConcurrentDeleteRead)
835        ));
836    }
837
838    #[tokio::test]
839    #[cfg(feature = "datafusion")]
840    async fn test_concurrent_metadata_change() {
841        // concurrent transactions changes table metadata
842        let result = execute_test(
843            None,
844            None,
845            vec![ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into()],
846            vec![],
847            false,
848        )
849        .await;
850        assert!(matches!(result, Err(CommitConflictError::MetadataChanged)));
851    }
852
853    #[tokio::test]
854    #[cfg(feature = "datafusion")]
855    async fn test_concurrent_protocol_upgrade() {
856        // current and concurrent transactions change the protocol version
857        let result = execute_test(
858            None,
859            None,
860            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
861            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
862            false,
863        )
864        .await;
865        assert!(matches!(
866            result,
867            Err(CommitConflictError::ProtocolChanged(_))
868        ));
869    }
870
871    #[tokio::test]
872    #[cfg(feature = "datafusion")]
873    async fn test_read_whole_table_disallows_concurrent_append() {
874        // `read_whole_table` should disallow any concurrent change, even if the change
875        // is disjoint with the earlier filter
876        let file_part1 = simple_add(true, "1", "10").into();
877        let file_part2 = simple_add(true, "11", "100").into();
878        let file_part3 = simple_add(true, "101", "1000").into();
879        let mut setup_actions = init_table_actions();
880        setup_actions.push(file_part1);
881        let result = execute_test(
882            Some(setup_actions),
883            // filter matches neither existing nor added files
884            Some(col("value").lt(lit::<i32>(0))),
885            vec![file_part2],
886            vec![file_part3],
887            true,
888        )
889        .await;
890        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
891    }
892
893    #[tokio::test]
894    #[cfg(feature = "datafusion")]
895    async fn test_read_whole_table_disallows_concurrent_remove() {
896        // `read_whole_table` should disallow any concurrent remove actions
897        let file_part1 = simple_add(true, "1", "10");
898        let file_part2 = simple_add(true, "11", "100").into();
899        let mut setup_actions = init_table_actions();
900        setup_actions.push(file_part1.clone().into());
901        let result = execute_test(
902            Some(setup_actions),
903            None,
904            vec![ActionFactory::remove(&file_part1, true).into()],
905            vec![file_part2],
906            true,
907        )
908        .await;
909        assert!(matches!(
910            result,
911            Err(CommitConflictError::ConcurrentDeleteRead)
912        ));
913    }
914
915    #[tokio::test]
916    #[cfg(feature = "datafusion")]
917    async fn test_compaction_remove_does_not_conflict_with_read() {
918        // Concurrent compaction (data_change = false) should NOT conflict with reads
919        // A file is removed by a compaction operation (data_change = false) while being read
920        let file_read = simple_add(true, "1", "10");
921        let mut setup_actions = init_table_actions();
922        setup_actions.push(file_read.clone().into());
923
924        // Create a remove action with data_change = false (simulating OPTIMIZE/compaction)
925        let mut compaction_remove = ActionFactory::remove(&file_read, false);
926        compaction_remove.data_change = false;
927
928        let result = execute_test(
929            Some(setup_actions),
930            Some(col("value").lt_eq(lit::<i32>(10))),
931            vec![compaction_remove.into()],
932            vec![],
933            false,
934        )
935        .await;
936        // Should succeed because data_change = false means it's just a physical reorganization
937        assert!(
938            result.is_ok(),
939            "Compaction with data_change=false should not conflict with reads"
940        );
941    }
942
943    #[tokio::test]
944    #[cfg(feature = "datafusion")]
945    async fn test_data_delete_conflicts_with_read() {
946        // Delete with data_change = true should still conflict with reads
947        let file_read = simple_add(true, "1", "10");
948        let mut setup_actions = init_table_actions();
949        setup_actions.push(file_read.clone().into());
950
951        let result = execute_test(
952            Some(setup_actions),
953            Some(col("value").lt_eq(lit::<i32>(10))),
954            vec![ActionFactory::remove(&file_read, true).into()],
955            vec![],
956            false,
957        )
958        .await;
959        // Should fail because data_change = true means logical data was removed
960        assert!(
961            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
962            "Delete with data_change=true should conflict with reads"
963        );
964    }
965
966    #[tokio::test]
967    #[cfg(feature = "datafusion")]
968    async fn test_compaction_does_not_conflict_with_whole_table_read() {
969        // Concurrent compaction with read_whole_table and data_change = false
970        let file_part1 = simple_add(true, "1", "10");
971        let file_part2 = simple_add(true, "11", "100").into();
972        let mut setup_actions = init_table_actions();
973        setup_actions.push(file_part1.clone().into());
974
975        let compaction_remove = ActionFactory::remove(&file_part1, false);
976
977        let result = execute_test(
978            Some(setup_actions),
979            None,
980            vec![compaction_remove.into()],
981            vec![file_part2],
982            true, // read_whole_table
983        )
984        .await;
985        // Should succeed because data_change = false, even with read_whole_table
986        assert!(
987            result.is_ok(),
988            "Compaction with data_change=false should not conflict even with read_whole_table"
989        );
990    }
991
992    #[tokio::test]
993    #[cfg(feature = "datafusion")]
994    async fn test_multiple_compaction_removes_do_not_conflict() {
995        // Multiple files removed with data_change = false
996        let file1 = simple_add(true, "1", "10");
997        let file2 = simple_add(true, "11", "20");
998        let mut setup_actions = init_table_actions();
999        setup_actions.push(file1.clone().into());
1000        setup_actions.push(file2.clone().into());
1001
1002        let mut remove1 = ActionFactory::remove(&file1, false);
1003        remove1.data_change = false;
1004        let mut remove2 = ActionFactory::remove(&file2, false);
1005        remove2.data_change = false;
1006
1007        let result = execute_test(
1008            Some(setup_actions),
1009            Some(col("value").lt_eq(lit::<i32>(20))),
1010            vec![remove1.into(), remove2.into()],
1011            vec![],
1012            false,
1013        )
1014        .await;
1015        // Should succeed because both removes have data_change = false
1016        assert!(
1017            result.is_ok(),
1018            "Multiple compaction removes with data_change=false should not conflict"
1019        );
1020    }
1021
1022    #[tokio::test]
1023    #[cfg(feature = "datafusion")]
1024    async fn test_mixed_removes_conflict_if_any_data_change() {
1025        // Mixed removes - one with data_change = false, one with data_change = true
1026        let file1 = simple_add(true, "1", "10");
1027        let file2 = simple_add(true, "11", "20");
1028        let mut setup_actions = init_table_actions();
1029        setup_actions.push(file1.clone().into());
1030        setup_actions.push(file2.clone().into());
1031
1032        let mut compaction_remove = ActionFactory::remove(&file1, false);
1033        compaction_remove.data_change = false;
1034        let data_remove = ActionFactory::remove(&file2, true); // data_change = true
1035
1036        let result = execute_test(
1037            Some(setup_actions),
1038            Some(col("value").lt_eq(lit::<i32>(20))),
1039            vec![compaction_remove.into(), data_remove.into()],
1040            vec![],
1041            false,
1042        )
1043        .await;
1044        // Should fail because one of the removes has data_change = true
1045        assert!(
1046            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
1047            "Mixed removes should conflict if any have data_change=true"
1048        );
1049    }
1050
1051    #[tokio::test]
1052    #[cfg(feature = "datafusion")]
1053    async fn test_concurrent_compaction_double_delete_still_conflicts() {
1054        // Concurrent double delete with data_change = false
1055        // Both transactions try to remove the same file with data_change = false
1056        let removed_file = simple_add(true, "1", "10");
1057        let mut setup_actions = init_table_actions();
1058        setup_actions.push(removed_file.clone().into());
1059
1060        let mut remove1 = ActionFactory::remove(&removed_file, false);
1061        remove1.data_change = false;
1062        let mut remove2 = ActionFactory::remove(&removed_file, false);
1063        remove2.data_change = false;
1064
1065        let result = execute_test(
1066            Some(setup_actions),
1067            None,
1068            vec![remove1.into()],
1069            vec![remove2.into()],
1070            false,
1071        )
1072        .await;
1073        // Should still fail - even with data_change = false, can't delete the same file twice
1074        assert!(
1075            matches!(result, Err(CommitConflictError::ConcurrentDeleteDelete)),
1076            "Concurrent double delete should conflict even with data_change=false"
1077        );
1078    }
1079
1080    #[tokio::test]
1081    #[cfg(feature = "datafusion")]
1082    async fn test_disjoint_partitions_add_and_write() {
1083        // Test for: "add in part=2 / read from part=1,2 and write to part=1"
1084        // Transaction reads from partitions 1 and 2, concurrent txn adds to partition 2,
1085        // and current txn writes to partition 1.
1086        // This should succeed because the write is disjoint from the concurrent add.
1087
1088        let file_part1_existing = simple_add(true, "1", "10");
1089        let file_part2_added = simple_add(true, "100", "200").into();
1090        let file_part1_new = simple_add(true, "5", "15").into();
1091
1092        let mut setup_actions = init_table_actions();
1093        setup_actions.push(file_part1_existing.into());
1094
1095        // Read from both partitions (value <= 200 covers both ranges)
1096        // Concurrent adds file with value 100-200
1097        // Current transaction adds file with value 5-15
1098        let result = execute_test(
1099            Some(setup_actions),
1100            Some(col("value").lt_eq(lit::<i32>(200))),
1101            vec![file_part2_added],
1102            vec![file_part1_new],
1103            false,
1104        )
1105        .await;
1106
1107        // This should fail with ConcurrentAppend because the predicate matches the added file
1108        assert!(
1109            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
1110            "Adding file matching read predicate should conflict"
1111        );
1112    }
1113
1114    #[tokio::test]
1115    #[cfg(feature = "datafusion")]
1116    async fn test_disjoint_partitions_read_write_different_ranges() {
1117        // Test disjoint partitions: read from one range, concurrent write to different range
1118        let file_part1 = simple_add(true, "1", "10");
1119        let file_part2_added = simple_add(true, "100", "200").into();
1120        let file_part1_new = simple_add(true, "5", "15").into();
1121
1122        let mut setup_actions = init_table_actions();
1123        setup_actions.push(file_part1.into());
1124
1125        // Read only from partition 1 (value <= 50)
1126        // Concurrent adds to partition 2 (value 100-200)
1127        // Current transaction adds to partition 1
1128        let result = execute_test(
1129            Some(setup_actions),
1130            Some(col("value").lt_eq(lit::<i32>(50))),
1131            vec![file_part2_added],
1132            vec![file_part1_new],
1133            false,
1134        )
1135        .await;
1136
1137        // This should succeed because the concurrent add is outside the read predicate
1138        assert!(
1139            result.is_ok(),
1140            "Disjoint partition writes should not conflict"
1141        );
1142    }
1143
1144    #[tokio::test]
1145    #[cfg(feature = "datafusion")]
1146    async fn test_conflicting_app_transactions() {
1147        // Test for conflicting application transactions (e.g., duplicate streaming queries)
1148        let file1 = simple_add(true, "1", "10").into();
1149
1150        let app_id = "streaming_query_1".to_string();
1151        let txn_action = Action::Txn(Transaction {
1152            app_id: app_id.clone(),
1153            version: 1,
1154            last_updated: None,
1155        });
1156
1157        // Current transaction depends on app_id
1158        let current_actions = vec![txn_action.clone()];
1159
1160        // Concurrent transaction also updates the same app_id
1161        let concurrent_actions = vec![txn_action, file1];
1162
1163        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;
1164
1165        // Should fail because both transactions use the same app_id
1166        assert!(
1167            matches!(result, Err(CommitConflictError::ConcurrentTransaction)),
1168            "Conflicting app transactions should fail"
1169        );
1170    }
1171
1172    #[tokio::test]
1173    #[cfg(feature = "datafusion")]
1174    async fn test_non_conflicting_different_app_transactions() {
1175        // Test non-conflicting application transactions with different app_ids
1176        let file1 = simple_add(true, "1", "10").into();
1177
1178        let app_id1 = "streaming_query_1".to_string();
1179        let app_id2 = "streaming_query_2".to_string();
1180
1181        let txn_action1 = Action::Txn(Transaction {
1182            app_id: app_id1,
1183            version: 1,
1184            last_updated: None,
1185        });
1186
1187        let txn_action2 = Action::Txn(Transaction {
1188            app_id: app_id2,
1189            version: 1,
1190            last_updated: None,
1191        });
1192
1193        // Current transaction depends on app_id1
1194        let current_actions = vec![txn_action1];
1195
1196        // Concurrent transaction updates app_id2
1197        let concurrent_actions = vec![txn_action2, file1];
1198
1199        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;
1200
1201        // Should succeed because different app_ids don't conflict
1202        assert!(
1203            result.is_ok(),
1204            "Non-conflicting app transactions should succeed"
1205        );
1206    }
1207
1208    #[tokio::test]
1209    #[cfg(feature = "datafusion")]
1210    async fn test_replace_where_initial_empty_conflicts_on_concurrent_add() {
1211        // Empty predicate read, concurrent add within predicate, then current write -> conflicts
1212        let mut setup_actions = init_table_actions();
1213        setup_actions.push(simple_add(true, "1", "1").into());
1214
1215        let result = execute_test(
1216            Some(setup_actions),
1217            Some(col("value").gt_eq(lit::<i32>(2))), // no files read
1218            vec![simple_add(true, "3", "3").into()], // concurrent add matches predicate
1219            vec![simple_add(true, "2", "2").into()],
1220            false,
1221        )
1222        .await;
1223
1224        assert!(
1225            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
1226            "ReplaceWhere-style empty read should conflict when a matching row is concurrently added"
1227        );
1228    }
1229
1230    #[tokio::test]
1231    #[cfg(feature = "datafusion")]
1232    async fn test_replace_where_disjoint_empty_allows_commit() {
1233        // Empty predicate read, concurrent add outside predicate, then current write -> allowed
1234        let mut setup_actions = init_table_actions();
1235        setup_actions.push(simple_add(true, "1", "1").into());
1236
1237        let result = execute_test(
1238            Some(setup_actions),
1239            Some(
1240                col("value")
1241                    .gt(lit::<i32>(1))
1242                    .and(col("value").lt_eq(lit::<i32>(3))),
1243            ), // empty read
1244            vec![simple_add(true, "5", "5").into()], // disjoint from read predicate
1245            vec![simple_add(true, "2", "2").into()],
1246            false,
1247        )
1248        .await;
1249
1250        assert!(
1251            result.is_ok(),
1252            "Disjoint replaceWhere-style transactions with empty reads should succeed"
1253        );
1254    }
1255}