1use std::collections::HashSet;
3
4use delta_kernel::table_properties::IsolationLevel;
5
6use super::CommitInfo;
7use crate::DeltaTableError;
8#[cfg(feature = "datafusion")]
9use crate::delta_datafusion::DataFusionMixins;
10use crate::errors::DeltaResult;
11use crate::kernel::{
12 Action, Add, LogDataHandler, Metadata, Protocol, Remove, Transaction, Version,
13};
14use crate::logstore::{LogStore, get_actions};
15use crate::protocol::DeltaOperation;
16use crate::table::config::TablePropertiesExt as _;
17
18#[cfg(feature = "datafusion")]
19use super::state::AddContainer;
20#[cfg(feature = "datafusion")]
21use datafusion::logical_expr::Expr;
22#[cfg(feature = "datafusion")]
23use itertools::Either;
24
25#[derive(thiserror::Error, Debug)]
27pub enum CommitConflictError {
28 #[error(
32 "Commit failed: a concurrent transactions added new data.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
33 )]
34 ConcurrentAppend,
35
36 #[error(
39 "Commit failed: a concurrent transaction deleted data this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
40 )]
41 ConcurrentDeleteRead,
42
43 #[error(
46 "Commit failed: a concurrent transaction deleted the same data your transaction deletes.\nHelp: you should retry this write operation. If it was based on data contained in the table, you should rerun the query generating the data."
47 )]
48 ConcurrentDeleteDelete,
49
50 #[error("Metadata changed since last commit.")]
53 MetadataChanged,
54
55 #[error("Concurrent transaction failed.")]
59 ConcurrentTransaction,
60
61 #[error("Protocol changed since last commit: {0}")]
67 ProtocolChanged(String),
68
69 #[error("Delta-rs does not support writer version {0}")]
71 UnsupportedWriterVersion(i32),
72
73 #[error("Delta-rs does not support reader version {0}")]
75 UnsupportedReaderVersion(i32),
76
77 #[error("Snapshot is corrupted: {source}")]
79 CorruptedState {
80 source: Box<dyn std::error::Error + Send + Sync + 'static>,
82 },
83
84 #[error("Error evaluating predicate: {source}")]
86 Predicate {
87 source: Box<dyn std::error::Error + Send + Sync + 'static>,
89 },
90
91 #[error("No metadata found, please make sure table is loaded.")]
93 NoMetadata,
94}
95
96#[allow(unused)]
98pub(crate) struct TransactionInfo<'a> {
99 txn_id: String,
100 #[cfg(not(feature = "datafusion"))]
105 read_predicates: Option<String>,
106 #[cfg(feature = "datafusion")]
108 read_predicates: Option<Expr>,
109 read_app_ids: HashSet<String>,
111 actions: &'a [Action],
113 read_snapshot: LogDataHandler<'a>,
115 read_whole_table: bool,
117}
118
119impl<'a> TransactionInfo<'a> {
120 #[cfg(feature = "datafusion")]
121 pub fn try_new(
122 read_snapshot: LogDataHandler<'a>,
123 read_predicates: Option<String>,
124 actions: &'a [Action],
125 read_whole_table: bool,
126 ) -> DeltaResult<Self> {
127 use datafusion::prelude::SessionContext;
128
129 let session = SessionContext::new();
130 let read_predicates = read_predicates
131 .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
132 .transpose()?;
133
134 let mut read_app_ids = HashSet::<String>::new();
135 for action in actions.iter() {
136 if let Action::Txn(Transaction { app_id, .. }) = action {
137 read_app_ids.insert(app_id.clone());
138 }
139 }
140
141 Ok(Self::new(
142 read_snapshot,
143 read_predicates,
144 actions,
145 read_whole_table,
146 ))
147 }
148
149 #[cfg(feature = "datafusion")]
150 pub fn new(
151 read_snapshot: LogDataHandler<'a>,
152 read_predicates: Option<Expr>,
153 actions: &'a [Action],
154 read_whole_table: bool,
155 ) -> Self {
156 let mut read_app_ids = HashSet::<String>::new();
157 for action in actions.iter() {
158 if let Action::Txn(Transaction { app_id, .. }) = action {
159 read_app_ids.insert(app_id.clone());
160 }
161 }
162 Self {
163 txn_id: "".into(),
164 read_predicates,
165 read_app_ids,
166 actions,
167 read_snapshot,
168 read_whole_table,
169 }
170 }
171
172 #[cfg(not(feature = "datafusion"))]
173 pub fn try_new(
174 read_snapshot: LogDataHandler<'a>,
175 read_predicates: Option<String>,
176 actions: &'a Vec<Action>,
177 read_whole_table: bool,
178 ) -> DeltaResult<Self> {
179 let mut read_app_ids = HashSet::<String>::new();
180 for action in actions.iter() {
181 if let Action::Txn(Transaction { app_id, .. }) = action {
182 read_app_ids.insert(app_id.clone());
183 }
184 }
185 Ok(Self {
186 txn_id: "".into(),
187 read_predicates,
188 read_app_ids,
189 actions,
190 read_snapshot,
191 read_whole_table,
192 })
193 }
194
195 pub fn metadata_changed(&self) -> bool {
197 self.actions
198 .iter()
199 .any(|a| matches!(a, Action::Metadata(_)))
200 }
201
202 #[cfg(feature = "datafusion")]
203 pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
205 use crate::delta_datafusion::files_matching_predicate;
206
207 if let Some(predicate) = &self.read_predicates {
208 Ok(Either::Left(
209 files_matching_predicate(
210 self.read_snapshot.clone(),
211 std::slice::from_ref(predicate),
212 )
213 .map_err(|err| CommitConflictError::Predicate {
214 source: Box::new(err),
215 })?,
216 ))
217 } else {
218 Ok(Either::Right(self.read_snapshot.iter().map(|f| f.to_add())))
219 }
220 }
221
222 #[cfg(not(feature = "datafusion"))]
223 pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
225 Ok(self.read_snapshot.iter().map(|f| f.to_add()))
226 }
227
228 pub fn read_whole_table(&self) -> bool {
230 self.read_whole_table
231 }
232}
233
234#[derive(Debug)]
236pub(crate) struct WinningCommitSummary {
237 pub actions: Vec<Action>,
238 pub commit_info: Option<CommitInfo>,
239}
240
241impl WinningCommitSummary {
242 pub async fn try_new(
243 log_store: &dyn LogStore,
244 read_version: Version,
245 winning_commit_version: Version,
246 ) -> DeltaResult<Self> {
247 assert_eq!(winning_commit_version, read_version + 1);
249
250 let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
251 match commit_log_bytes {
252 Some(bytes) => {
253 let actions = get_actions(winning_commit_version, &bytes)?; let commit_info = actions
255 .iter()
256 .find(|action| matches!(action, Action::CommitInfo(_)))
257 .map(|action| match action {
258 Action::CommitInfo(info) => info.clone(),
259 _ => unreachable!(),
260 });
261
262 Ok(Self {
263 actions,
264 commit_info,
265 })
266 }
267 None => Err(DeltaTableError::InvalidVersion(winning_commit_version)),
268 }
269 }
270
271 pub fn metadata_updates(&self) -> Vec<Metadata> {
272 self.actions
273 .iter()
274 .cloned()
275 .filter_map(|action| match action {
276 Action::Metadata(metadata) => Some(metadata),
277 _ => None,
278 })
279 .collect()
280 }
281
282 pub fn app_level_transactions(&self) -> HashSet<String> {
283 self.actions
284 .iter()
285 .cloned()
286 .filter_map(|action| match action {
287 Action::Txn(txn) => Some(txn.app_id),
288 _ => None,
289 })
290 .collect()
291 }
292
293 pub fn protocol(&self) -> Vec<Protocol> {
294 self.actions
295 .iter()
296 .cloned()
297 .filter_map(|action| match action {
298 Action::Protocol(protocol) => Some(protocol),
299 _ => None,
300 })
301 .collect()
302 }
303
304 pub fn removed_files(&self) -> Vec<Remove> {
305 self.actions
306 .iter()
307 .cloned()
308 .filter_map(|action| match action {
309 Action::Remove(remove) => Some(remove),
310 _ => None,
311 })
312 .collect()
313 }
314
315 pub fn added_files(&self) -> Vec<Add> {
316 self.actions
317 .iter()
318 .cloned()
319 .filter_map(|action| match action {
320 Action::Add(add) => Some(add),
321 _ => None,
322 })
323 .collect()
324 }
325
326 pub fn blind_append_added_files(&self) -> Vec<Add> {
327 if self.is_blind_append().unwrap_or(false) {
328 self.added_files()
329 } else {
330 vec![]
331 }
332 }
333
334 pub fn changed_data_added_files(&self) -> Vec<Add> {
335 if self.is_blind_append().unwrap_or(false) {
336 vec![]
337 } else {
338 self.added_files()
339 }
340 }
341
342 pub fn is_blind_append(&self) -> Option<bool> {
343 self.commit_info
344 .as_ref()
345 .map(|opt| opt.is_blind_append.unwrap_or(false))
346 }
347}
348
349pub(crate) struct ConflictChecker<'a> {
351 txn_info: TransactionInfo<'a>,
353 winning_commit_summary: WinningCommitSummary,
355 isolation_level: IsolationLevel,
357}
358
359impl<'a> ConflictChecker<'a> {
360 pub fn new(
361 transaction_info: TransactionInfo<'a>,
362 winning_commit_summary: WinningCommitSummary,
363 operation: Option<&DeltaOperation>,
364 ) -> ConflictChecker<'a> {
365 let isolation_level = operation
366 .and_then(|op| {
367 if can_downgrade_to_snapshot_isolation(
368 &winning_commit_summary.actions,
369 op,
370 &transaction_info
371 .read_snapshot
372 .table_properties()
373 .isolation_level(),
374 ) {
375 Some(IsolationLevel::SnapshotIsolation)
376 } else {
377 None
378 }
379 })
380 .unwrap_or_else(|| {
381 transaction_info
382 .read_snapshot
383 .table_properties()
384 .isolation_level()
385 });
386
387 Self {
388 txn_info: transaction_info,
389 winning_commit_summary,
390 isolation_level,
391 }
392 }
393
394 pub fn check_conflicts(&self) -> Result<(), CommitConflictError> {
398 self.check_protocol_compatibility()?;
399 self.check_no_metadata_updates()?;
400 self.check_for_added_files_that_should_have_been_read_by_current_txn()?;
401 self.check_for_deleted_files_against_current_txn_read_files()?;
402 self.check_for_deleted_files_against_current_txn_deleted_files()?;
403 self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?;
404 Ok(())
405 }
406
407 fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
410 for p in self.winning_commit_summary.protocol() {
411 let (win_read, curr_read) = (
412 p.min_reader_version(),
413 self.txn_info.read_snapshot.protocol().min_reader_version(),
414 );
415 let (win_write, curr_write) = (
416 p.min_writer_version(),
417 self.txn_info.read_snapshot.protocol().min_writer_version(),
418 );
419 if curr_read < win_read || win_write < curr_write {
420 return Err(CommitConflictError::ProtocolChanged(format!(
421 "required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"
422 )));
423 };
424 }
425 if !self.winning_commit_summary.protocol().is_empty()
426 && self
427 .txn_info
428 .actions
429 .iter()
430 .any(|a| matches!(a, Action::Protocol(_)))
431 {
432 return Err(CommitConflictError::ProtocolChanged(
433 "protocol changed".into(),
434 ));
435 };
436 Ok(())
437 }
438
439 fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> {
441 if !self.winning_commit_summary.metadata_updates().is_empty() {
443 Err(CommitConflictError::MetadataChanged)
444 } else {
445 Ok(())
446 }
447 }
448
449 fn check_for_added_files_that_should_have_been_read_by_current_txn(
452 &self,
453 ) -> Result<(), CommitConflictError> {
454 if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) {
456 return Ok(());
457 }
458
459 let added_files_to_check = match self.isolation_level {
461 IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => {
462 self.winning_commit_summary.changed_data_added_files()
464 }
465 IsolationLevel::Serializable | IsolationLevel::WriteSerializable => {
466 let mut files = self.winning_commit_summary.changed_data_added_files();
467 files.extend(self.winning_commit_summary.blind_append_added_files());
468 files
469 }
470 IsolationLevel::SnapshotIsolation => vec![],
471 };
472
473 cfg_if::cfg_if! {
477 if #[cfg(feature = "datafusion")] {
478 let added_files_matching_predicates = if let (Some(predicate), false) = (
479 &self.txn_info.read_predicates,
480 self.txn_info.read_whole_table(),
481 ) {
482 let arrow_schema = self.txn_info.read_snapshot.read_schema();
483 let partition_columns = self
484 .txn_info
485 .read_snapshot
486 .metadata()
487 .partition_columns()
488 .to_vec();
489 AddContainer::new(&added_files_to_check, &partition_columns, arrow_schema)
490 .predicate_matches(predicate.clone())
491 .map_err(|err| CommitConflictError::Predicate {
492 source: Box::new(err),
493 })?
494 .cloned()
495 .collect::<Vec<_>>()
496 } else if self.txn_info.read_whole_table() {
497 added_files_to_check
498 } else {
499 vec![]
500 };
501 } else {
502 let added_files_matching_predicates = if self.txn_info.read_whole_table()
503 {
504 added_files_to_check
505 } else {
506 vec![]
507 };
508 }
509 }
510
511 if !added_files_matching_predicates.is_empty() {
512 Err(CommitConflictError::ConcurrentAppend)
513 } else {
514 Ok(())
515 }
516 }
517
518 fn check_for_deleted_files_against_current_txn_read_files(
521 &self,
522 ) -> Result<(), CommitConflictError> {
523 let read_file_path: HashSet<String> = self
525 .txn_info
526 .read_files()?
527 .map(|f| f.path.clone())
528 .collect();
529
530 let removed_files_with_data_change: Vec<Remove> = self
535 .winning_commit_summary
536 .removed_files()
537 .into_iter()
538 .filter(|r| r.data_change)
539 .collect();
540
541 let deleted_read_overlap = removed_files_with_data_change
542 .iter()
543 .find(|f| read_file_path.contains(&f.path));
544
545 if deleted_read_overlap.is_some()
546 || (!removed_files_with_data_change.is_empty() && self.txn_info.read_whole_table())
547 {
548 Err(CommitConflictError::ConcurrentDeleteRead)
549 } else {
550 Ok(())
551 }
552 }
553
554 fn check_for_deleted_files_against_current_txn_deleted_files(
557 &self,
558 ) -> Result<(), CommitConflictError> {
559 let txn_deleted_files: HashSet<String> = self
561 .txn_info
562 .actions
563 .iter()
564 .cloned()
565 .filter_map(|action| match action {
566 Action::Remove(remove) => Some(remove.path),
567 _ => None,
568 })
569 .collect();
570 let winning_deleted_files: HashSet<String> = self
571 .winning_commit_summary
572 .removed_files()
573 .iter()
574 .cloned()
575 .map(|r| r.path)
576 .collect();
577 let intersection: HashSet<&String> = txn_deleted_files
578 .intersection(&winning_deleted_files)
579 .collect();
580
581 if !intersection.is_empty() {
582 Err(CommitConflictError::ConcurrentDeleteDelete)
583 } else {
584 Ok(())
585 }
586 }
587
588 fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
591 &self,
592 ) -> Result<(), CommitConflictError> {
593 let winning_txns = self.winning_commit_summary.app_level_transactions();
598 let txn_overlap: HashSet<&String> = winning_txns
599 .intersection(&self.txn_info.read_app_ids)
600 .collect();
601 if !txn_overlap.is_empty() {
602 Err(CommitConflictError::ConcurrentTransaction)
603 } else {
604 Ok(())
605 }
606 }
607}
608
609pub(super) fn can_downgrade_to_snapshot_isolation<'a>(
632 actions: impl IntoIterator<Item = &'a Action>,
633 operation: &DeltaOperation,
634 isolation_level: &IsolationLevel,
635) -> bool {
636 let mut data_changed = false;
637 let mut has_non_file_actions = false;
638 for action in actions {
639 match action {
640 Action::Add(act) if act.data_change => data_changed = true,
641 Action::Remove(rem) if rem.data_change => data_changed = true,
642 _ => has_non_file_actions = true,
643 }
644 }
645
646 if has_non_file_actions {
647 return false;
649 }
650
651 match isolation_level {
652 IsolationLevel::Serializable => !data_changed,
653 IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(),
654 IsolationLevel::SnapshotIsolation => false, }
656}
657
#[cfg(test)]
// Most helpers and imports are only used with the `datafusion` feature.
#[allow(unused)]
mod tests {
    use std::collections::HashMap;

