use std::collections::HashSet;
use delta_kernel::table_properties::IsolationLevel;
use super::CommitInfo;
use crate::DeltaTableError;
#[cfg(feature = "datafusion")]
use crate::delta_datafusion::DataFusionMixins;
use crate::errors::DeltaResult;
use crate::kernel::{
Action, Add, LogDataHandler, Metadata, Protocol, Remove, Transaction, Version,
};
use crate::logstore::{LogStore, get_actions};
use crate::protocol::DeltaOperation;
use crate::table::config::TablePropertiesExt as _;
#[cfg(feature = "datafusion")]
use super::state::AddContainer;
#[cfg(feature = "datafusion")]
use datafusion::logical_expr::Expr;
#[cfg(feature = "datafusion")]
use itertools::Either;
/// Errors raised when a commit cannot be reconciled with a concurrent
/// "winning" commit that reached the log first.
///
/// Each variant corresponds to one of the conflict classes checked by
/// [`ConflictChecker::check_conflicts`].
#[derive(thiserror::Error, Debug)]
pub enum CommitConflictError {
    /// A concurrent commit added files this transaction should have read.
    #[error(
        "Commit failed: a concurrent transactions added new data.\nHelp: This transaction's query must be rerun to include the new data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
    )]
    ConcurrentAppend,
    /// A concurrent commit removed files this transaction read.
    #[error(
        "Commit failed: a concurrent transaction deleted data this operation read.\nHelp: This transaction's query must be rerun to exclude the removed data. Also, if you don't care to require this check to pass in the future, the isolation level can be set to Snapshot Isolation."
    )]
    ConcurrentDeleteRead,
    /// A concurrent commit removed the same files this transaction removes.
    #[error(
        "Commit failed: a concurrent transaction deleted the same data your transaction deletes.\nHelp: you should retry this write operation. If it was based on data contained in the table, you should rerun the query generating the data."
    )]
    ConcurrentDeleteDelete,
    /// Table metadata was updated by a concurrent commit.
    #[error("Metadata changed since last commit.")]
    MetadataChanged,
    /// A concurrent commit advanced an app-level transaction id this
    /// transaction depends on.
    #[error("Concurrent transaction failed.")]
    ConcurrentTransaction,
    /// The table protocol was changed (or is being changed concurrently).
    #[error("Protocol changed since last commit: {0}")]
    ProtocolChanged(String),
    /// The required writer protocol version is not supported by delta-rs.
    #[error("Delta-rs does not support writer version {0}")]
    UnsupportedWriterVersion(i32),
    /// The required reader protocol version is not supported by delta-rs.
    #[error("Delta-rs does not support reader version {0}")]
    UnsupportedReaderVersion(i32),
    /// The snapshot state could not be interpreted.
    #[error("Snapshot is corrupted: {source}")]
    CorruptedState {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
    /// Evaluating the read predicate against file statistics failed.
    #[error("Error evaluating predicate: {source}")]
    Predicate {
        source: Box<dyn std::error::Error + Send + Sync + 'static>,
    },
    /// No metadata is available for the snapshot under check.
    #[error("No metadata found, please make sure table is loaded.")]
    NoMetadata,
}
/// Description of what the current (to-be-committed) transaction read and
/// which actions it intends to commit — one side of the conflict check.
#[allow(unused)]
pub(crate) struct TransactionInfo<'a> {
    // Identifier for this transaction (currently always the empty string).
    txn_id: String,
    // Predicate over the data this transaction read. Without the `datafusion`
    // feature it is kept as an opaque string and never evaluated.
    #[cfg(not(feature = "datafusion"))]
    read_predicates: Option<String>,
    // Parsed predicate over the data this transaction read.
    #[cfg(feature = "datafusion")]
    read_predicates: Option<Expr>,
    // App-level transaction ids (`txn` actions) this transaction depends on.
    read_app_ids: HashSet<String>,
    // Actions this transaction wants to commit.
    actions: &'a [Action],
    // Snapshot of the table state this transaction was based on.
    read_snapshot: LogDataHandler<'a>,
    // Whether the whole table was read (forces the most conservative checks).
    read_whole_table: bool,
}
impl<'a> TransactionInfo<'a> {
    /// Creates transaction info, parsing an optional string predicate into a
    /// DataFusion [`Expr`] against the read snapshot's schema.
    ///
    /// # Errors
    /// Returns an error when the predicate string cannot be parsed.
    #[cfg(feature = "datafusion")]
    pub fn try_new(
        read_snapshot: LogDataHandler<'a>,
        read_predicates: Option<String>,
        actions: &'a [Action],
        read_whole_table: bool,
    ) -> DeltaResult<Self> {
        use datafusion::prelude::SessionContext;

        let session = SessionContext::new();
        let read_predicates = read_predicates
            .map(|pred| read_snapshot.parse_predicate_expression(pred, &session.state()))
            .transpose()?;
        // `Self::new` collects the app ids referenced by `actions`, so no
        // additional bookkeeping is required here.
        Ok(Self::new(
            read_snapshot,
            read_predicates,
            actions,
            read_whole_table,
        ))
    }

