1use std::collections::HashSet;
3
4use delta_kernel::table_properties::IsolationLevel;
5
6use super::CommitInfo;
7use crate::DeltaTableError;
8#[cfg(feature = "datafusion")]
9use crate::delta_datafusion::DataFusionMixins;
10use crate::errors::DeltaResult;
11use crate::kernel::{Action, Add, LogDataHandler, Metadata, Protocol, Remove, Transaction};
12use crate::logstore::{LogStore, get_actions};
13use crate::protocol::DeltaOperation;
14use crate::table::config::TablePropertiesExt as _;
15
16#[cfg(feature = "datafusion")]
17use super::state::AddContainer;
18#[cfg(feature = "datafusion")]
19use datafusion::logical_expr::Expr;
20#[cfg(feature = "datafusion")]
21use itertools::Either;
22
23#[derive(thiserror::Error, Debug)]
25pub enum CommitConflictError {
26 #[error(
30 "Commit failed: a concurrent transactions added new data.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
31 )]
32 ConcurrentAppend,
33
34 #[error(
37 "Commit failed: a concurrent transaction deleted data this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
38 )]
39 ConcurrentDeleteRead,
40
41 #[error(
44 "Commit failed: a concurrent transaction deleted the same data your transaction deletes.\nHelp: you should retry this write operation. If it was based on data contained in the table, you should rerun the query generating the data."
45 )]
46 ConcurrentDeleteDelete,
47
48 #[error("Metadata changed since last commit.")]
51 MetadataChanged,
52
53 #[error("Concurrent transaction failed.")]
57 ConcurrentTransaction,
58
59 #[error("Protocol changed since last commit: {0}")]
65 ProtocolChanged(String),
66
67 #[error("Delta-rs does not support writer version {0}")]
69 UnsupportedWriterVersion(i32),
70
71 #[error("Delta-rs does not support reader version {0}")]
73 UnsupportedReaderVersion(i32),
74
75 #[error("Snapshot is corrupted: {source}")]
77 CorruptedState {
78 source: Box<dyn std::error::Error + Send + Sync + 'static>,
80 },
81
82 #[error("Error evaluating predicate: {source}")]
84 Predicate {
85 source: Box<dyn std::error::Error + Send + Sync + 'static>,
87 },
88
89 #[error("No metadata found, please make sure table is loaded.")]
91 NoMetadata,
92}
93
/// Information about the transaction being committed, used to check it for
/// conflicts against a concurrently committed ("winning") commit.
// Not every field is read in every feature configuration (e.g. `txn_id` is
// never read in this file), hence the blanket allow.
#[allow(unused)]
pub(crate) struct TransactionInfo<'a> {
    /// Transaction identifier; the constructors in this file set it to "".
    txn_id: String,
    /// Predicate describing the data this transaction read, kept as unparsed
    /// text because no expression engine is available without `datafusion`.
    #[cfg(not(feature = "datafusion"))]
    read_predicates: Option<String>,
    /// Predicate describing the data this transaction read, as a DataFusion expression.
    #[cfg(feature = "datafusion")]
    read_predicates: Option<Expr>,
    /// `app_id`s of the `txn` actions contained in this transaction's `actions`.
    read_app_ids: HashSet<String>,
    /// Actions this transaction intends to commit.
    actions: &'a [Action],
    /// Log data of the snapshot this transaction was prepared against.
    read_snapshot: LogDataHandler<'a>,
    /// Caller-supplied flag: when true, every concurrently added file is checked
    /// and any data-changing concurrent removal is a conflict.
    read_whole_table: bool,
}
116
117impl<'a> TransactionInfo<'a> {
118 #[cfg(feature = "datafusion")]
119 pub fn try_new(
120 read_snapshot: LogDataHandler<'a>,
121 read_predicates: Option<String>,
122 actions: &'a [Action],
123 read_whole_table: bool,
124 ) -> DeltaResult<Self> {
125 use datafusion::prelude::SessionContext;
126
127 let session = SessionContext::new();
128 let read_predicates = read_predicates
129 .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
130 .transpose()?;
131
132 let mut read_app_ids = HashSet::<String>::new();
133 for action in actions.iter() {
134 if let Action::Txn(Transaction { app_id, .. }) = action {
135 read_app_ids.insert(app_id.clone());
136 }
137 }
138
139 Ok(Self::new(
140 read_snapshot,
141 read_predicates,
142 actions,
143 read_whole_table,
144 ))
145 }
146
147 #[cfg(feature = "datafusion")]
148 pub fn new(
149 read_snapshot: LogDataHandler<'a>,
150 read_predicates: Option<Expr>,
151 actions: &'a [Action],
152 read_whole_table: bool,
153 ) -> Self {
154 let mut read_app_ids = HashSet::<String>::new();
155 for action in actions.iter() {
156 if let Action::Txn(Transaction { app_id, .. }) = action {
157 read_app_ids.insert(app_id.clone());
158 }
159 }
160 Self {
161 txn_id: "".into(),
162 read_predicates,
163 read_app_ids,
164 actions,
165 read_snapshot,
166 read_whole_table,
167 }
168 }
169
170 #[cfg(not(feature = "datafusion"))]
171 pub fn try_new(
172 read_snapshot: LogDataHandler<'a>,
173 read_predicates: Option<String>,
174 actions: &'a Vec<Action>,
175 read_whole_table: bool,
176 ) -> DeltaResult<Self> {
177 let mut read_app_ids = HashSet::<String>::new();
178 for action in actions.iter() {
179 if let Action::Txn(Transaction { app_id, .. }) = action {
180 read_app_ids.insert(app_id.clone());
181 }
182 }
183 Ok(Self {
184 txn_id: "".into(),
185 read_predicates,
186 read_app_ids,
187 actions,
188 read_snapshot,
189 read_whole_table,
190 })
191 }
192
193 pub fn metadata_changed(&self) -> bool {
195 self.actions
196 .iter()
197 .any(|a| matches!(a, Action::Metadata(_)))
198 }
199
200 #[cfg(feature = "datafusion")]
201 pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
203 use crate::delta_datafusion::files_matching_predicate;
204
205 if let Some(predicate) = &self.read_predicates {
206 Ok(Either::Left(
207 files_matching_predicate(
208 self.read_snapshot.clone(),
209 std::slice::from_ref(predicate),
210 )
211 .map_err(|err| CommitConflictError::Predicate {
212 source: Box::new(err),
213 })?,
214 ))
215 } else {
216 Ok(Either::Right(
217 self.read_snapshot.iter().map(|f| f.add_action()),
218 ))
219 }
220 }
221
222 #[cfg(not(feature = "datafusion"))]
223 pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
225 Ok(self.read_snapshot.iter().map(|f| f.add_action()))
226 }
227
228 pub fn read_whole_table(&self) -> bool {
230 self.read_whole_table
231 }
232}
233
/// Contents of the commit that won the race for the version this transaction
/// tried to write (loaded by `try_new` as `read_version + 1`).
#[derive(Debug)]
pub(crate) struct WinningCommitSummary {
    /// All actions contained in the winning commit.
    pub actions: Vec<Action>,
    /// The winning commit's first `commitInfo` action, if it had one.
    pub commit_info: Option<CommitInfo>,
}
240
241impl WinningCommitSummary {
242 pub async fn try_new(
243 log_store: &dyn LogStore,
244 read_version: i64,
245 winning_commit_version: i64,
246 ) -> DeltaResult<Self> {
247 assert_eq!(winning_commit_version, read_version + 1);
249
250 let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
251 match commit_log_bytes {
252 Some(bytes) => {
253 let actions = get_actions(winning_commit_version, &bytes)?; let commit_info = actions
255 .iter()
256 .find(|action| matches!(action, Action::CommitInfo(_)))
257 .map(|action| match action {
258 Action::CommitInfo(info) => info.clone(),
259 _ => unreachable!(),
260 });
261
262 Ok(Self {
263 actions,
264 commit_info,
265 })
266 }
267 None => Err(DeltaTableError::InvalidVersion(winning_commit_version)),
268 }
269 }
270
271 pub fn metadata_updates(&self) -> Vec<Metadata> {
272 self.actions
273 .iter()
274 .cloned()
275 .filter_map(|action| match action {
276 Action::Metadata(metadata) => Some(metadata),
277 _ => None,
278 })
279 .collect()
280 }
281
282 pub fn app_level_transactions(&self) -> HashSet<String> {
283 self.actions
284 .iter()
285 .cloned()
286 .filter_map(|action| match action {
287 Action::Txn(txn) => Some(txn.app_id),
288 _ => None,
289 })
290 .collect()
291 }
292
293 pub fn protocol(&self) -> Vec<Protocol> {
294 self.actions
295 .iter()
296 .cloned()
297 .filter_map(|action| match action {
298 Action::Protocol(protocol) => Some(protocol),
299 _ => None,
300 })
301 .collect()
302 }
303
304 pub fn removed_files(&self) -> Vec<Remove> {
305 self.actions
306 .iter()
307 .cloned()
308 .filter_map(|action| match action {
309 Action::Remove(remove) => Some(remove),
310 _ => None,
311 })
312 .collect()
313 }
314
315 pub fn added_files(&self) -> Vec<Add> {
316 self.actions
317 .iter()
318 .cloned()
319 .filter_map(|action| match action {
320 Action::Add(add) => Some(add),
321 _ => None,
322 })
323 .collect()
324 }
325
326 pub fn blind_append_added_files(&self) -> Vec<Add> {
327 if self.is_blind_append().unwrap_or(false) {
328 self.added_files()
329 } else {
330 vec![]
331 }
332 }
333
334 pub fn changed_data_added_files(&self) -> Vec<Add> {
335 if self.is_blind_append().unwrap_or(false) {
336 vec![]
337 } else {
338 self.added_files()
339 }
340 }
341
342 pub fn is_blind_append(&self) -> Option<bool> {
343 self.commit_info
344 .as_ref()
345 .map(|opt| opt.is_blind_append.unwrap_or(false))
346 }
347}
348
/// Checks the transaction being committed for logical conflicts with a
/// concurrently committed ("winning") commit.
pub(crate) struct ConflictChecker<'a> {
    /// Information about the transaction being committed.
    txn_info: TransactionInfo<'a>,
    /// Summary of the winning commit's actions.
    winning_commit_summary: WinningCommitSummary,
    /// Isolation level the checks are evaluated under (chosen in `new`).
    isolation_level: IsolationLevel,
}
358
359impl<'a> ConflictChecker<'a> {
360 pub fn new(
361 transaction_info: TransactionInfo<'a>,
362 winning_commit_summary: WinningCommitSummary,
363 operation: Option<&DeltaOperation>,
364 ) -> ConflictChecker<'a> {
365 let isolation_level = operation
366 .and_then(|op| {
367 if can_downgrade_to_snapshot_isolation(
368 &winning_commit_summary.actions,
369 op,
370 &transaction_info
371 .read_snapshot
372 .table_properties()
373 .isolation_level(),
374 ) {
375 Some(IsolationLevel::SnapshotIsolation)
376 } else {
377 None
378 }
379 })
380 .unwrap_or_else(|| {
381 transaction_info
382 .read_snapshot
383 .table_properties()
384 .isolation_level()
385 });
386
387 Self {
388 txn_info: transaction_info,
389 winning_commit_summary,
390 isolation_level,
391 }
392 }
393
394 pub fn check_conflicts(&self) -> Result<(), CommitConflictError> {
398 self.check_protocol_compatibility()?;
399 self.check_no_metadata_updates()?;
400 self.check_for_added_files_that_should_have_been_read_by_current_txn()?;
401 self.check_for_deleted_files_against_current_txn_read_files()?;
402 self.check_for_deleted_files_against_current_txn_deleted_files()?;
403 self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?;
404 Ok(())
405 }
406
407 fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
410 for p in self.winning_commit_summary.protocol() {
411 let (win_read, curr_read) = (
412 p.min_reader_version(),
413 self.txn_info.read_snapshot.protocol().min_reader_version(),
414 );
415 let (win_write, curr_write) = (
416 p.min_writer_version(),
417 self.txn_info.read_snapshot.protocol().min_writer_version(),
418 );
419 if curr_read < win_read || win_write < curr_write {
420 return Err(CommitConflictError::ProtocolChanged(format!(
421 "required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"
422 )));
423 };
424 }
425 if !self.winning_commit_summary.protocol().is_empty()
426 && self
427 .txn_info
428 .actions
429 .iter()
430 .any(|a| matches!(a, Action::Protocol(_)))
431 {
432 return Err(CommitConflictError::ProtocolChanged(
433 "protocol changed".into(),
434 ));
435 };
436 Ok(())
437 }
438
439 fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> {
441 if !self.winning_commit_summary.metadata_updates().is_empty() {
443 Err(CommitConflictError::MetadataChanged)
444 } else {
445 Ok(())
446 }
447 }
448
449 fn check_for_added_files_that_should_have_been_read_by_current_txn(
452 &self,
453 ) -> Result<(), CommitConflictError> {
454 if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) {
456 return Ok(());
457 }
458
459 let added_files_to_check = match self.isolation_level {
461 IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => {
462 self.winning_commit_summary.changed_data_added_files()
464 }
465 IsolationLevel::Serializable | IsolationLevel::WriteSerializable => {
466 let mut files = self.winning_commit_summary.changed_data_added_files();
467 files.extend(self.winning_commit_summary.blind_append_added_files());
468 files
469 }
470 IsolationLevel::SnapshotIsolation => vec![],
471 };
472
473 cfg_if::cfg_if! {
477 if #[cfg(feature = "datafusion")] {
478 let added_files_matching_predicates = if let (Some(predicate), false) = (
479 &self.txn_info.read_predicates,
480 self.txn_info.read_whole_table(),
481 ) {
482 let arrow_schema = self.txn_info.read_snapshot.read_schema();
483 let partition_columns = &self
484 .txn_info
485 .read_snapshot
486 .metadata()
487 .partition_columns();
488 AddContainer::new(&added_files_to_check, partition_columns, arrow_schema)
489 .predicate_matches(predicate.clone())
490 .map_err(|err| CommitConflictError::Predicate {
491 source: Box::new(err),
492 })?
493 .cloned()
494 .collect::<Vec<_>>()
495 } else if self.txn_info.read_whole_table() {
496 added_files_to_check
497 } else {
498 vec![]
499 };
500 } else {
501 let added_files_matching_predicates = if self.txn_info.read_whole_table()
502 {
503 added_files_to_check
504 } else {
505 vec![]
506 };
507 }
508 }
509
510 if !added_files_matching_predicates.is_empty() {
511 Err(CommitConflictError::ConcurrentAppend)
512 } else {
513 Ok(())
514 }
515 }
516
517 fn check_for_deleted_files_against_current_txn_read_files(
520 &self,
521 ) -> Result<(), CommitConflictError> {
522 let read_file_path: HashSet<String> = self
524 .txn_info
525 .read_files()?
526 .map(|f| f.path.clone())
527 .collect();
528
529 let removed_files_with_data_change: Vec<Remove> = self
534 .winning_commit_summary
535 .removed_files()
536 .into_iter()
537 .filter(|r| r.data_change)
538 .collect();
539
540 let deleted_read_overlap = removed_files_with_data_change
541 .iter()
542 .find(|f| read_file_path.contains(&f.path));
543
544 if deleted_read_overlap.is_some()
545 || (!removed_files_with_data_change.is_empty() && self.txn_info.read_whole_table())
546 {
547 Err(CommitConflictError::ConcurrentDeleteRead)
548 } else {
549 Ok(())
550 }
551 }
552
553 fn check_for_deleted_files_against_current_txn_deleted_files(
556 &self,
557 ) -> Result<(), CommitConflictError> {
558 let txn_deleted_files: HashSet<String> = self
560 .txn_info
561 .actions
562 .iter()
563 .cloned()
564 .filter_map(|action| match action {
565 Action::Remove(remove) => Some(remove.path),
566 _ => None,
567 })
568 .collect();
569 let winning_deleted_files: HashSet<String> = self
570 .winning_commit_summary
571 .removed_files()
572 .iter()
573 .cloned()
574 .map(|r| r.path)
575 .collect();
576 let intersection: HashSet<&String> = txn_deleted_files
577 .intersection(&winning_deleted_files)
578 .collect();
579
580 if !intersection.is_empty() {
581 Err(CommitConflictError::ConcurrentDeleteDelete)
582 } else {
583 Ok(())
584 }
585 }
586
587 fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
590 &self,
591 ) -> Result<(), CommitConflictError> {
592 let winning_txns = self.winning_commit_summary.app_level_transactions();
597 let txn_overlap: HashSet<&String> = winning_txns
598 .intersection(&self.txn_info.read_app_ids)
599 .collect();
600 if !txn_overlap.is_empty() {
601 Err(CommitConflictError::ConcurrentTransaction)
602 } else {
603 Ok(())
604 }
605 }
606}
607
608pub(super) fn can_downgrade_to_snapshot_isolation<'a>(
631 actions: impl IntoIterator<Item = &'a Action>,
632 operation: &DeltaOperation,
633 isolation_level: &IsolationLevel,
634) -> bool {
635 let mut data_changed = false;
636 let mut has_non_file_actions = false;
637 for action in actions {
638 match action {
639 Action::Add(act) if act.data_change => data_changed = true,
640 Action::Remove(rem) if rem.data_change => data_changed = true,
641 _ => has_non_file_actions = true,
642 }
643 }
644
645 if has_non_file_actions {
646 return false;
648 }
649
650 match isolation_level {
651 IsolationLevel::Serializable => !data_changed,
652 IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(),
653 IsolationLevel::SnapshotIsolation => false, }
655}
656
#[cfg(test)]
#[allow(unused)]
mod tests {
    use std::collections::HashMap;

