1use std::collections::HashSet;
3
4use super::CommitInfo;
5#[cfg(feature = "datafusion")]
6use crate::delta_datafusion::DataFusionMixins;
7use crate::errors::DeltaResult;
8use crate::kernel::EagerSnapshot;
9use crate::kernel::Transaction;
10use crate::kernel::{Action, Add, Metadata, Protocol, Remove};
11use crate::logstore::{get_actions, LogStore};
12use crate::protocol::DeltaOperation;
13use crate::table::config::IsolationLevel;
14use crate::DeltaTableError;
15
16#[cfg(feature = "datafusion")]
17use super::state::AddContainer;
18#[cfg(feature = "datafusion")]
19use datafusion_expr::Expr;
20#[cfg(feature = "datafusion")]
21use itertools::Either;
22
23#[derive(thiserror::Error, Debug)]
25pub enum CommitConflictError {
26 #[error("Commit failed: a concurrent transactions added new data.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation.")]
30 ConcurrentAppend,
31
32 #[error("Commit failed: a concurrent transaction deleted data this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation.")]
35 ConcurrentDeleteRead,
36
37 #[error("Commit failed: a concurrent transaction deleted the same data your transaction deletes.\nHelp: you should retry this write operation. If it was based on data contained in the table, you should rerun the query generating the data.")]
40 ConcurrentDeleteDelete,
41
42 #[error("Metadata changed since last commit.")]
45 MetadataChanged,
46
47 #[error("Concurrent transaction failed.")]
51 ConcurrentTransaction,
52
53 #[error("Protocol changed since last commit: {0}")]
59 ProtocolChanged(String),
60
61 #[error("Delta-rs does not support writer version {0}")]
63 UnsupportedWriterVersion(i32),
64
65 #[error("Delta-rs does not support reader version {0}")]
67 UnsupportedReaderVersion(i32),
68
69 #[error("Snapshot is corrupted: {source}")]
71 CorruptedState {
72 source: Box<dyn std::error::Error + Send + Sync + 'static>,
74 },
75
76 #[error("Error evaluating predicate: {source}")]
78 Predicate {
79 source: Box<dyn std::error::Error + Send + Sync + 'static>,
81 },
82
83 #[error("No metadata found, please make sure table is loaded.")]
85 NoMetadata,
86}
87
88#[allow(unused)]
90pub(crate) struct TransactionInfo<'a> {
91 txn_id: String,
92 #[cfg(not(feature = "datafusion"))]
97 read_predicates: Option<String>,
98 #[cfg(feature = "datafusion")]
100 read_predicates: Option<Expr>,
101 pub(crate) read_app_ids: HashSet<String>,
103 actions: &'a [Action],
105 pub(crate) read_snapshot: &'a EagerSnapshot,
107 read_whole_table: bool,
109}
110
111impl<'a> TransactionInfo<'a> {
112 #[cfg(feature = "datafusion")]
113 pub fn try_new(
114 read_snapshot: &'a EagerSnapshot,
115 read_predicates: Option<String>,
116 actions: &'a [Action],
117 read_whole_table: bool,
118 ) -> DeltaResult<Self> {
119 use datafusion::prelude::SessionContext;
120
121 let session = SessionContext::new();
122 let read_predicates = read_predicates
123 .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
124 .transpose()?;
125
126 let mut read_app_ids = HashSet::<String>::new();
127 for action in actions.iter() {
128 if let Action::Txn(Transaction { app_id, .. }) = action {
129 read_app_ids.insert(app_id.clone());
130 }
131 }
132
133 Ok(Self {
134 txn_id: "".into(),
135 read_predicates,
136 read_app_ids,
137 actions,
138 read_snapshot,
139 read_whole_table,
140 })
141 }
142
143 #[cfg(feature = "datafusion")]
144 #[allow(unused)]
145 pub fn new(
146 read_snapshot: &'a EagerSnapshot,
147 read_predicates: Option<Expr>,
148 actions: &'a Vec<Action>,
149 read_whole_table: bool,
150 ) -> Self {
151 let mut read_app_ids = HashSet::<String>::new();
152 for action in actions.iter() {
153 if let Action::Txn(Transaction { app_id, .. }) = action {
154 read_app_ids.insert(app_id.clone());
155 }
156 }
157 Self {
158 txn_id: "".into(),
159 read_predicates,
160 read_app_ids,
161 actions,
162 read_snapshot,
163 read_whole_table,
164 }
165 }
166
167 #[cfg(not(feature = "datafusion"))]
168 pub fn try_new(
169 read_snapshot: &'a EagerSnapshot,
170 read_predicates: Option<String>,
171 actions: &'a Vec<Action>,
172 read_whole_table: bool,
173 ) -> DeltaResult<Self> {
174 let mut read_app_ids = HashSet::<String>::new();
175 for action in actions.iter() {
176 if let Action::Txn(Transaction { app_id, .. }) = action {
177 read_app_ids.insert(app_id.clone());
178 }
179 }
180 Ok(Self {
181 txn_id: "".into(),
182 read_predicates,
183 read_app_ids,
184 actions,
185 read_snapshot,
186 read_whole_table,
187 })
188 }
189
190 pub fn metadata_changed(&self) -> bool {
192 self.actions
193 .iter()
194 .any(|a| matches!(a, Action::Metadata(_)))
195 }
196
197 #[cfg(feature = "datafusion")]
198 pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
200 use crate::delta_datafusion::files_matching_predicate;
201
202 if let Some(predicate) = &self.read_predicates {
203 Ok(Either::Left(
204 files_matching_predicate(self.read_snapshot, &[predicate.clone()]).map_err(
205 |err| CommitConflictError::Predicate {
206 source: Box::new(err),
207 },
208 )?,
209 ))
210 } else {
211 Ok(Either::Right(std::iter::empty()))
212 }
213 }
214
215 #[cfg(not(feature = "datafusion"))]
216 pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
218 Ok(self.read_snapshot.file_actions().unwrap())
219 }
220
221 pub fn read_whole_table(&self) -> bool {
223 self.read_whole_table
224 }
225}
226
227#[derive(Debug)]
229pub(crate) struct WinningCommitSummary {
230 pub actions: Vec<Action>,
231 pub commit_info: Option<CommitInfo>,
232}
233
234impl WinningCommitSummary {
235 pub async fn try_new(
236 log_store: &dyn LogStore,
237 read_version: i64,
238 winning_commit_version: i64,
239 ) -> DeltaResult<Self> {
240 assert_eq!(winning_commit_version, read_version + 1);
242
243 let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
244 match commit_log_bytes {
245 Some(bytes) => {
246 let actions = get_actions(winning_commit_version, bytes).await?;
247 let commit_info = actions
248 .iter()
249 .find(|action| matches!(action, Action::CommitInfo(_)))
250 .map(|action| match action {
251 Action::CommitInfo(info) => info.clone(),
252 _ => unreachable!(),
253 });
254
255 Ok(Self {
256 actions,
257 commit_info,
258 })
259 }
260 None => Err(DeltaTableError::InvalidVersion(winning_commit_version)),
261 }
262 }
263
264 pub fn metadata_updates(&self) -> Vec<Metadata> {
265 self.actions
266 .iter()
267 .cloned()
268 .filter_map(|action| match action {
269 Action::Metadata(metadata) => Some(metadata),
270 _ => None,
271 })
272 .collect()
273 }
274
275 pub fn app_level_transactions(&self) -> HashSet<String> {
276 self.actions
277 .iter()
278 .cloned()
279 .filter_map(|action| match action {
280 Action::Txn(txn) => Some(txn.app_id),
281 _ => None,
282 })
283 .collect()
284 }
285
286 pub fn protocol(&self) -> Vec<Protocol> {
287 self.actions
288 .iter()
289 .cloned()
290 .filter_map(|action| match action {
291 Action::Protocol(protocol) => Some(protocol),
292 _ => None,
293 })
294 .collect()
295 }
296
297 pub fn removed_files(&self) -> Vec<Remove> {
298 self.actions
299 .iter()
300 .cloned()
301 .filter_map(|action| match action {
302 Action::Remove(remove) => Some(remove),
303 _ => None,
304 })
305 .collect()
306 }
307
308 pub fn added_files(&self) -> Vec<Add> {
309 self.actions
310 .iter()
311 .cloned()
312 .filter_map(|action| match action {
313 Action::Add(add) => Some(add),
314 _ => None,
315 })
316 .collect()
317 }
318
319 pub fn blind_append_added_files(&self) -> Vec<Add> {
320 if self.is_blind_append().unwrap_or(false) {
321 self.added_files()
322 } else {
323 vec![]
324 }
325 }
326
327 pub fn changed_data_added_files(&self) -> Vec<Add> {
328 if self.is_blind_append().unwrap_or(false) {
329 vec![]
330 } else {
331 self.added_files()
332 }
333 }
334
335 pub fn is_blind_append(&self) -> Option<bool> {
336 self.commit_info
337 .as_ref()
338 .map(|opt| opt.is_blind_append.unwrap_or(false))
339 }
340}
341
342pub(crate) struct ConflictChecker<'a> {
344 txn_info: TransactionInfo<'a>,
346 winning_commit_summary: WinningCommitSummary,
348 isolation_level: IsolationLevel,
350}
351
352impl<'a> ConflictChecker<'a> {
353 pub fn new(
354 transaction_info: TransactionInfo<'a>,
355 winning_commit_summary: WinningCommitSummary,
356 operation: Option<&DeltaOperation>,
357 ) -> ConflictChecker<'a> {
358 let isolation_level = operation
359 .and_then(|op| {
360 if can_downgrade_to_snapshot_isolation(
361 &winning_commit_summary.actions,
362 op,
363 &transaction_info
364 .read_snapshot
365 .table_config()
366 .isolation_level(),
367 ) {
368 Some(IsolationLevel::SnapshotIsolation)
369 } else {
370 None
371 }
372 })
373 .unwrap_or_else(|| {
374 transaction_info
375 .read_snapshot
376 .table_config()
377 .isolation_level()
378 });
379
380 Self {
381 txn_info: transaction_info,
382 winning_commit_summary,
383 isolation_level,
384 }
385 }
386
387 pub fn check_conflicts(&self) -> Result<(), CommitConflictError> {
391 self.check_protocol_compatibility()?;
392 self.check_no_metadata_updates()?;
393 self.check_for_added_files_that_should_have_been_read_by_current_txn()?;
394 self.check_for_deleted_files_against_current_txn_read_files()?;
395 self.check_for_deleted_files_against_current_txn_deleted_files()?;
396 self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?;
397 Ok(())
398 }
399
400 fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
403 for p in self.winning_commit_summary.protocol() {
404 let (win_read, curr_read) = (
405 p.min_reader_version,
406 self.txn_info.read_snapshot.protocol().min_reader_version,
407 );
408 let (win_write, curr_write) = (
409 p.min_writer_version,
410 self.txn_info.read_snapshot.protocol().min_writer_version,
411 );
412 if curr_read < win_read || win_write < curr_write {
413 return Err(CommitConflictError::ProtocolChanged(
414 format!("required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"),
415 ));
416 };
417 }
418 if !self.winning_commit_summary.protocol().is_empty()
419 && self
420 .txn_info
421 .actions
422 .iter()
423 .any(|a| matches!(a, Action::Protocol(_)))
424 {
425 return Err(CommitConflictError::ProtocolChanged(
426 "protocol changed".into(),
427 ));
428 };
429 Ok(())
430 }
431
432 fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> {
434 if !self.winning_commit_summary.metadata_updates().is_empty() {
436 Err(CommitConflictError::MetadataChanged)
437 } else {
438 Ok(())
439 }
440 }
441
442 fn check_for_added_files_that_should_have_been_read_by_current_txn(
445 &self,
446 ) -> Result<(), CommitConflictError> {
447 if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) {
449 return Ok(());
450 }
451
452 let added_files_to_check = match self.isolation_level {
454 IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => {
455 self.winning_commit_summary.changed_data_added_files()
457 }
458 IsolationLevel::Serializable | IsolationLevel::WriteSerializable => {
459 let mut files = self.winning_commit_summary.changed_data_added_files();
460 files.extend(self.winning_commit_summary.blind_append_added_files());
461 files
462 }
463 IsolationLevel::SnapshotIsolation => vec![],
464 };
465
466 cfg_if::cfg_if! {
470 if #[cfg(feature = "datafusion")] {
471 let added_files_matching_predicates = if let (Some(predicate), false) = (
472 &self.txn_info.read_predicates,
473 self.txn_info.read_whole_table(),
474 ) {
475 let arrow_schema = self.txn_info.read_snapshot.arrow_schema().map_err(|err| {
476 CommitConflictError::CorruptedState {
477 source: Box::new(err),
478 }
479 })?;
480 let partition_columns = &self
481 .txn_info
482 .read_snapshot
483 .metadata()
484 .partition_columns;
485 AddContainer::new(&added_files_to_check, partition_columns, arrow_schema)
486 .predicate_matches(predicate.clone())
487 .map_err(|err| CommitConflictError::Predicate {
488 source: Box::new(err),
489 })?
490 .cloned()
491 .collect::<Vec<_>>()
492 } else if self.txn_info.read_whole_table() {
493 added_files_to_check
494 } else {
495 vec![]
496 };
497 } else {
498 let added_files_matching_predicates = if self.txn_info.read_whole_table()
499 {
500 added_files_to_check
501 } else {
502 vec![]
503 };
504 }
505 }
506
507 if !added_files_matching_predicates.is_empty() {
508 Err(CommitConflictError::ConcurrentAppend)
509 } else {
510 Ok(())
511 }
512 }
513
514 fn check_for_deleted_files_against_current_txn_read_files(
517 &self,
518 ) -> Result<(), CommitConflictError> {
519 let read_file_path: HashSet<String> = self
521 .txn_info
522 .read_files()?
523 .map(|f| f.path.clone())
524 .collect();
525 let deleted_read_overlap = self
526 .winning_commit_summary
527 .removed_files()
528 .iter()
529 .find(|&f| read_file_path.contains(&f.path))
530 .cloned();
531 if deleted_read_overlap.is_some()
532 || (!self.winning_commit_summary.removed_files().is_empty()
533 && self.txn_info.read_whole_table())
534 {
535 Err(CommitConflictError::ConcurrentDeleteRead)
536 } else {
537 Ok(())
538 }
539 }
540
541 fn check_for_deleted_files_against_current_txn_deleted_files(
544 &self,
545 ) -> Result<(), CommitConflictError> {
546 let txn_deleted_files: HashSet<String> = self
548 .txn_info
549 .actions
550 .iter()
551 .cloned()
552 .filter_map(|action| match action {
553 Action::Remove(remove) => Some(remove.path),
554 _ => None,
555 })
556 .collect();
557 let winning_deleted_files: HashSet<String> = self
558 .winning_commit_summary
559 .removed_files()
560 .iter()
561 .cloned()
562 .map(|r| r.path)
563 .collect();
564 let intersection: HashSet<&String> = txn_deleted_files
565 .intersection(&winning_deleted_files)
566 .collect();
567
568 if !intersection.is_empty() {
569 Err(CommitConflictError::ConcurrentDeleteDelete)
570 } else {
571 Ok(())
572 }
573 }
574
575 fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
578 &self,
579 ) -> Result<(), CommitConflictError> {
580 let winning_txns = self.winning_commit_summary.app_level_transactions();
585 let txn_overlap: HashSet<&String> = winning_txns
586 .intersection(&self.txn_info.read_app_ids)
587 .collect();
588 if !txn_overlap.is_empty() {
589 Err(CommitConflictError::ConcurrentTransaction)
590 } else {
591 Ok(())
592 }
593 }
594}
595
596pub(super) fn can_downgrade_to_snapshot_isolation<'a>(
619 actions: impl IntoIterator<Item = &'a Action>,
620 operation: &DeltaOperation,
621 isolation_level: &IsolationLevel,
622) -> bool {
623 let mut data_changed = false;
624 let mut has_non_file_actions = false;
625 for action in actions {
626 match action {
627 Action::Add(act) if act.data_change => data_changed = true,
628 Action::Remove(rem) if rem.data_change => data_changed = true,
629 _ => has_non_file_actions = true,
630 }
631 }
632
633 if has_non_file_actions {
634 return false;
636 }
637
638 match isolation_level {
639 IsolationLevel::Serializable => !data_changed,
640 IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(),
641 IsolationLevel::SnapshotIsolation => false, }
643}
644
#[cfg(test)]
// Several helpers and imports are only used by the `datafusion`-gated tests.
#[allow(unused)]
mod tests {
    use std::collections::HashMap;