    /// Creates transaction info from an already-parsed predicate expression.
    #[cfg(feature = "datafusion")]
    pub fn new(
        read_snapshot: LogDataHandler<'a>,
        read_predicates: Option<Expr>,
        actions: &'a [Action],
        read_whole_table: bool,
    ) -> Self {
        // Collect the app-level transaction ids this commit depends on.
        let mut read_app_ids = HashSet::<String>::new();
        for action in actions.iter() {
            if let Action::Txn(Transaction { app_id, .. }) = action {
                read_app_ids.insert(app_id.clone());
            }
        }
        Self {
            txn_id: "".into(),
            read_predicates,
            read_app_ids,
            actions,
            read_snapshot,
            read_whole_table,
        }
    }

    /// Creates transaction info. Without the `datafusion` feature the
    /// predicate is stored as an opaque string and never evaluated.
    #[cfg(not(feature = "datafusion"))]
    pub fn try_new(
        read_snapshot: LogDataHandler<'a>,
        read_predicates: Option<String>,
        actions: &'a [Action],
        read_whole_table: bool,
    ) -> DeltaResult<Self> {
        // Collect the app-level transaction ids this commit depends on.
        let mut read_app_ids = HashSet::<String>::new();
        for action in actions.iter() {
            if let Action::Txn(Transaction { app_id, .. }) = action {
                read_app_ids.insert(app_id.clone());
            }
        }
        Ok(Self {
            txn_id: "".into(),
            read_predicates,
            read_app_ids,
            actions,
            read_snapshot,
            read_whole_table,
        })
    }

    /// Whether this transaction itself updates the table metadata.
    pub fn metadata_changed(&self) -> bool {
        self.actions
            .iter()
            .any(|a| matches!(a, Action::Metadata(_)))
    }

    /// Files of the read snapshot matching this transaction's read predicate,
    /// or all files when no predicate was supplied.
    #[cfg(feature = "datafusion")]
    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
        use crate::delta_datafusion::files_matching_predicate;

        if let Some(predicate) = &self.read_predicates {
            Ok(Either::Left(
                files_matching_predicate(
                    self.read_snapshot.clone(),
                    std::slice::from_ref(predicate),
                )
                .map_err(|err| CommitConflictError::Predicate {
                    source: Box::new(err),
                })?,
            ))
        } else {
            Ok(Either::Right(self.read_snapshot.iter().map(|f| f.to_add())))
        }
    }

    /// Without `datafusion` predicates cannot be evaluated, so conservatively
    /// report every file in the snapshot as read.
    #[cfg(not(feature = "datafusion"))]
    pub fn read_files(&self) -> Result<impl Iterator<Item = Add> + '_, CommitConflictError> {
        Ok(self.read_snapshot.iter().map(|f| f.to_add()))
    }

    /// Whether the whole table was read by this transaction.
    pub fn read_whole_table(&self) -> bool {
        self.read_whole_table
    }
}
/// Summary of the commit that won the race to the log: the concurrent
/// commit's actions plus its commit info (when present), used as the other
/// side of the conflict check.
#[derive(Debug)]
pub(crate) struct WinningCommitSummary {
    // All actions contained in the winning commit.
    pub actions: Vec<Action>,
    // The winning commit's `commitInfo` action, if it wrote one.
    pub commit_info: Option<CommitInfo>,
}
impl WinningCommitSummary {
    /// Reads the winning commit (the one directly following `read_version`)
    /// from the log and summarizes its actions.
    ///
    /// # Errors
    /// Returns [`DeltaTableError::InvalidVersion`] when the commit entry does
    /// not exist, or any error raised while reading/parsing the commit.
    pub async fn try_new(
        log_store: &dyn LogStore,
        read_version: Version,
        winning_commit_version: Version,
    ) -> DeltaResult<Self> {
        // Conflict resolution is performed one commit at a time.
        assert_eq!(winning_commit_version, read_version + 1);
        let commit_log_bytes = log_store.read_commit_entry(winning_commit_version).await?;
        match commit_log_bytes {
            Some(bytes) => {
                let actions = get_actions(winning_commit_version, &bytes)?;
                let commit_info = actions.iter().find_map(|action| match action {
                    Action::CommitInfo(info) => Some(info.clone()),
                    _ => None,
                });
                Ok(Self {
                    actions,
                    commit_info,
                })
            }
            None => Err(DeltaTableError::InvalidVersion(winning_commit_version)),
        }
    }