    #[cfg(feature = "datafusion")]
    use datafusion::logical_expr::{col, lit};
    use serde_json::json;

    use super::*;
    use crate::kernel::Action;
    use crate::test_utils::{ActionFactory, TestSchemas};

    /// Builds an add action for the simple test schema whose `value` column
    /// has the bounds `(min, max)`.
    ///
    /// `data_change` is forwarded as the last argument of
    /// `ActionFactory::add` (presumably its data-change flag).
    fn simple_add(data_change: bool, min: &str, max: &str) -> Add {
        ActionFactory::add(
            TestSchemas::simple(),
            HashMap::from_iter([("value", (min, max))]),
            Default::default(),
            data_change,
        )
    }

    /// Protocol and metadata actions used to initialize a test table.
    fn init_table_actions() -> Vec<Action> {
        vec![
            ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into(),
            ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into(),
        ]
    }

    /// An add created with a `true` flag must prevent the downgrade under
    /// write-serializable isolation.
    #[test]
    fn test_can_downgrade_to_snapshot_isolation() {
        let isolation = IsolationLevel::WriteSerializable;
        let operation = DeltaOperation::Optimize {
            predicate: None,
            target_size: 0,
        };
        let add =
            ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into();
        let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation);
        assert!(!res)
    }

    /// Runs the conflict checker for one scenario.
    ///
    /// * `setup` - actions the table state is built from; defaults to
    ///   [`init_table_actions`].
    /// * `reads` - read predicate of the current transaction.
    /// * `concurrent` - actions of the winning commit.
    /// * `actions` - actions of the current transaction.
    /// * `read_whole_table` - whether the current transaction read the whole
    ///   table.
    ///
    /// The winning commit carries no commit info and no operation is passed
    /// to the checker.
    #[cfg(feature = "datafusion")]
    async fn execute_test(
        setup: Option<Vec<Action>>,
        reads: Option<Expr>,
        concurrent: Vec<Action>,
        actions: Vec<Action>,
        read_whole_table: bool,
    ) -> Result<(), CommitConflictError> {
        use crate::table::state::DeltaTableState;
        use object_store::path::Path;

        let setup_actions = setup.unwrap_or_else(init_table_actions);
        let state = DeltaTableState::from_actions(setup_actions).await.unwrap();
        let snapshot = state.snapshot();
        let transaction_info =
            TransactionInfo::new(snapshot.log_data(), reads, &actions, read_whole_table);
        let summary = WinningCommitSummary {
            actions: concurrent,
            commit_info: None,
        };
        let checker = ConflictChecker::new(transaction_info, summary, None);
        checker.check_conflicts()
    }

    /// Two transactions that only append, without reading, do not conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_append_append() {
        let file1 = simple_add(true, "1", "10").into();
        let file2 = simple_add(true, "1", "10").into();

        let result = execute_test(None, None, vec![file1], vec![file2], false).await;
        assert!(result.is_ok());
    }

    /// Removing a file outside the read predicate does not conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_delete_read() {
        let file_not_read = simple_add(true, "1", "10");
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_not_read.clone().into());
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_not_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(result.is_ok());
    }

    /// Adding a file outside the read predicate does not conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_add_read() {
        let file_added = simple_add(true, "1", "10").into();
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![file_added],
            vec![],
            false,
        )
        .await;
        assert!(result.is_ok());
    }

    /// Both transactions removing the same file conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_delete_delete() {
        let removed_file = simple_add(true, "1", "10");
        let removed_file: Action = ActionFactory::remove(&removed_file, true).into();
        let result = execute_test(
            None,
            None,
            vec![removed_file.clone()],
            vec![removed_file],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteDelete)
        ));
    }

    /// A concurrently added file matching the read predicate conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_add_conflicts_with_read_and_write() {
        let file_added = simple_add(true, "1", "10").into();
        let file_should_have_read = simple_add(true, "1", "10").into();
        let result = execute_test(
            None,
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![file_should_have_read],
            vec![file_added],
            false,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
    }

    /// A concurrently removed file matching the read predicate conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_delete_conflicts_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));
    }

    /// A metadata action in the winning commit conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_metadata_change() {
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into()],
            vec![],
            false,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::MetadataChanged)));
    }

    /// Protocol actions in both commits conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_protocol_upgrade() {
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ProtocolChanged(_))
        ));
    }

    /// With `read_whole_table`, a concurrent add conflicts even though the
    /// read predicate does not match it.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_read_whole_table_disallows_concurrent_append() {
        let file_part1 = simple_add(true, "1", "10").into();
        let file_part2 = simple_add(true, "11", "100").into();
        let file_part3 = simple_add(true, "101", "1000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt(lit::<i32>(0))),
            vec![file_part2],
            vec![file_part3],
            true,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
    }

    /// With `read_whole_table`, a concurrent data-changing remove conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_read_whole_table_disallows_concurrent_remove() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2 = simple_add(true, "11", "100").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.clone().into());
        let result = execute_test(
            Some(setup_actions),
            None,
            vec![ActionFactory::remove(&file_part1, true).into()],
            vec![file_part2],
            true,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));
    }

    /// A remove with `data_change = false` does not conflict with a read of
    /// the removed file.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_compaction_remove_does_not_conflict_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());

        let mut compaction_remove = ActionFactory::remove(&file_read, false);
        compaction_remove.data_change = false;

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![compaction_remove.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Compaction with data_change=false should not conflict with reads"
        );
    }

    /// A remove with `data_change = true` conflicts with a read of the
    /// removed file.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_data_delete_conflicts_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
            "Delete with data_change=true should conflict with reads"
        );
    }

    /// A remove with `data_change = false` does not conflict even when the
    /// whole table was read.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_compaction_does_not_conflict_with_whole_table_read() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2 = simple_add(true, "11", "100").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.clone().into());

        let compaction_remove = ActionFactory::remove(&file_part1, false);

        let result = execute_test(
            Some(setup_actions),
            None,
            vec![compaction_remove.into()],
            vec![file_part2],
            true,
        )
        .await;
        assert!(
            result.is_ok(),
            "Compaction with data_change=false should not conflict even with read_whole_table"
        );
    }

    /// Several removes with `data_change = false` do not conflict with reads.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_multiple_compaction_removes_do_not_conflict() {
        let file1 = simple_add(true, "1", "10");
        let file2 = simple_add(true, "11", "20");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file1.clone().into());
        setup_actions.push(file2.clone().into());

        let mut remove1 = ActionFactory::remove(&file1, false);
        remove1.data_change = false;
        let mut remove2 = ActionFactory::remove(&file2, false);
        remove2.data_change = false;

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(20))),
            vec![remove1.into(), remove2.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Multiple compaction removes with data_change=false should not conflict"
        );
    }

    /// One data-changing remove among non-data-changing ones is enough to
    /// conflict with a read.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_mixed_removes_conflict_if_any_data_change() {
        let file1 = simple_add(true, "1", "10");
        let file2 = simple_add(true, "11", "20");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file1.clone().into());
        setup_actions.push(file2.clone().into());

        let mut compaction_remove = ActionFactory::remove(&file1, false);
        compaction_remove.data_change = false;
        let data_remove = ActionFactory::remove(&file2, true);

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(20))),
            vec![compaction_remove.into(), data_remove.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
            "Mixed removes should conflict if any have data_change=true"
        );
    }

    /// Removing the same file twice conflicts even when neither remove
    /// changes data.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_compaction_double_delete_still_conflicts() {
        let removed_file = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(removed_file.clone().into());

        let mut remove1 = ActionFactory::remove(&removed_file, false);
        remove1.data_change = false;
        let mut remove2 = ActionFactory::remove(&removed_file, false);
        remove2.data_change = false;

        let result = execute_test(
            Some(setup_actions),
            None,
            vec![remove1.into()],
            vec![remove2.into()],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteDelete)),
            "Concurrent double delete should conflict even with data_change=false"
        );
    }

    /// A concurrently added file whose bounds fall inside the read predicate
    /// conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_partitions_add_and_write() {
        let file_part1_existing = simple_add(true, "1", "10");
        let file_part2_added = simple_add(true, "100", "200").into();
        let file_part1_new = simple_add(true, "5", "15").into();

        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1_existing.into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(200))),
            vec![file_part2_added],
            vec![file_part1_new],
            false,
        )
        .await;

        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
            "Adding file matching read predicate should conflict"
        );
    }

    /// A concurrently added file whose bounds fall outside the read predicate
    /// does not conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_partitions_read_write_different_ranges() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2_added = simple_add(true, "100", "200").into();
        let file_part1_new = simple_add(true, "5", "15").into();

        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(50))),
            vec![file_part2_added],
            vec![file_part1_new],
            false,
        )
        .await;

        assert!(
            result.is_ok(),
            "Disjoint partition writes should not conflict"
        );
    }

    /// The same application id in both commits conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_conflicting_app_transactions() {
        let file1 = simple_add(true, "1", "10").into();

        let app_id = "streaming_query_1".to_string();
        let txn_action = Action::Txn(Transaction {
            app_id: app_id.clone(),
            version: 1,
            last_updated: None,
        });

        let current_actions = vec![txn_action.clone()];

        let concurrent_actions = vec![txn_action, file1];

        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;

        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentTransaction)),
            "Conflicting app transactions should fail"
        );
    }

    /// Different application ids in the two commits do not conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_non_conflicting_different_app_transactions() {
        let file1 = simple_add(true, "1", "10").into();

        let app_id1 = "streaming_query_1".to_string();
        let app_id2 = "streaming_query_2".to_string();

        let txn_action1 = Action::Txn(Transaction {
            app_id: app_id1,
            version: 1,
            last_updated: None,
        });

        let txn_action2 = Action::Txn(Transaction {
            app_id: app_id2,
            version: 1,
            last_updated: None,
        });

        let current_actions = vec![txn_action1];

        let concurrent_actions = vec![txn_action2, file1];

        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;

        assert!(
            result.is_ok(),
            "Non-conflicting app transactions should succeed"
        );
    }

    /// The read predicate matches no existing file, but a concurrently added
    /// file matches it, which conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_replace_where_initial_empty_conflicts_on_concurrent_add() {
        let mut setup_actions = init_table_actions();
        setup_actions.push(simple_add(true, "1", "1").into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt_eq(lit::<i32>(2))),
            vec![simple_add(true, "3", "3").into()],
            vec![simple_add(true, "2", "2").into()],
            false,
        )
        .await;

        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
            "ReplaceWhere-style empty read should conflict when a matching row is concurrently added"
        );
    }

    /// The read predicate matches neither an existing file nor the
    /// concurrently added one, so the commit is allowed.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_replace_where_disjoint_empty_allows_commit() {
        let mut setup_actions = init_table_actions();
        setup_actions.push(simple_add(true, "1", "1").into());

        let result = execute_test(
            Some(setup_actions),
            Some(
                col("value")
                    .gt(lit::<i32>(1))
                    .and(col("value").lt_eq(lit::<i32>(3))),
            ),
            vec![simple_add(true, "5", "5").into()],
            vec![simple_add(true, "2", "2").into()],
            false,
        )
        .await;

        assert!(
            result.is_ok(),
            "Disjoint replaceWhere-style transactions with empty reads should succeed"
        );
    }
}