    #[cfg(feature = "datafusion")]
    use datafusion_expr::{col, lit};
    use serde_json::json;

    use super::*;
    use crate::kernel::Action;
    use crate::test_utils::{ActionFactory, TestSchemas};

    /// Creates an add action for the simple test schema whose `value` column is
    /// bounded by `min` and `max`.
    fn simple_add(data_change: bool, min: &str, max: &str) -> Add {
        ActionFactory::add(
            TestSchemas::simple(),
            HashMap::from_iter([("value", (min, max))]),
            Default::default(),
            // Previously hard-coded to `true`, silently ignoring the parameter.
            data_change,
        )
    }

    /// Protocol and metadata actions that initialise an empty table with the simple schema.
    fn init_table_actions() -> Vec<Action> {
        vec![
            ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into(),
            ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into(),
        ]
    }

    #[test]
    fn test_can_downgrade_to_snapshot_isolation() {
        // A data-changing add must prevent the downgrade.
        let isolation = IsolationLevel::WriteSerializable;
        let operation = DeltaOperation::Optimize {
            predicate: None,
            target_size: 0,
        };
        let add =
            ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into();
        let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation);
        assert!(!res)
    }

    /// Runs the conflict checker for one scenario.
    ///
    /// * `setup` - actions forming the table state the transaction read from
    ///   (defaults to [`init_table_actions`])
    /// * `reads` - read predicate of the current transaction
    /// * `concurrent` - actions of the winning commit
    /// * `actions` - actions of the current transaction
    /// * `read_whole_table` - whether the current transaction read the whole table
    #[cfg(feature = "datafusion")]
    fn execute_test(
        setup: Option<Vec<Action>>,
        reads: Option<Expr>,
        concurrent: Vec<Action>,
        actions: Vec<Action>,
        read_whole_table: bool,
    ) -> Result<(), CommitConflictError> {
        use crate::table::state::DeltaTableState;

        let setup_actions = setup.unwrap_or_else(init_table_actions);
        let state = DeltaTableState::from_actions(setup_actions).unwrap();
        let snapshot = state.snapshot();
        let transaction_info = TransactionInfo::new(snapshot, reads, &actions, read_whole_table);
        let summary = WinningCommitSummary {
            actions: concurrent,
            commit_info: None,
        };
        let checker = ConflictChecker::new(transaction_info, summary, None);
        checker.check_conflicts()
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_allowed_concurrent_actions() {
        // append / append: nothing was read, both sides only add files
        let file1 = simple_add(true, "1", "10").into();
        let file2 = simple_add(true, "1", "10").into();

        let result = execute_test(None, None, vec![file1], vec![file2], false);
        assert!(result.is_ok());

        // delete / read: the winning commit removes a file outside the read predicate
        let file_not_read = simple_add(true, "1", "10");
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_not_read.clone().into());
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_not_read, true).into()],
            vec![],
            false,
        );
        assert!(result.is_ok());

        // add / read: the winning commit adds a file outside the read predicate
        let file_added = simple_add(true, "1", "10").into();
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![file_added],
            vec![],
            false,
        );
        assert!(result.is_ok());
    }

    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disallowed_concurrent_actions() {
        // delete / delete: both sides remove the same file
        let removed_file = simple_add(true, "1", "10");
        let removed_file: Action = ActionFactory::remove(&removed_file, true).into();
        let result = execute_test(
            None,
            None,
            vec![removed_file.clone()],
            vec![removed_file],
            false,
        );
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteDelete)
        ));

        // add / read + write: the winning commit adds a file matching the read predicate
        let file_added = simple_add(true, "1", "10").into();
        let file_should_have_read = simple_add(true, "1", "10").into();
        let result = execute_test(
            None,
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![file_should_have_read],
            vec![file_added],
            false,
        );
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));

        // delete / read: the winning commit removes a file matching the read predicate
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_read, true).into()],
            vec![],
            false,
        );
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));

        // metadata change in the winning commit
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into()],
            vec![],
            false,
        );
        assert!(matches!(result, Err(CommitConflictError::MetadataChanged)));

        // protocol change on both sides
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            false,
        );
        assert!(matches!(
            result,
            Err(CommitConflictError::ProtocolChanged(_))
        ));

        // whole-table read: any concurrently added file conflicts, even though it does
        // not match the (overly narrow) read predicate
        let file_part1 = simple_add(true, "1", "10").into();
        let file_part2 = simple_add(true, "11", "100").into();
        let file_part3 = simple_add(true, "101", "1000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt(lit::<i32>(0))),
            vec![file_part2],
            vec![file_part3],
            true,
        );
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));

        // whole-table read: any concurrently removed file conflicts
        let file_part1 = simple_add(true, "1", "10");
        let file_part2 = simple_add(true, "11", "100").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.clone().into());
        let result = execute_test(
            Some(setup_actions),
            None,
            vec![ActionFactory::remove(&file_part1, true).into()],
            vec![file_part2],
            true,
        );
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));
    }
}