    /// Metadata updates contained in the winning commit.
    pub fn metadata_updates(&self) -> Vec<Metadata> {
        // Filter first, clone only the matches.
        self.actions
            .iter()
            .filter_map(|action| match action {
                Action::Metadata(metadata) => Some(metadata.clone()),
                _ => None,
            })
            .collect()
    }

    /// App-level transaction ids advanced by the winning commit.
    pub fn app_level_transactions(&self) -> HashSet<String> {
        self.actions
            .iter()
            .filter_map(|action| match action {
                Action::Txn(txn) => Some(txn.app_id.clone()),
                _ => None,
            })
            .collect()
    }

    /// Protocol actions contained in the winning commit.
    pub fn protocol(&self) -> Vec<Protocol> {
        self.actions
            .iter()
            .filter_map(|action| match action {
                Action::Protocol(protocol) => Some(protocol.clone()),
                _ => None,
            })
            .collect()
    }

    /// Remove actions contained in the winning commit.
    pub fn removed_files(&self) -> Vec<Remove> {
        self.actions
            .iter()
            .filter_map(|action| match action {
                Action::Remove(remove) => Some(remove.clone()),
                _ => None,
            })
            .collect()
    }

    /// Add actions contained in the winning commit.
    pub fn added_files(&self) -> Vec<Add> {
        self.actions
            .iter()
            .filter_map(|action| match action {
                Action::Add(add) => Some(add.clone()),
                _ => None,
            })
            .collect()
    }

    /// Added files when the winning commit was a blind append, empty
    /// otherwise.
    pub fn blind_append_added_files(&self) -> Vec<Add> {
        if self.is_blind_append().unwrap_or(false) {
            self.added_files()
        } else {
            vec![]
        }
    }

    /// Added files when the winning commit was NOT a blind append (i.e. it
    /// may have read existing data), empty otherwise.
    pub fn changed_data_added_files(&self) -> Vec<Add> {
        if self.is_blind_append().unwrap_or(false) {
            vec![]
        } else {
            self.added_files()
        }
    }