    #[cfg(feature = "datafusion")]
    use datafusion::logical_expr::{col, lit};
    use serde_json::json;

    use super::*;
    use crate::kernel::Action;
    use crate::test_utils::{ActionFactory, TestSchemas};

    /// Build an `add` action over `TestSchemas::simple()`, passing `(min, max)`
    /// for the `value` column (used as its statistics by the predicate tests)
    /// and forwarding `data_change` (previously ignored and hard-coded to `true`).
    fn simple_add(data_change: bool, min: &str, max: &str) -> Add {
        ActionFactory::add(
            TestSchemas::simple(),
            HashMap::from_iter([("value", (min, max))]),
            Default::default(),
            data_change,
        )
    }

    /// Protocol + metadata actions that create an empty test table.
    fn init_table_actions() -> Vec<Action> {
        vec![
            ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into(),
            ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into(),
        ]
    }

    #[test]
    fn test_can_downgrade_to_snapshot_isolation() {
        let isolation = IsolationLevel::WriteSerializable;
        let operation = DeltaOperation::Optimize {
            predicate: None,
            target_size: 0,
        };
        let add =
            ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into();
        let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation);
        assert!(!res)
    }

    /// Run the conflict checker for a transaction committing `actions` (having
    /// read `reads`) against a winning commit made of `concurrent`, on a table
    /// built from `setup` (or `init_table_actions()` when `None`).
    #[cfg(feature = "datafusion")]
    async fn execute_test(
        setup: Option<Vec<Action>>,
        reads: Option<Expr>,
        concurrent: Vec<Action>,
        actions: Vec<Action>,
        read_whole_table: bool,
    ) -> Result<(), CommitConflictError> {
        use crate::table::state::DeltaTableState;
        use object_store::path::Path;

        let setup_actions = setup.unwrap_or_else(init_table_actions);
        let state = DeltaTableState::from_actions(setup_actions).await.unwrap();
        let snapshot = state.snapshot();
        let transaction_info =
            TransactionInfo::new(snapshot.log_data(), reads, &actions, read_whole_table);
        let summary = WinningCommitSummary {
            actions: concurrent,
            commit_info: None,
        };
        let checker = ConflictChecker::new(transaction_info, summary, None);
        checker.check_conflicts()
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_append_append() {
        let file1 = simple_add(true, "1", "10").into();
        let file2 = simple_add(true, "1", "10").into();

        let result = execute_test(None, None, vec![file1], vec![file2], false).await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_delete_read() {
        let file_not_read = simple_add(true, "1", "10");
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_not_read.clone().into());
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_not_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_add_read() {
        let file_added = simple_add(true, "1", "10").into();
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![file_added],
            vec![],
            false,
        )
        .await;
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_delete_delete() {
        let removed_file = simple_add(true, "1", "10");
        let removed_file: Action = ActionFactory::remove(&removed_file, true).into();
        let result = execute_test(
            None,
            None,
            vec![removed_file.clone()],
            vec![removed_file],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteDelete)
        ));
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_add_conflicts_with_read_and_write() {
        let file_added = simple_add(true, "1", "10").into();
        let file_should_have_read = simple_add(true, "1", "10").into();
        let result = execute_test(
            None,
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![file_should_have_read],
            vec![file_added],
            false,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_delete_conflicts_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_metadata_change() {
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into()],
            vec![],
            false,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::MetadataChanged)));
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_protocol_upgrade() {
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ProtocolChanged(_))
        ));
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_read_whole_table_disallows_concurrent_append() {
        let file_part1 = simple_add(true, "1", "10").into();
        let file_part2 = simple_add(true, "11", "100").into();
        let file_part3 = simple_add(true, "101", "1000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt(lit::<i32>(0))),
            vec![file_part2],
            vec![file_part3],
            true,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_read_whole_table_disallows_concurrent_remove() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2 = simple_add(true, "11", "100").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.clone().into());
        let result = execute_test(
            Some(setup_actions),
            None,
            vec![ActionFactory::remove(&file_part1, true).into()],
            vec![file_part2],
            true,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_compaction_remove_does_not_conflict_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());

        // The second argument is the remove's `data_change` flag.
        let compaction_remove = ActionFactory::remove(&file_read, false);

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![compaction_remove.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Compaction with data_change=false should not conflict with reads"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_data_delete_conflicts_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
            "Delete with data_change=true should conflict with reads"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_compaction_does_not_conflict_with_whole_table_read() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2 = simple_add(true, "11", "100").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.clone().into());

        let compaction_remove = ActionFactory::remove(&file_part1, false);

        let result = execute_test(
            Some(setup_actions),
            None,
            vec![compaction_remove.into()],
            vec![file_part2],
            true,
        )
        .await;
        assert!(
            result.is_ok(),
            "Compaction with data_change=false should not conflict even with read_whole_table"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_multiple_compaction_removes_do_not_conflict() {
        let file1 = simple_add(true, "1", "10");
        let file2 = simple_add(true, "11", "20");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file1.clone().into());
        setup_actions.push(file2.clone().into());

        let remove1 = ActionFactory::remove(&file1, false);
        let remove2 = ActionFactory::remove(&file2, false);

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(20))),
            vec![remove1.into(), remove2.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Multiple compaction removes with data_change=false should not conflict"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_mixed_removes_conflict_if_any_data_change() {
        let file1 = simple_add(true, "1", "10");
        let file2 = simple_add(true, "11", "20");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file1.clone().into());
        setup_actions.push(file2.clone().into());

        let compaction_remove = ActionFactory::remove(&file1, false);
        let data_remove = ActionFactory::remove(&file2, true);

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(20))),
            vec![compaction_remove.into(), data_remove.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
            "Mixed removes should conflict if any have data_change=true"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_compaction_double_delete_still_conflicts() {
        let removed_file = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(removed_file.clone().into());

        let remove1 = ActionFactory::remove(&removed_file, false);
        let remove2 = ActionFactory::remove(&removed_file, false);

        let result = execute_test(
            Some(setup_actions),
            None,
            vec![remove1.into()],
            vec![remove2.into()],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteDelete)),
            "Concurrent double delete should conflict even with data_change=false"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_partitions_add_and_write() {
        let file_part1_existing = simple_add(true, "1", "10");
        let file_part2_added = simple_add(true, "100", "200").into();
        let file_part1_new = simple_add(true, "5", "15").into();

        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1_existing.into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(200))),
            vec![file_part2_added],
            vec![file_part1_new],
            false,
        )
        .await;

        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
            "Adding file matching read predicate should conflict"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_partitions_read_write_different_ranges() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2_added = simple_add(true, "100", "200").into();
        let file_part1_new = simple_add(true, "5", "15").into();

        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(50))),
            vec![file_part2_added],
            vec![file_part1_new],
            false,
        )
        .await;

        assert!(
            result.is_ok(),
            "Disjoint partition writes should not conflict"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_conflicting_app_transactions() {
        let file1 = simple_add(true, "1", "10").into();

        let app_id = "streaming_query_1".to_string();
        let txn_action = Action::Txn(Transaction {
            app_id: app_id.clone(),
            version: 1,
            last_updated: None,
        });

        let current_actions = vec![txn_action.clone()];
        let concurrent_actions = vec![txn_action, file1];

        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;

        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentTransaction)),
            "Conflicting app transactions should fail"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_non_conflicting_different_app_transactions() {
        let file1 = simple_add(true, "1", "10").into();

        let app_id1 = "streaming_query_1".to_string();
        let app_id2 = "streaming_query_2".to_string();

        let txn_action1 = Action::Txn(Transaction {
            app_id: app_id1,
            version: 1,
            last_updated: None,
        });

        let txn_action2 = Action::Txn(Transaction {
            app_id: app_id2,
            version: 1,
            last_updated: None,
        });

        let current_actions = vec![txn_action1];
        let concurrent_actions = vec![txn_action2, file1];

        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;

        assert!(
            result.is_ok(),
            "Non-conflicting app transactions should succeed"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_replace_where_initial_empty_conflicts_on_concurrent_add() {
        let mut setup_actions = init_table_actions();
        setup_actions.push(simple_add(true, "1", "1").into());

        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt_eq(lit::<i32>(2))),
            vec![simple_add(true, "3", "3").into()],
            vec![simple_add(true, "2", "2").into()],
            false,
        )
        .await;

        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
            "ReplaceWhere-style empty read should conflict when a matching row is concurrently added"
        );
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_replace_where_disjoint_empty_allows_commit() {
        let mut setup_actions = init_table_actions();
        setup_actions.push(simple_add(true, "1", "1").into());

        let result = execute_test(
            Some(setup_actions),
            Some(
                col("value")
                    .gt(lit::<i32>(1))
                    .and(col("value").lt_eq(lit::<i32>(3))),
            ),
            vec![simple_add(true, "5", "5").into()],
            vec![simple_add(true, "2", "2").into()],
            false,
        )
        .await;

        assert!(
            result.is_ok(),
            "Disjoint replaceWhere-style transactions with empty reads should succeed"
        );
    }
}