    /// Whether the winning commit declared itself a blind append; `None` when
    /// no commit info was written.
    pub fn is_blind_append(&self) -> Option<bool> {
        self.commit_info
            .as_ref()
            .map(|opt| opt.is_blind_append.unwrap_or(false))
    }
}
/// Checks a transaction's reads/writes against a winning concurrent commit
/// under the table's (possibly downgraded) isolation level.
pub(crate) struct ConflictChecker<'a> {
    // What the current transaction read and intends to commit.
    txn_info: TransactionInfo<'a>,
    // Summary of the concurrent commit that won the log race.
    winning_commit_summary: WinningCommitSummary,
    // Effective isolation level for this check (may be downgraded to
    // snapshot isolation when the winning commit only rearranged files).
    isolation_level: IsolationLevel,
}
impl<'a> ConflictChecker<'a> {
    /// Creates a checker for `transaction_info` against the given winning
    /// commit. When `operation` is supplied and the winning commit qualifies,
    /// the effective isolation level is downgraded to snapshot isolation.
    pub fn new(
        transaction_info: TransactionInfo<'a>,
        winning_commit_summary: WinningCommitSummary,
        operation: Option<&DeltaOperation>,
    ) -> ConflictChecker<'a> {
        let isolation_level = operation
            .and_then(|op| {
                can_downgrade_to_snapshot_isolation(
                    &winning_commit_summary.actions,
                    op,
                    &transaction_info
                        .read_snapshot
                        .table_properties()
                        .isolation_level(),
                )
                .then_some(IsolationLevel::SnapshotIsolation)
            })
            .unwrap_or_else(|| {
                transaction_info
                    .read_snapshot
                    .table_properties()
                    .isolation_level()
            });
        Self {
            txn_info: transaction_info,
            winning_commit_summary,
            isolation_level,
        }
    }

    /// Runs every conflict rule in order and returns the first violation.
    pub fn check_conflicts(&self) -> Result<(), CommitConflictError> {
        self.check_protocol_compatibility()?;
        self.check_no_metadata_updates()?;
        self.check_for_added_files_that_should_have_been_read_by_current_txn()?;
        self.check_for_deleted_files_against_current_txn_read_files()?;
        self.check_for_deleted_files_against_current_txn_deleted_files()?;
        self.check_for_updated_application_transaction_ids_that_current_txn_depends_on()?;
        Ok(())
    }

    /// Asserts the winning commit did not change the protocol in a way this
    /// client cannot handle, and that both commits do not race on a protocol
    /// change.
    fn check_protocol_compatibility(&self) -> Result<(), CommitConflictError> {
        // Compute once: `protocol()` clones a fresh Vec on every call.
        let winning_protocols = self.winning_commit_summary.protocol();
        let curr_read = self.txn_info.read_snapshot.protocol().min_reader_version();
        let curr_write = self.txn_info.read_snapshot.protocol().min_writer_version();
        for p in &winning_protocols {
            let win_read = p.min_reader_version();
            let win_write = p.min_writer_version();
            // NOTE(review): the writer comparison (`win_write < curr_write`)
            // is asymmetric with the reader one — confirm this is intended.
            if curr_read < win_read || win_write < curr_write {
                return Err(CommitConflictError::ProtocolChanged(format!(
                    "required read/write {win_read}/{win_write}, current read/write {curr_read}/{curr_write}"
                )));
            }
        }
        // Two commits racing to change the protocol always conflict.
        if !winning_protocols.is_empty()
            && self
                .txn_info
                .actions
                .iter()
                .any(|a| matches!(a, Action::Protocol(_)))
        {
            return Err(CommitConflictError::ProtocolChanged(
                "protocol changed".into(),
            ));
        }
        Ok(())
    }

    /// Asserts the winning commit did not update the table metadata.
    fn check_no_metadata_updates(&self) -> Result<(), CommitConflictError> {
        if !self.winning_commit_summary.metadata_updates().is_empty() {
            Err(CommitConflictError::MetadataChanged)
        } else {
            Ok(())
        }
    }

    /// Asserts the winning commit did not add files this transaction should
    /// have read under the effective isolation level.
    fn check_for_added_files_that_should_have_been_read_by_current_txn(
        &self,
    ) -> Result<(), CommitConflictError> {
        // Skip the check entirely under snapshot isolation.
        if matches!(self.isolation_level, IsolationLevel::SnapshotIsolation) {
            return Ok(());
        }
        // Under WriteSerializable, blind appends by the winner are allowed —
        // unless this transaction also changed metadata.
        let added_files_to_check = match self.isolation_level {
            IsolationLevel::WriteSerializable if !self.txn_info.metadata_changed() => {
                self.winning_commit_summary.changed_data_added_files()
            }
            IsolationLevel::Serializable | IsolationLevel::WriteSerializable => {
                let mut files = self.winning_commit_summary.changed_data_added_files();
                files.extend(self.winning_commit_summary.blind_append_added_files());
                files
            }
            IsolationLevel::SnapshotIsolation => vec![],
        };
        // With datafusion we can narrow the conflict to files whose stats
        // overlap the read predicate; otherwise only a whole-table read can
        // establish a conflict.
        cfg_if::cfg_if! {
            if #[cfg(feature = "datafusion")] {
                let added_files_matching_predicates = if let (Some(predicate), false) = (
                    &self.txn_info.read_predicates,
                    self.txn_info.read_whole_table(),
                ) {
                    let arrow_schema = self.txn_info.read_snapshot.read_schema();
                    let partition_columns = self
                        .txn_info
                        .read_snapshot
                        .metadata()
                        .partition_columns()
                        .to_vec();
                    AddContainer::new(&added_files_to_check, &partition_columns, arrow_schema)
                        .predicate_matches(predicate.clone())
                        .map_err(|err| CommitConflictError::Predicate {
                            source: Box::new(err),
                        })?
                        .cloned()
                        .collect::<Vec<_>>()
                } else if self.txn_info.read_whole_table() {
                    added_files_to_check
                } else {
                    vec![]
                };
            } else {
                let added_files_matching_predicates = if self.txn_info.read_whole_table()
                {
                    added_files_to_check
                } else {
                    vec![]
                };
            }
        }
        if !added_files_matching_predicates.is_empty() {
            Err(CommitConflictError::ConcurrentAppend)
        } else {
            Ok(())
        }
    }

    /// Asserts the winning commit did not delete (with data change) files
    /// this transaction read.
    fn check_for_deleted_files_against_current_txn_read_files(
        &self,
    ) -> Result<(), CommitConflictError> {
        let read_file_path: HashSet<String> = self
            .txn_info
            .read_files()?
            .map(|f| f.path.clone())
            .collect();
        // Removes with `data_change == false` (e.g. compaction) never
        // invalidate a read.
        let removed_files_with_data_change: Vec<Remove> = self
            .winning_commit_summary
            .removed_files()
            .into_iter()
            .filter(|r| r.data_change)
            .collect();
        let deleted_read_overlap = removed_files_with_data_change
            .iter()
            .any(|f| read_file_path.contains(&f.path));
        if deleted_read_overlap
            || (!removed_files_with_data_change.is_empty() && self.txn_info.read_whole_table())
        {
            Err(CommitConflictError::ConcurrentDeleteRead)
        } else {
            Ok(())
        }
    }

    /// Asserts both commits do not delete the same file.
    fn check_for_deleted_files_against_current_txn_deleted_files(
        &self,
    ) -> Result<(), CommitConflictError> {
        let txn_deleted_files: HashSet<String> = self
            .txn_info
            .actions
            .iter()
            .filter_map(|action| match action {
                Action::Remove(remove) => Some(remove.path.clone()),
                _ => None,
            })
            .collect();
        let winning_deleted_files: HashSet<String> = self
            .winning_commit_summary
            .removed_files()
            .into_iter()
            .map(|r| r.path)
            .collect();
        // Only existence of an overlap matters; no need to collect it.
        let conflict = txn_deleted_files
            .intersection(&winning_deleted_files)
            .next()
            .is_some();
        if conflict {
            Err(CommitConflictError::ConcurrentDeleteDelete)
        } else {
            Ok(())
        }
    }

    /// Asserts the winning commit did not advance an app-level transaction id
    /// this transaction depends on (idempotent writes / streaming).
    fn check_for_updated_application_transaction_ids_that_current_txn_depends_on(
        &self,
    ) -> Result<(), CommitConflictError> {
        let winning_txns = self.winning_commit_summary.app_level_transactions();
        let overlaps = winning_txns
            .intersection(&self.txn_info.read_app_ids)
            .next()
            .is_some();
        if overlaps {
            Err(CommitConflictError::ConcurrentTransaction)
        } else {
            Ok(())
        }
    }
}
/// Decides whether a commit may be checked under snapshot isolation instead
/// of the configured `isolation_level`.
///
/// The downgrade is only possible when the actions consist purely of
/// data-changing file actions; any other action (including file actions with
/// `data_change == false`) rules it out.
pub(super) fn can_downgrade_to_snapshot_isolation<'a>(
    actions: impl IntoIterator<Item = &'a Action>,
    operation: &DeltaOperation,
    isolation_level: &IsolationLevel,
) -> bool {
    let mut data_changed = false;
    for action in actions {
        match action {
            Action::Add(add) if add.data_change => data_changed = true,
            Action::Remove(remove) if remove.data_change => data_changed = true,
            // Anything else makes the downgrade impossible, so stop early.
            _ => return false,
        }
    }
    match isolation_level {
        IsolationLevel::Serializable => !data_changed,
        IsolationLevel::WriteSerializable => !data_changed && !operation.changes_data(),
        // Already at the weakest level; no downgrade to perform.
        IsolationLevel::SnapshotIsolation => false,
    }
}
#[cfg(test)]
#[allow(unused)]
mod tests {
    use std::collections::HashMap;
    #[cfg(feature = "datafusion")]
    use datafusion::logical_expr::{col, lit};
    use serde_json::json;

    use super::*;
    use crate::kernel::Action;
    use crate::test_utils::{ActionFactory, TestSchemas};

    /// Creates an `Add` for the simple test schema whose `value` column stats
    /// span `[min, max]`.
    ///
    /// NOTE(review): the `data_change` parameter is currently ignored — the
    /// created add always has `data_change = true`. Confirm whether callers
    /// rely on that.
    fn simple_add(data_change: bool, min: &str, max: &str) -> Add {
        ActionFactory::add(
            TestSchemas::simple(),
            HashMap::from_iter([("value", (min, max))]),
            Default::default(),
            true,
        )
    }

    /// Minimal protocol + metadata actions to set up a test table.
    fn init_table_actions() -> Vec<Action> {
        vec![
            ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into(),
            ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into(),
        ]
    }

    // A data-changing add disqualifies the downgrade to snapshot isolation.
    #[test]
    fn test_can_downgrade_to_snapshot_isolation() {
        let isolation = IsolationLevel::WriteSerializable;
        let operation = DeltaOperation::Optimize {
            predicate: None,
            target_size: 0,
        };
        let add =
            ActionFactory::add(TestSchemas::simple(), HashMap::new(), Vec::new(), true).into();
        let res = can_downgrade_to_snapshot_isolation(&[add], &operation, &isolation);
        assert!(!res)
    }

    /// Drives one conflict check: builds a snapshot from `setup`, describes
    /// the current transaction (`reads`, `actions`, `read_whole_table`) and
    /// checks it against a `concurrent` winning commit.
    #[cfg(feature = "datafusion")]
    async fn execute_test(
        setup: Option<Vec<Action>>,
        reads: Option<Expr>,
        concurrent: Vec<Action>,
        actions: Vec<Action>,
        read_whole_table: bool,
    ) -> Result<(), CommitConflictError> {
        use crate::table::state::DeltaTableState;
        // NOTE(review): `Path` appears unused here; kept under the
        // module-level #[allow(unused)].
        use object_store::path::Path;

        let setup_actions = setup.unwrap_or_else(init_table_actions);
        let state = DeltaTableState::from_actions(setup_actions).await.unwrap();
        let snapshot = state.snapshot();
        let transaction_info =
            TransactionInfo::new(snapshot.log_data(), reads, &actions, read_whole_table);
        let summary = WinningCommitSummary {
            actions: concurrent,
            commit_info: None,
        };
        let checker = ConflictChecker::new(transaction_info, summary, None);
        checker.check_conflicts()
    }

    // Two concurrent appends without reads never conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_append_append() {
        let file1 = simple_add(true, "1", "10").into();
        let file2 = simple_add(true, "1", "10").into();
        let result = execute_test(None, None, vec![file1], vec![file2], false).await;
        assert!(result.is_ok());
    }

    // Deleting a file whose stats do not overlap the read predicate is fine.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_delete_read() {
        let file_not_read = simple_add(true, "1", "10");
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_not_read.clone().into());
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_not_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(result.is_ok());
    }

    // Adding a file outside the read predicate's range does not conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_add_read() {
        let file_added = simple_add(true, "1", "10").into();
        let file_read = simple_add(true, "100", "10000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt(lit::<i32>(10))),
            vec![file_added],
            vec![],
            false,
        )
        .await;
        assert!(result.is_ok());
    }

    // Both commits removing the same file is a delete/delete conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_delete_delete() {
        let removed_file = simple_add(true, "1", "10");
        let removed_file: Action = ActionFactory::remove(&removed_file, true).into();
        let result = execute_test(
            None,
            None,
            vec![removed_file.clone()],
            vec![removed_file],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteDelete)
        ));
    }

    // A concurrently added file matching the read predicate conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_add_conflicts_with_read_and_write() {
        let file_added = simple_add(true, "1", "10").into();
        let file_should_have_read = simple_add(true, "1", "10").into();
        let result = execute_test(
            None,
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![file_should_have_read],
            vec![file_added],
            false,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
    }

    // Removing a file this transaction read is a delete/read conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_delete_conflicts_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));
    }

    // Any concurrent metadata change fails the commit.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_metadata_change() {
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::metadata(TestSchemas::simple(), None::<Vec<&str>>, None).into()],
            vec![],
            false,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::MetadataChanged)));
    }

    // Two commits racing to change the protocol always conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_protocol_upgrade() {
        let result = execute_test(
            None,
            None,
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            vec![ActionFactory::protocol(None, None, None::<Vec<_>>, None::<Vec<_>>).into()],
            false,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ProtocolChanged(_))
        ));
    }

    // Whole-table reads make any concurrent data append a conflict, even when
    // a (now irrelevant) predicate is supplied.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_read_whole_table_disallows_concurrent_append() {
        let file_part1 = simple_add(true, "1", "10").into();
        let file_part2 = simple_add(true, "11", "100").into();
        let file_part3 = simple_add(true, "101", "1000").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt(lit::<i32>(0))),
            vec![file_part2],
            vec![file_part3],
            true,
        )
        .await;
        assert!(matches!(result, Err(CommitConflictError::ConcurrentAppend)));
    }

    // Whole-table reads make any concurrent data-changing remove a conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_read_whole_table_disallows_concurrent_remove() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2 = simple_add(true, "11", "100").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.clone().into());
        let result = execute_test(
            Some(setup_actions),
            None,
            vec![ActionFactory::remove(&file_part1, true).into()],
            vec![file_part2],
            true,
        )
        .await;
        assert!(matches!(
            result,
            Err(CommitConflictError::ConcurrentDeleteRead)
        ));
    }

    // Removes with data_change=false (compaction) never invalidate reads.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_compaction_remove_does_not_conflict_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());
        let mut compaction_remove = ActionFactory::remove(&file_read, false);
        compaction_remove.data_change = false;
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![compaction_remove.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Compaction with data_change=false should not conflict with reads"
        );
    }

    // A data-changing remove of a read file is a conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_data_delete_conflicts_with_read() {
        let file_read = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_read.clone().into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(10))),
            vec![ActionFactory::remove(&file_read, true).into()],
            vec![],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
            "Delete with data_change=true should conflict with reads"
        );
    }

    // Compaction removes are safe even against whole-table reads.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_compaction_does_not_conflict_with_whole_table_read() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2 = simple_add(true, "11", "100").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.clone().into());
        let compaction_remove = ActionFactory::remove(&file_part1, false);
        let result = execute_test(
            Some(setup_actions),
            None,
            vec![compaction_remove.into()],
            vec![file_part2],
            true,
        )
        .await;
        assert!(
            result.is_ok(),
            "Compaction with data_change=false should not conflict even with read_whole_table"
        );
    }

    // Several compaction removes at once still do not conflict with reads.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_multiple_compaction_removes_do_not_conflict() {
        let file1 = simple_add(true, "1", "10");
        let file2 = simple_add(true, "11", "20");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file1.clone().into());
        setup_actions.push(file2.clone().into());
        let mut remove1 = ActionFactory::remove(&file1, false);
        remove1.data_change = false;
        let mut remove2 = ActionFactory::remove(&file2, false);
        remove2.data_change = false;
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(20))),
            vec![remove1.into(), remove2.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Multiple compaction removes with data_change=false should not conflict"
        );
    }

    // One data-changing remove among compaction removes is enough to conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_mixed_removes_conflict_if_any_data_change() {
        let file1 = simple_add(true, "1", "10");
        let file2 = simple_add(true, "11", "20");
        let mut setup_actions = init_table_actions();
        setup_actions.push(file1.clone().into());
        setup_actions.push(file2.clone().into());
        let mut compaction_remove = ActionFactory::remove(&file1, false);
        compaction_remove.data_change = false;
        let data_remove = ActionFactory::remove(&file2, true);
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(20))),
            vec![compaction_remove.into(), data_remove.into()],
            vec![],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteRead)),
            "Mixed removes should conflict if any have data_change=true"
        );
    }

    // The delete/delete rule ignores data_change: double-deleting the same
    // file conflicts even for compaction removes.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_concurrent_compaction_double_delete_still_conflicts() {
        let removed_file = simple_add(true, "1", "10");
        let mut setup_actions = init_table_actions();
        setup_actions.push(removed_file.clone().into());
        let mut remove1 = ActionFactory::remove(&removed_file, false);
        remove1.data_change = false;
        let mut remove2 = ActionFactory::remove(&removed_file, false);
        remove2.data_change = false;
        let result = execute_test(
            Some(setup_actions),
            None,
            vec![remove1.into()],
            vec![remove2.into()],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentDeleteDelete)),
            "Concurrent double delete should conflict even with data_change=false"
        );
    }

    // The concurrent add's stats overlap the read predicate, so it conflicts.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_partitions_add_and_write() {
        let file_part1_existing = simple_add(true, "1", "10");
        let file_part2_added = simple_add(true, "100", "200").into();
        let file_part1_new = simple_add(true, "5", "15").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1_existing.into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(200))),
            vec![file_part2_added],
            vec![file_part1_new],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
            "Adding file matching read predicate should conflict"
        );
    }

    // The concurrent add's stats fall outside the read predicate — no
    // conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_disjoint_partitions_read_write_different_ranges() {
        let file_part1 = simple_add(true, "1", "10");
        let file_part2_added = simple_add(true, "100", "200").into();
        let file_part1_new = simple_add(true, "5", "15").into();
        let mut setup_actions = init_table_actions();
        setup_actions.push(file_part1.into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").lt_eq(lit::<i32>(50))),
            vec![file_part2_added],
            vec![file_part1_new],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Disjoint partition writes should not conflict"
        );
    }

    // Both commits touch the same app-level transaction id — conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_conflicting_app_transactions() {
        let file1 = simple_add(true, "1", "10").into();
        let app_id = "streaming_query_1".to_string();
        let txn_action = Action::Txn(Transaction {
            app_id: app_id.clone(),
            version: 1,
            last_updated: None,
        });
        let current_actions = vec![txn_action.clone()];
        let concurrent_actions = vec![txn_action, file1];
        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentTransaction)),
            "Conflicting app transactions should fail"
        );
    }

    // Distinct app-level transaction ids never conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_non_conflicting_different_app_transactions() {
        let file1 = simple_add(true, "1", "10").into();
        let app_id1 = "streaming_query_1".to_string();
        let app_id2 = "streaming_query_2".to_string();
        let txn_action1 = Action::Txn(Transaction {
            app_id: app_id1,
            version: 1,
            last_updated: None,
        });
        let txn_action2 = Action::Txn(Transaction {
            app_id: app_id2,
            version: 1,
            last_updated: None,
        });
        let current_actions = vec![txn_action1];
        let concurrent_actions = vec![txn_action2, file1];
        let result = execute_test(None, None, concurrent_actions, current_actions, false).await;
        assert!(
            result.is_ok(),
            "Non-conflicting app transactions should succeed"
        );
    }

    // replaceWhere-style: the read matched nothing, but a concurrently added
    // row inside the predicate range must still conflict.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_replace_where_initial_empty_conflicts_on_concurrent_add() {
        let mut setup_actions = init_table_actions();
        setup_actions.push(simple_add(true, "1", "1").into());
        let result = execute_test(
            Some(setup_actions),
            Some(col("value").gt_eq(lit::<i32>(2))),
            vec![simple_add(true, "3", "3").into()],
            vec![simple_add(true, "2", "2").into()],
            false,
        )
        .await;
        assert!(
            matches!(result, Err(CommitConflictError::ConcurrentAppend)),
            "ReplaceWhere-style empty read should conflict when a matching row is concurrently added"
        );
    }

    // replaceWhere-style: the concurrently added row is outside the predicate
    // range, so the commit may proceed.
    #[tokio::test]
    #[cfg(feature = "datafusion")]
    async fn test_replace_where_disjoint_empty_allows_commit() {
        let mut setup_actions = init_table_actions();
        setup_actions.push(simple_add(true, "1", "1").into());
        let result = execute_test(
            Some(setup_actions),
            Some(
                col("value")
                    .gt(lit::<i32>(1))
                    .and(col("value").lt_eq(lit::<i32>(3))),
            ),
            vec![simple_add(true, "5", "5").into()],
            vec![simple_add(true, "2", "2").into()],
            false,
        )
        .await;
        assert!(
            result.is_ok(),
            "Disjoint replaceWhere-style transactions with empty reads should succeed"
        );
    }
}