use std::collections::{HashMap, HashSet};
use std::iter;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, LazyLock, OnceLock};
use delta_kernel_derive::internal_api;
use tracing::{info, instrument};
use crate::actions::{
as_log_add_schema, get_commit_schema, get_log_remove_schema, get_log_txn_schema, CommitInfo,
DomainMetadata, Metadata, Protocol, SetTransaction, METADATA_NAME, PROTOCOL_NAME,
};
use crate::committer::{
CommitMetadata, CommitProtocolMetadata, CommitResponse, CommitType, Committer,
};
use crate::crc::{CrcDelta, FileStatsDelta, LazyCrc};
use crate::engine_data::FilteredEngineData;
use crate::error::Error;
use crate::expressions::UnaryExpressionOp::ToJson;
use crate::expressions::{ArrayData, ColumnName, Scalar, Transform};
use crate::log_segment::LogSegment;
use crate::partition::serialization::serialize_partition_value;
use crate::partition::validation::validate_partition_values;
use crate::path::{LogRoot, ParsedLogPath};
use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor};
use crate::scan::data_skipping::stats_schema::schema_with_all_fields_nullable;
use crate::scan::log_replay::{
BASE_ROW_ID_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME, FILE_CONSTANT_VALUES_NAME,
STATS_PARSED_NAME, TAGS_NAME,
};
use crate::scan::scan_row_schema;
use crate::schema::{ArrayType, MapType, SchemaRef, StructField, StructType, StructTypeBuilder};
use crate::snapshot::{Snapshot, SnapshotRef};
use crate::table_configuration::TableConfiguration;
use crate::table_features::TableFeature;
use crate::utils::require;
use crate::{
DataType, DeltaResult, Engine, EngineData, Expression, FileMeta, IntoEngineData, RowVisitor,
Version,
};
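// Sub-modules are `pub` when the `internal-api` feature is enabled and
// `pub(crate)` otherwise.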
#[cfg(feature = "internal-api")]
pub mod builder;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod builder;
#[cfg(feature = "internal-api")]
pub mod create_table;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod create_table;
#[cfg(feature = "internal-api")]
pub mod data_layout;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod data_layout;
mod commit_info;
mod domain_metadata;
mod stats_verifier;
mod update;
mod write_context;
use stats_verifier::StatsVerifier;
use write_context::SharedWriteState;
pub use write_context::WriteContext;
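/// A boxed, sendable iterator over [`DeltaResult`]s of engine data batches, used to
/// stream generated log actions.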
pub(crate) type EngineDataResultIterator<'a> =
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;
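/// The minimal fields an engine must provide for every file added to the table:
/// `path`, `partitionValues`, `size`, and `modificationTime`.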
pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked(vec![
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::not_null("size", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
]))
});
pub(crate) fn mandatory_add_file_schema() -> &'static SchemaRef {
&MANDATORY_ADD_FILE_SCHEMA
}
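/// [`MANDATORY_ADD_FILE_SCHEMA`] extended with a nullable `stats` struct
/// (`numRecords`, `nullCount`, `minValues`, `maxValues`, `tightBounds`).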
pub(crate) static BASE_ADD_FILES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
let stats = StructField::nullable(
"stats",
DataType::struct_type_unchecked(vec![
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", DataType::struct_type_unchecked(vec![])),
StructField::nullable("minValues", DataType::struct_type_unchecked(vec![])),
StructField::nullable("maxValues", DataType::struct_type_unchecked(vec![])),
StructField::nullable("tightBounds", DataType::BOOLEAN),
]),
);
StructTypeBuilder::from_schema(mandatory_add_file_schema())
.add_field(stats)
.build_arc_unchecked()
});
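/// The non-nullable `dataChange` flag column inserted into the add-files schema at
/// commit time.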
static DATA_CHANGE_COLUMN: LazyLock<StructField> =
LazyLock::new(|| StructField::not_null("dataChange", DataType::BOOLEAN));
/// [`BASE_ADD_FILES_SCHEMA`] with the `dataChange` column inserted immediately after
/// `modificationTime` (or appended at the end if that field is ever absent).
static ADD_FILES_SCHEMA_WITH_DATA_CHANGE: LazyLock<SchemaRef> = LazyLock::new(|| {
    let mut fields = BASE_ADD_FILES_SCHEMA.fields().collect::<Vec<_>>();
    let insert_position = fields
        .iter()
        .position(|f| f.name() == "modificationTime")
        .map(|pos| pos + 1)
        .unwrap_or(fields.len());
    fields.insert(insert_position, &DATA_CHANGE_COLUMN);
    Arc::new(StructType::new_unchecked(fields.into_iter().cloned()))
});
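/// Returns `schema` with a nullable string-typed `stats` column appended (the
/// JSON-serialized form of the parsed stats struct).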
fn with_stats_col(schema: &SchemaRef) -> SchemaRef {
StructTypeBuilder::from_schema(schema)
.add_field(StructField::nullable("stats", DataType::STRING))
.build_arc_unchecked()
}
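/// Returns `schema` with the nullable row-tracking columns `baseRowId` and
/// `defaultRowCommitVersion` appended.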
fn with_row_tracking_cols(schema: &SchemaRef) -> SchemaRef {
StructTypeBuilder::from_schema(schema)
.add_field(StructField::nullable("baseRowId", DataType::LONG))
.add_field(StructField::nullable(
"defaultRowCommitVersion",
DataType::LONG,
))
.build_arc_unchecked()
}
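/// Type-state marker for a [`Transaction`] that commits to an existing table.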
#[derive(Debug)]
pub struct ExistingTable;
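/// Type-state marker for a [`Transaction`] that creates a new table.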
#[derive(Debug)]
pub struct CreateTable;
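/// A transaction on a Delta table. A transaction accumulates actions (added and
/// removed files, `SetTransaction` identifiers, domain metadata, deletion-vector
/// updates) and writes them to the log as a single atomic commit.
///
/// The `S` type parameter is a type-state marker distinguishing transactions on
/// existing tables ([`ExistingTable`], the default) from create-table transactions
/// ([`CreateTable`]).
///
/// A minimal usage sketch (illustrative only; assumes an `engine` and `snapshot` are
/// already in hand and that the engine has written the data files):
///
/// ```ignore
/// let mut txn = snapshot
///     .transaction(Box::new(FileSystemCommitter::new()), &engine)?
///     .with_engine_info("my engine");
/// txn.add_files(file_metadata); // batch conforming to txn.add_files_schema()
/// match txn.commit(&engine)? {
///     CommitResult::CommittedTransaction(committed) => { /* success */ }
///     other => { /* conflicted or retryable; inspect and retry */ }
/// }
/// ```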
pub struct Transaction<S = ExistingTable> {
span: tracing::Span,
read_snapshot_opt: Option<SnapshotRef>,
effective_table_config: TableConfiguration,
should_emit_protocol: bool,
should_emit_metadata: bool,
committer: Box<dyn Committer>,
operation: Option<String>,
engine_info: Option<String>,
engine_commit_info: Option<(Box<dyn EngineData>, SchemaRef)>,
add_files_metadata: Vec<Box<dyn EngineData>>,
remove_files_metadata: Vec<FilteredEngineData>,
set_transactions: Vec<SetTransaction>,
commit_timestamp: i64,
user_domain_metadata_additions: Vec<DomainMetadata>,
system_domain_metadata_additions: Vec<DomainMetadata>,
user_domain_removals: Vec<String>,
data_change: bool,
is_blind_append: bool,
dv_matched_files: Vec<FilteredEngineData>,
physical_clustering_columns: Option<Vec<ColumnName>>,
shared_write_state: OnceLock<Arc<SharedWriteState>>,
_state: PhantomData<S>,
}
impl<S> std::fmt::Debug for Transaction<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let version_info = match &self.read_snapshot_opt {
            Some(snap) => snap.version().to_string(),
            None => "create_table".to_string(),
        };
        write!(
            f,
            "Transaction {{ read_snapshot version: {}, engine_info: {} }}",
            version_info,
            self.engine_info.is_some()
        )
    }
}
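/// Maps engine-provided add-file metadata batches into `add` action batches:
/// inserts the `dataChange` literal after `modificationTime` and replaces the parsed
/// `stats` struct with its JSON serialization.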
fn build_add_actions<'a, I, T>(
engine: &dyn Engine,
add_files_metadata: I,
input_schema: SchemaRef,
output_schema: SchemaRef,
data_change: bool,
) -> impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a
where
I: Iterator<Item = DeltaResult<T>> + Send + 'a,
T: Deref<Target = dyn EngineData> + Send + 'a,
{
let evaluation_handler = engine.evaluation_handler();
add_files_metadata.map(move |add_files_batch| {
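        // Insert the `dataChange` literal immediately after `modificationTime` and
        // replace the parsed `stats` struct with its JSON-serialized form.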
let transform = Expression::transform(
Transform::new_top_level()
.with_inserted_field(
Some("modificationTime"),
Expression::literal(data_change).into(),
)
.with_replaced_field(
"stats",
Expression::unary(ToJson, Expression::column(["stats"])).into(),
),
);
let adds_expr = Expression::struct_from([transform]);
let adds_evaluator = evaluation_handler.new_expression_evaluator(
input_schema.clone(),
Arc::new(adds_expr),
as_log_add_schema(output_schema.clone()).into(),
)?;
adds_evaluator.evaluate(add_files_batch?.deref())
})
}
impl<S> Transaction<S> {
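    /// Consumes this transaction and attempts to commit it to the table's log via the
    /// configured [`Committer`]. Returns whether the commit succeeded, hit a version
    /// conflict with a concurrent writer, or failed with a retryable I/O error.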
#[instrument(
parent = &self.span,
name = "txn.commit",
skip_all,
fields(
commit_version = self.get_commit_version(),
),
err
)]
pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult<S>> {
info!(
num_add_files = self.add_files_metadata.len(),
num_remove_files = self.remove_files_metadata.len(),
num_dv_updates = self.dv_matched_files.len(),
);
let mut app_ids = HashSet::with_capacity(self.set_transactions.len());
if let Some(dup) = self
.set_transactions
.iter()
.find(|t| !app_ids.insert(&t.app_id))
{
return Err(Error::generic(format!(
"app_id {} already exists in transaction",
dup.app_id
)));
}
self.validate_blind_append_semantics()?;
if !self.is_create_table()
&& !self.add_files_metadata.is_empty()
&& !self.remove_files_metadata.is_empty()
&& self.data_change
{
let cdf_enabled = self
.effective_table_config
.table_properties()
.enable_change_data_feed
.unwrap_or(false);
require!(
!cdf_enabled,
Error::generic(
"Cannot add and remove data in the same transaction when Change Data Feed is enabled (delta.enableChangeDataFeed = true). \
This would require writing CDC files for DML operations, which is not yet supported. \
Consider using separate transactions: one to add files, another to remove files."
)
);
}
self.validate_add_files_stats(&self.add_files_metadata)?;
let set_transaction_actions = self
.set_transactions
.clone()
.into_iter()
.map(|txn| txn.into_engine_data(get_log_txn_schema().clone(), engine));
let in_commit_timestamp = self.get_in_commit_timestamp(engine)?;
let kernel_commit_info = CommitInfo::new(
self.commit_timestamp,
in_commit_timestamp,
self.operation.clone(),
self.engine_info.clone(),
self.is_blind_append,
);
let commit_info_action = self.generate_commit_info(engine, kernel_commit_info);
let (protocol_action, protocol) = if self.should_emit_protocol {
let protocol = self.effective_table_config.protocol().clone();
let schema = get_commit_schema().project(&[PROTOCOL_NAME])?;
let action = protocol.clone().into_engine_data(schema, engine)?;
(Some(action), Some(protocol))
} else {
(None, None)
};
let (metadata_action, metadata) = if self.should_emit_metadata {
let metadata = self.effective_table_config.metadata().clone();
let schema = get_commit_schema().project(&[METADATA_NAME])?;
let action = metadata.clone().into_engine_data(schema, engine)?;
(Some(action), Some(metadata))
} else {
(None, None)
};
let commit_version = self.get_commit_version();
let (add_actions, row_tracking_domain_metadata) =
self.generate_adds(engine, commit_version)?;
let (domain_metadata_actions, dm_changes) =
self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?;
let dv_update_actions = self.generate_dv_update_actions(engine)?;
let remove_actions =
self.generate_remove_actions(engine, self.remove_files_metadata.iter(), &[])?;
let actions = iter::once(commit_info_action)
.chain(protocol_action.map(Ok))
.chain(metadata_action.map(Ok))
.chain(add_actions)
.chain(set_transaction_actions)
.chain(domain_metadata_actions);
let filtered_actions = actions
.map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected))
.chain(remove_actions)
.chain(dv_update_actions);
let commit_metadata = self.create_commit_metadata(
commit_version,
in_commit_timestamp,
protocol,
metadata,
dm_changes.clone(),
)?;
match self
.committer
.commit(engine, Box::new(filtered_actions), commit_metadata)
{
Ok(CommitResponse::Committed { file_meta }) => {
let bin_boundaries = self
.read_snapshot_opt
.as_ref()
.and_then(|snap| snap.get_file_stats_if_loaded())
.and_then(|s| s.file_size_histogram)
.map(|h| h.sorted_bin_boundaries);
let crc_delta = self.build_crc_delta(
in_commit_timestamp,
dm_changes,
bin_boundaries.as_deref(),
)?;
Ok(CommitResult::CommittedTransaction(
self.into_committed(file_meta, crc_delta)?,
))
}
Ok(CommitResponse::Conflict { version }) => Ok(CommitResult::ConflictedTransaction(
self.into_conflicted(version),
)),
Err(e @ Error::IOError(_)) => {
Ok(CommitResult::RetryableTransaction(self.into_retryable(e)))
}
Err(e) => Err(e),
}
}
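    /// Sets whether this transaction logically changes the table's data, as opposed to
    /// rearranging existing data (e.g. compaction). The flag is stamped onto every
    /// generated `add` and `remove` action.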
pub fn with_data_change(mut self, data_change: bool) -> Self {
self.data_change = data_change;
self
}
#[internal_api]
    #[allow(dead_code)]
    pub(crate) fn set_data_change(&mut self, data_change: bool) {
self.data_change = data_change;
}
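    /// Sets the engine info string recorded in the commit's `commitInfo` action.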
pub fn with_engine_info(mut self, engine_info: impl Into<String>) -> Self {
self.engine_info = Some(engine_info.into());
self
}
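    /// Attaches engine-specific commit info (data plus its schema) to be included in
    /// the commit's `commitInfo` action.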
pub fn with_commit_info(
mut self,
engine_commit_info: Box<dyn EngineData>,
commit_info_schema: SchemaRef,
) -> Self {
self.engine_commit_info = Some((engine_commit_info, commit_info_schema));
self
}
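    /// Records a `SetTransaction` (`txn`) action for `app_id` at `version`, enabling
    /// idempotent writes. Duplicate `app_id`s are rejected at commit time.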
pub fn with_transaction_id(mut self, app_id: String, version: i64) -> Self {
let set_transaction = SetTransaction::new(app_id, version, Some(self.commit_timestamp));
self.set_transactions.push(set_transaction);
self
}
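    /// Adds a user domain metadata action for `domain` with the given `configuration`.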
pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self {
self.user_domain_metadata_additions
.push(DomainMetadata::new(domain, configuration));
self
}
fn determine_commit_type(
is_create: bool,
table_config: &crate::table_configuration::TableConfiguration,
) -> CommitType {
let is_catalog_managed = table_config.is_catalog_managed();
match (is_create, is_catalog_managed) {
(true, true) => CommitType::CatalogManagedCreate,
(true, false) => CommitType::PathBasedCreate,
(false, true) => CommitType::CatalogManagedWrite,
(false, false) => CommitType::PathBasedWrite,
}
}
fn validate_commit_type(
is_catalog_committer: bool,
commit_type: &CommitType,
) -> DeltaResult<()> {
match (
is_catalog_committer,
commit_type.requires_catalog_committer(),
) {
(true, true) | (false, false) => Ok(()),
(false, true) => Err(Error::generic(
"This table is catalog-managed and requires a catalog committer. \
Please provide a catalog committer via Snapshot::transaction().",
)),
(true, false) => Err(Error::generic(
"This table is path-based and cannot be committed to with a catalog committer.",
)),
}
}
fn create_commit_metadata(
&self,
commit_version: Version,
in_commit_timestamp: Option<i64>,
new_protocol: Option<Protocol>,
new_metadata: Option<Metadata>,
domain_metadata_changes: Vec<crate::actions::DomainMetadata>,
) -> DeltaResult<CommitMetadata> {
let log_root = LogRoot::new(self.effective_table_config.table_root().clone())?;
let is_create = self.is_create_table();
let commit_type = Self::determine_commit_type(is_create, &self.effective_table_config);
Self::validate_commit_type(self.committer.is_catalog_committer(), &commit_type)?;
let (read_protocol, read_metadata, max_published_version) = if is_create {
(None, None, None)
} else {
let snap = self.read_snapshot()?;
let read_config = snap.table_configuration();
(
Some(read_config.protocol().clone()),
Some(read_config.metadata().clone()),
snap.log_segment().listed.max_published_version,
)
};
let protocol_metadata = CommitProtocolMetadata::try_new(
read_protocol,
read_metadata,
new_protocol,
new_metadata,
)?;
Ok(CommitMetadata::new(
log_root,
commit_version,
commit_type,
in_commit_timestamp.unwrap_or(self.commit_timestamp),
max_published_version,
protocol_metadata,
domain_metadata_changes,
))
}
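    /// Checks that a blind append only adds data: it must target an existing table,
    /// include at least one added file, set `data_change`, and carry no removes or
    /// deletion-vector updates.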
fn validate_blind_append_semantics(&self) -> DeltaResult<()> {
if !self.is_blind_append {
return Ok(());
}
require!(
!self.is_create_table(),
Error::invalid_transaction_state(
"Blind append is not supported for create-table transactions",
)
);
require!(
!self.add_files_metadata.is_empty(),
Error::invalid_transaction_state("Blind append requires at least one added data file")
);
require!(
self.data_change,
Error::invalid_transaction_state("Blind append requires data_change to be true")
);
require!(
self.remove_files_metadata.is_empty(),
Error::invalid_transaction_state("Blind append cannot remove files")
);
require!(
self.dv_matched_files.is_empty(),
Error::invalid_transaction_state("Blind append cannot update deletion vectors")
);
Ok(())
}
fn is_create_table(&self) -> bool {
debug_assert!(
self.operation.as_deref() != Some("CREATE TABLE") || self.read_snapshot_opt.is_none(),
"CREATE TABLE operation should not have a read snapshot"
);
self.read_snapshot_opt.is_none()
}
fn read_snapshot(&self) -> DeltaResult<&Snapshot> {
self.read_snapshot_opt.as_deref().ok_or_else(|| {
Error::internal_error("read_snapshot() called on create-table transaction")
})
}
fn get_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult<Option<i64>> {
let has_ict = self
.effective_table_config
.is_feature_enabled(&TableFeature::InCommitTimestamp);
if !has_ict {
return Ok(None);
}
if self.is_create_table() {
return Ok(Some(self.commit_timestamp));
}
Ok(self
.read_snapshot()?
.get_in_commit_timestamp(engine)?
.map(|prev_ict| self.commit_timestamp.max(prev_ict + 1)))
}
fn get_commit_version(&self) -> Version {
match &self.read_snapshot_opt {
Some(snap) => snap.version() + 1,
None => 0,
}
}
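    /// The schema to which engine-provided add-file metadata (see [`Self::add_files`])
    /// must conform.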
pub fn add_files_schema(&self) -> &'static SchemaRef {
&BASE_ADD_FILES_SCHEMA
}
#[allow(unused)]
pub fn stats_schema(&self) -> DeltaResult<SchemaRef> {
let stats_schemas = self
.effective_table_config
.build_expected_stats_schemas(self.physical_clustering_columns.as_deref(), None)?;
Ok(stats_schemas.physical)
}
#[allow(unused)]
pub fn stats_columns(&self) -> Vec<ColumnName> {
self.effective_table_config
.physical_stats_column_names(self.physical_clustering_columns.as_deref())
}
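    /// Builds the expression that converts logical write data to its physical form:
    /// partition columns are dropped unless the `MaterializePartitionColumns` table
    /// feature is enabled.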
fn generate_logical_to_physical(&self) -> Expression {
let partition_cols = self.effective_table_config.partition_columns().to_vec();
let materialize_partition_columns = self
.effective_table_config
.is_feature_enabled(&TableFeature::MaterializePartitionColumns);
let mut transform = Transform::new_top_level();
if !materialize_partition_columns {
for col in &partition_cols {
transform = transform.with_dropped_field_if_exists(col);
}
}
Expression::transform(transform)
}
pub fn logical_partition_columns(&self) -> &[String] {
self.effective_table_config.partition_columns()
}
fn shared_write_state(&self) -> &Arc<SharedWriteState> {
self.shared_write_state.get_or_init(|| {
let table_config = &self.effective_table_config;
Arc::new(SharedWriteState {
table_root: table_config.table_root().clone(),
logical_schema: table_config.logical_schema(),
physical_schema: table_config.physical_write_schema(),
logical_to_physical: Arc::new(self.generate_logical_to_physical()),
column_mapping_mode: table_config.column_mapping_mode(),
stats_columns: self.stats_columns(),
logical_partition_columns: table_config.partition_columns().to_vec(),
})
})
}
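    /// Returns a [`WriteContext`] for writing to a partitioned table. Validates the
    /// given logical partition values and serializes them keyed by physical column
    /// name. Errors if the table is unpartitioned.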
pub fn partitioned_write_context(
&self,
partition_values: HashMap<String, Scalar>,
) -> DeltaResult<WriteContext> {
let shared = self.shared_write_state();
require!(
!shared.logical_partition_columns.is_empty(),
Error::generic("table is not partitioned; use unpartitioned_write_context() instead")
);
let normalized = validate_partition_values(
&shared.logical_partition_columns,
&shared.logical_schema,
partition_values,
)?;
let mut serialized = HashMap::with_capacity(normalized.len());
for logical_name in &shared.logical_partition_columns {
let scalar = normalized.get(logical_name).ok_or_else(|| {
Error::internal_error(format!(
"partition column '{logical_name}' missing after validation"
))
})?;
let value = serialize_partition_value(scalar)?;
let physical_name = shared
.logical_schema
.field(logical_name)
.ok_or_else(|| {
Error::internal_error(format!(
"partition column '{logical_name}' not found in schema after validation"
))
})?
.physical_name(shared.column_mapping_mode)
.to_string();
serialized.insert(physical_name, value);
}
Ok(WriteContext {
shared: shared.clone(),
physical_partition_values: serialized,
})
}
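    /// Returns a [`WriteContext`] for writing to an unpartitioned table. Errors if the
    /// table is partitioned.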
pub fn unpartitioned_write_context(&self) -> DeltaResult<WriteContext> {
let shared = self.shared_write_state();
require!(
shared.logical_partition_columns.is_empty(),
Error::generic("table is partitioned; use partitioned_write_context() instead")
);
Ok(WriteContext {
shared: shared.clone(),
physical_partition_values: HashMap::new(),
})
}
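    /// Registers metadata (conforming to [`Self::add_files_schema`]) describing data
    /// files the engine has written for this transaction.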
pub fn add_files(&mut self, add_metadata: Box<dyn EngineData>) {
self.add_files_metadata.push(add_metadata);
}
fn validate_add_files_stats(&self, add_files: &[Box<dyn EngineData>]) -> DeltaResult<()> {
if add_files.is_empty() {
return Ok(());
}
if let Some(ref clustering_cols) = self.physical_clustering_columns {
if !clustering_cols.is_empty() {
let physical_schema = self.effective_table_config.physical_schema();
let columns_with_types: Vec<(ColumnName, DataType)> = clustering_cols
.iter()
.map(|col| {
let data_type = physical_schema
.walk_column_fields(col)?
.last()
.map(|field| field.data_type().clone())
.ok_or_else(|| {
Error::internal_error(format!(
"Required column '{col}' not found in table schema"
))
})?;
Ok((col.clone(), data_type))
})
.collect::<DeltaResult<_>>()?;
let verifier = StatsVerifier::new(columns_with_types);
verifier.verify(add_files)?;
}
}
Ok(())
}
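    /// Generates `add` actions from the registered add-file metadata, along with the
    /// row-tracking domain metadata (new high-water mark) when row tracking is enabled.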
#[instrument(name = "txn.gen_adds", skip_all, err)]
fn generate_adds<'a>(
&'a self,
engine: &dyn Engine,
commit_version: u64,
) -> DeltaResult<(
EngineDataResultIterator<'a>,
Option<RowTrackingDomainMetadata>,
)> {
let row_tracking_supported = self.effective_table_config.should_write_row_tracking();
if self.add_files_metadata.is_empty() {
let row_tracking_dm = (row_tracking_supported && self.is_create_table())
.then(RowTrackingDomainMetadata::initial);
return Ok((Box::new(iter::empty()), row_tracking_dm));
}
let commit_version = i64::try_from(commit_version)
.map_err(|_| Error::generic("Commit version too large to fit in i64"))?;
if row_tracking_supported {
self.generate_adds_with_row_tracking(engine, commit_version)
} else {
let add_actions = build_add_actions(
engine,
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
self.add_files_schema().clone(),
                with_stats_col(&ADD_FILES_SCHEMA_WITH_DATA_CHANGE),
self.data_change,
);
Ok((Box::new(add_actions), None))
}
}
fn generate_adds_with_row_tracking<'a>(
&'a self,
engine: &dyn Engine,
commit_version: i64,
) -> DeltaResult<(
EngineDataResultIterator<'a>,
Option<RowTrackingDomainMetadata>,
)> {
let row_id_high_water_mark = if self.is_create_table() {
None
} else {
RowTrackingDomainMetadata::get_high_water_mark(self.read_snapshot()?, engine)?
};
let mut row_tracking_visitor =
RowTrackingVisitor::new(row_id_high_water_mark, Some(self.add_files_metadata.len()));
for add_files_batch in &self.add_files_metadata {
row_tracking_visitor.visit_rows_of(add_files_batch.deref())?;
}
let RowTrackingVisitor {
base_row_id_batches,
row_id_high_water_mark,
} = row_tracking_visitor;
let extended_add_files = self.add_files_metadata.iter().zip(base_row_id_batches).map(
move |(add_files_batch, base_row_ids)| {
let commit_versions = vec![commit_version; base_row_ids.len()];
let base_row_ids_array =
ArrayData::try_new(ArrayType::new(DataType::LONG, true), base_row_ids)?;
let commit_versions_array =
ArrayData::try_new(ArrayType::new(DataType::LONG, true), commit_versions)?;
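                // Appending against an empty base schema plus the row-tracking columns
                // yields exactly two fields: `baseRowId` and `defaultRowCommitVersion`.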
add_files_batch.append_columns(
with_row_tracking_cols(&Arc::new(StructType::new_unchecked(vec![]))),
vec![base_row_ids_array, commit_versions_array],
)
},
);
let add_actions = build_add_actions(
engine,
extended_add_files,
with_row_tracking_cols(self.add_files_schema()),
            with_row_tracking_cols(&with_stats_col(&ADD_FILES_SCHEMA_WITH_DATA_CHANGE)),
self.data_change,
);
let row_tracking_domain_metadata: RowTrackingDomainMetadata =
RowTrackingDomainMetadata::new(row_id_high_water_mark);
Ok((Box::new(add_actions), Some(row_tracking_domain_metadata)))
}
fn into_committed(
self,
file_meta: FileMeta,
crc_delta: CrcDelta,
) -> DeltaResult<CommittedTransaction> {
let parsed_commit = ParsedLogPath::parse_commit(file_meta)?;
let commit_version = parsed_commit.version;
let (post_commit_stats, post_commit_snapshot) = match &self.read_snapshot_opt {
Some(snap) => {
let stats = PostCommitStats {
commits_since_checkpoint: snap.log_segment().commits_since_checkpoint() + 1,
commits_since_log_compaction: snap
.log_segment()
.commits_since_log_compaction_or_checkpoint()
+ 1,
};
let snapshot = snap.new_post_commit(parsed_commit, crc_delta)?;
(stats, Arc::new(snapshot))
}
None => {
let log_root = self
.effective_table_config
.table_root()
.join("_delta_log/")?;
let log_segment = LogSegment::new_for_version_zero(log_root, parsed_commit)?;
let crc = crc_delta.into_crc_for_version_zero().ok_or_else(|| {
Error::internal_error("CREATE TABLE CRC delta is missing protocol or metadata")
})?;
let stats = PostCommitStats {
commits_since_checkpoint: 1,
commits_since_log_compaction: 1,
};
let snapshot = Snapshot::new_with_crc(
log_segment,
self.effective_table_config,
Arc::new(LazyCrc::new_precomputed(crc, 0)),
);
(stats, Arc::new(snapshot))
}
};
Ok(CommittedTransaction {
commit_version,
post_commit_stats,
post_commit_snapshot: Some(post_commit_snapshot),
})
}
fn build_crc_delta(
&self,
in_commit_timestamp: Option<i64>,
dm_changes: Vec<DomainMetadata>,
bin_boundaries: Option<&[i64]>,
) -> DeltaResult<CrcDelta> {
let file_stats = FileStatsDelta::try_compute_for_txn(
&self.add_files_metadata,
&self.remove_files_metadata,
bin_boundaries,
)?;
Ok(CrcDelta {
file_stats,
protocol: self
.should_emit_protocol
.then(|| self.effective_table_config.protocol().clone()),
metadata: self
.should_emit_metadata
.then(|| self.effective_table_config.metadata().clone()),
domain_metadata_changes: dm_changes,
set_transaction_changes: self.set_transactions.clone(),
in_commit_timestamp,
operation: self.operation.clone(),
            has_missing_file_size: false,
        })
}
fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction<S> {
ConflictedTransaction {
transaction: self,
conflict_version,
}
}
fn into_retryable(self, error: Error) -> RetryableTransaction<S> {
RetryableTransaction {
transaction: self,
error,
}
}
#[instrument(name = "txn.gen_removes", skip_all, err)]
fn generate_remove_actions<'a>(
&'a self,
engine: &dyn Engine,
remove_files_metadata: impl Iterator<Item = &'a FilteredEngineData> + Send + 'a,
columns_to_drop: &'a [&str],
) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
if self.is_create_table() && !self.remove_files_metadata.is_empty() {
return Err(Error::internal_error(
"CREATE TABLE transaction cannot have remove actions",
));
}
let input_schema = scan_row_schema();
let target_schema = schema_with_all_fields_nullable(get_log_remove_schema())?;
let evaluation_handler = engine.evaluation_handler();
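        // Build two evaluators: one for batches carrying a parsed-stats column
        // (coalesced into the JSON `stats` field) and one for batches without it.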
let make_eval = |coalesce_stats_with_parsed: bool| -> DeltaResult<_> {
let transform = build_remove_transform(
self.commit_timestamp,
self.data_change,
columns_to_drop,
coalesce_stats_with_parsed,
);
let expr = Arc::new(Expression::struct_from([Expression::transform(transform)]));
evaluation_handler.new_expression_evaluator(
input_schema.clone(),
expr,
target_schema.clone().into(),
)
};
let base_eval = Arc::new(make_eval(false)?);
let stats_parsed_eval = Arc::new(make_eval(true)?);
let stats_parsed_col = ColumnName::new([STATS_PARSED_NAME]);
Ok(remove_files_metadata.map(move |file_metadata_batch| {
let data = file_metadata_batch.data();
let evaluator = if data.has_field(&stats_parsed_col) {
&stats_parsed_eval
} else {
&base_eval
};
let updated_engine_data = evaluator.evaluate(data)?;
FilteredEngineData::try_new(
updated_engine_data,
file_metadata_batch.selection_vector().to_vec(),
)
}))
}
}
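/// Builds the transform that turns scan-row file metadata into `remove` action rows:
/// inserts the deletion timestamp, `dataChange` flag, and extended-metadata fields,
/// carries over partition values, stats, and tags, and drops scan-only columns such
/// as `modificationTime`.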
fn build_remove_transform(
commit_timestamp: i64,
data_change: bool,
columns_to_drop: &[&str],
coalesce_stats_with_parsed: bool,
) -> Transform {
let mut transform = Transform::new_top_level()
.with_inserted_field(Some("path"), Expression::literal(commit_timestamp).into())
.with_inserted_field(Some("path"), Expression::literal(data_change).into())
.with_inserted_field(Some("path"), Expression::literal(true).into())
.with_inserted_field(
Some("path"),
Expression::column([FILE_CONSTANT_VALUES_NAME, "partitionValues"]).into(),
);
if coalesce_stats_with_parsed {
let coalesce_stats = Expression::coalesce([
Expression::column(["stats"]),
Expression::unary(ToJson, Expression::column([STATS_PARSED_NAME])),
]);
transform = transform
.with_replaced_field("stats", coalesce_stats.into())
.with_inserted_field(
Some("stats"),
Expression::column([FILE_CONSTANT_VALUES_NAME, TAGS_NAME]).into(),
)
.with_dropped_field_if_exists(STATS_PARSED_NAME);
} else {
transform = transform.with_inserted_field(
Some("stats"),
Expression::column([FILE_CONSTANT_VALUES_NAME, TAGS_NAME]).into(),
);
}
transform = transform
.with_inserted_field(
Some("deletionVector"),
Expression::column([FILE_CONSTANT_VALUES_NAME, BASE_ROW_ID_NAME]).into(),
)
.with_inserted_field(
Some("deletionVector"),
Expression::column([FILE_CONSTANT_VALUES_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME]).into(),
)
.with_dropped_field(FILE_CONSTANT_VALUES_NAME)
.with_dropped_field("modificationTime");
for column_to_drop in columns_to_drop {
transform = transform.with_dropped_field(*column_to_drop);
}
transform
}
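/// Statistics about the log after a successful commit, e.g. for deciding when to
/// checkpoint or compact.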
#[derive(Debug)]
pub struct PostCommitStats {
pub commits_since_checkpoint: u64,
pub commits_since_log_compaction: u64,
}
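/// The outcome of [`Transaction::commit`]: committed, conflicted with a concurrent
/// writer, or failed with a retryable error.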
#[derive(Debug)]
#[must_use]
pub enum CommitResult<S = ExistingTable> {
CommittedTransaction(CommittedTransaction),
ConflictedTransaction(ConflictedTransaction<S>),
RetryableTransaction(RetryableTransaction<S>),
}
impl<S> CommitResult<S> {
pub fn is_committed(&self) -> bool {
matches!(self, CommitResult::CommittedTransaction(_))
}
}
impl<S: std::fmt::Debug> CommitResult<S> {
#[cfg(any(test, feature = "test-utils"))]
#[allow(clippy::panic)]
pub fn unwrap_committed(self) -> CommittedTransaction {
match self {
CommitResult::CommittedTransaction(c) => c,
other => panic!("Expected CommittedTransaction, got: {other:?}"),
}
}
}
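/// A successfully committed transaction, exposing the committed version, post-commit
/// log statistics, and (when available) a post-commit snapshot.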
#[derive(Debug)]
pub struct CommittedTransaction {
commit_version: Version,
post_commit_stats: PostCommitStats,
post_commit_snapshot: Option<SnapshotRef>,
}
impl CommittedTransaction {
pub fn commit_version(&self) -> Version {
self.commit_version
}
pub fn post_commit_stats(&self) -> &PostCommitStats {
&self.post_commit_stats
}
pub fn post_commit_snapshot(&self) -> Option<&SnapshotRef> {
self.post_commit_snapshot.as_ref()
}
}
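/// A transaction that lost a commit race: another writer committed `conflict_version`
/// first. The original transaction is retained for retry.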
#[derive(Debug)]
pub struct ConflictedTransaction<S = ExistingTable> {
#[allow(dead_code)]
transaction: Transaction<S>,
conflict_version: Version,
}
impl<S> ConflictedTransaction<S> {
pub fn conflict_version(&self) -> Version {
self.conflict_version
}
}
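/// A transaction whose commit failed with a retryable (I/O) error; the caller may
/// inspect `error` and retry `transaction`.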
#[derive(Debug)]
pub struct RetryableTransaction<S = ExistingTable> {
pub transaction: Transaction<S>,
pub error: Error,
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;
use rstest::rstest;
use url::Url;
use super::*;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::CommitInfo;
use crate::arrow::array::{ArrayRef, Int64Array, StringArray};
use crate::arrow::datatypes::Schema as ArrowSchema;
use crate::arrow::record_batch::RecordBatch;
use crate::committer::{FileSystemCommitter, PublishMetadata};
use crate::engine::arrow_conversion::TryIntoArrow;
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_expression::ArrowEvaluationHandler;
use crate::engine::sync::SyncEngine;
use crate::expressions::{MapData, Scalar, StructData};
use crate::object_store::local::LocalFileSystem;
use crate::object_store::memory::InMemory;
use crate::object_store::path::Path;
use crate::object_store::ObjectStoreExt as _;
use crate::schema::MapType;
use crate::table_features::ColumnMappingMode;
use crate::transaction::create_table::create_table;
use crate::utils::test_utils::{
load_test_table, string_array_to_engine_data, test_schema_flat, test_schema_nested,
test_schema_with_array, test_schema_with_map,
};
use crate::{EvaluationHandler, Snapshot};
impl Transaction {
fn with_clustering_columns_for_test(mut self, columns: Vec<ColumnName>) -> Self {
self.physical_clustering_columns = Some(columns);
self
}
}
struct IoErrorCommitter;
impl Committer for IoErrorCommitter {
fn commit(
&self,
_engine: &dyn Engine,
_actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
_commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
Err(Error::IOError(std::io::Error::other("simulated IO error")))
}
fn is_catalog_committer(&self) -> bool {
false
}
fn publish(
&self,
_engine: &dyn Engine,
_publish_metadata: PublishMetadata,
) -> DeltaResult<()> {
Ok(())
}
}
struct MockCatalogCommitter;
impl Committer for MockCatalogCommitter {
fn commit(
&self,
_engine: &dyn Engine,
_actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
_commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
Ok(CommitResponse::Conflict { version: 0 })
}
fn is_catalog_committer(&self) -> bool {
true
}
fn publish(
&self,
_engine: &dyn Engine,
_publish_metadata: PublishMetadata,
) -> DeltaResult<()> {
Ok(())
}
}
fn setup_dv_enabled_table() -> (SyncEngine, Arc<Snapshot>) {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url)
.at_version(1)
.build(&engine)
.unwrap();
(engine, snapshot)
}
fn setup_non_dv_table() -> (SyncEngine, Arc<Snapshot>) {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
(engine, snapshot)
}
fn create_test_dv_descriptor(path_suffix: &str) -> DeletionVectorDescriptor {
use crate::actions::deletion_vector::{
DeletionVectorDescriptor, DeletionVectorStorageType,
};
DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: format!("dv_{path_suffix}"),
offset: Some(0),
size_in_bytes: 100,
cardinality: 1,
}
}
fn create_dv_transaction(
snapshot: Arc<Snapshot>,
engine: &dyn Engine,
) -> DeltaResult<Transaction> {
Ok(snapshot
.transaction(Box::new(FileSystemCommitter::new()), engine)?
.with_operation("DELETE".to_string())
.with_engine_info("test_engine"))
}
#[test]
fn test_add_files_schema() -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url)
.at_version(1)
.build(&engine)
.unwrap();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let schema = txn.add_files_schema();
let expected = StructType::new_unchecked(vec![
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::not_null("size", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
StructField::nullable(
"stats",
DataType::struct_type_unchecked(vec![
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", DataType::struct_type_unchecked(vec![])),
StructField::nullable("minValues", DataType::struct_type_unchecked(vec![])),
StructField::nullable("maxValues", DataType::struct_type_unchecked(vec![])),
StructField::nullable("tightBounds", DataType::BOOLEAN),
]),
),
]);
assert_eq!(*schema, expected.into());
Ok(())
}
#[test]
fn test_new_deletion_vector_path() -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url.clone())
.at_version(1)
.build(&engine)
.unwrap();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let write_context = txn.unpartitioned_write_context().unwrap();
let dv_path1 = write_context.new_deletion_vector_path(String::from(""));
let abs_path1 = dv_path1.absolute_path()?;
assert!(abs_path1.as_str().contains(url.as_str()));
let prefix = String::from("dv_test");
let dv_path2 = write_context.new_deletion_vector_path(prefix.clone());
let abs_path2 = dv_path2.absolute_path()?;
assert!(abs_path2.as_str().contains(url.as_str()));
assert!(abs_path2.as_str().contains(&prefix));
let dv_path3 = write_context.new_deletion_vector_path(prefix.clone());
let abs_path3 = dv_path3.absolute_path()?;
assert_ne!(abs_path2, abs_path3);
Ok(())
}
#[test]
fn test_physical_schema_excludes_partition_columns() -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let write_context = txn.partitioned_write_context(HashMap::from([(
"letter".to_string(),
Scalar::String("a".into()),
)]))?;
let logical_schema = write_context.logical_schema();
let physical_schema = write_context.physical_schema();
assert!(
logical_schema.contains("letter"),
"Logical schema should contain partition column 'letter'"
);
assert!(
!physical_schema.contains("letter"),
"Physical schema should not contain partition column 'letter' (stored in path)"
);
assert!(
logical_schema.contains("number"),
"Logical schema should contain data column 'number'"
);
assert!(
physical_schema.contains("number"),
"Physical schema should contain data column 'number'"
);
Ok(())
}
fn snapshot_and_partitioned_write_context(
table_path: &str,
) -> Result<(Arc<Snapshot>, WriteContext), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path = std::fs::canonicalize(PathBuf::from(table_path)).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine)?;
let txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let partition_cols = txn.logical_partition_columns();
assert!(
!partition_cols.is_empty(),
"expected a partitioned table at {table_path}"
);
let schema = snapshot.schema();
let partition_vals: HashMap<String, Scalar> = partition_cols
.iter()
.map(|col| {
let dt = schema.field(col).unwrap().data_type().clone();
(col.clone(), Scalar::Null(dt))
})
.collect();
let wc = txn.partitioned_write_context(partition_vals)?;
Ok((snapshot, wc))
}
fn eval_logical_to_physical(
wc: &WriteContext,
batch: RecordBatch,
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
let logical_schema = wc.logical_schema();
let physical_schema = wc.physical_schema();
let l2p = wc.logical_to_physical();
let handler = ArrowEvaluationHandler;
let evaluator = handler.new_expression_evaluator(
logical_schema.clone(),
l2p,
physical_schema.clone().into(),
)?;
let result = ArrowEngineData::try_from_engine_data(
evaluator.evaluate(&ArrowEngineData::new(batch))?,
)?;
Ok(result.record_batch().clone())
}
#[test]
fn test_materialize_partition_columns_in_write_context(
) -> Result<(), Box<dyn std::error::Error>> {
let (snap_without, wc_without) =
snapshot_and_partitioned_write_context("./tests/data/basic_partitioned/")?;
let partition_cols = snap_without.table_configuration().partition_columns();
assert_eq!(partition_cols.len(), 1);
assert_eq!(partition_cols[0], "letter");
assert!(
!snap_without
.table_configuration()
.protocol()
.has_table_feature(&TableFeature::MaterializePartitionColumns),
"basic_partitioned should not have materializePartitionColumns feature"
);
let expr_str = format!("{}", wc_without.logical_to_physical());
assert!(
expr_str.contains("drop letter"),
"Partition column 'letter' should be dropped. Expression: {expr_str}"
);
let (snap_with, wc_with) = snapshot_and_partitioned_write_context(
"./tests/data/partitioned_with_materialize_feature/",
)?;
let partition_cols = snap_with.table_configuration().partition_columns();
assert_eq!(partition_cols.len(), 1);
assert_eq!(partition_cols[0], "letter");
assert!(
snap_with
.table_configuration()
.protocol()
.has_table_feature(&TableFeature::MaterializePartitionColumns),
"partitioned_with_materialize_feature should have materializePartitionColumns feature"
);
let expr_str = format!("{}", wc_with.logical_to_physical());
assert!(
!expr_str.contains("drop"),
"No columns should be dropped with materializePartitionColumns. Expression: {expr_str}"
);
Ok(())
}
#[test]
fn test_physical_schema_includes_partition_columns_when_materialized(
) -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path = std::fs::canonicalize(PathBuf::from(
"./tests/data/partitioned_with_materialize_feature/",
))
.unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).at_version(1).build(&engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let write_context = txn.partitioned_write_context(HashMap::from([(
"letter".to_string(),
Scalar::String("a".into()),
)]))?;
let physical_schema = write_context.physical_schema();
assert!(
physical_schema.contains("letter"),
"Partition column 'letter' should be in physical schema when materialized"
);
assert!(
physical_schema.contains("number"),
"Non-partition column 'number' should be in physical schema"
);
Ok(())
}
#[rstest]
#[case::partitioned_on_unpartitioned(
"./tests/data/table-without-dv-small/",
true,
"not partitioned"
)]
#[case::unpartitioned_on_partitioned(
"./tests/data/basic_partitioned/",
false,
"table is partitioned"
)]
fn test_wrong_write_context_method_returns_error(
#[case] table_path: &str,
#[case] call_partitioned: bool,
#[case] expected_msg: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path = std::fs::canonicalize(PathBuf::from(table_path)).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let result = if call_partitioned {
txn.partitioned_write_context(HashMap::from([("x".to_string(), Scalar::Integer(1))]))
} else {
txn.unpartitioned_write_context()
};
let err = result.unwrap_err().to_string();
assert!(
err.contains(expected_msg),
"expected '{expected_msg}' in error, got: {err}"
);
Ok(())
}
#[test]
fn test_update_deletion_vectors_unsupported_table() -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_non_dv_table();
let mut txn = create_dv_transaction(snapshot, &engine)?;
let dv_map = HashMap::new();
let result = txn.update_deletion_vectors(dv_map, std::iter::empty());
let err = result.expect_err("Should fail on table without DV support");
let err_msg = err.to_string();
assert!(
err_msg.contains("Deletion vector")
&& (err_msg.contains("require") || err_msg.contains("version")),
"Expected protocol error about DV requirements, got: {err_msg}"
);
Ok(())
}
#[test]
fn test_update_deletion_vectors_mismatch_count() -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_dv_enabled_table();
let mut txn = create_dv_transaction(snapshot, &engine)?;
let mut dv_map = HashMap::new();
let descriptor = create_test_dv_descriptor("non_existent");
dv_map.insert("non_existent_file.parquet".to_string(), descriptor);
let result = txn.update_deletion_vectors(dv_map, std::iter::empty());
assert!(
result.is_err(),
"Should fail when DV descriptors don't match scan files"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("matched") && err_msg.contains("does not match"),
"Expected error about mismatched count (expected 1 descriptor, 0 matched files), got: {err_msg}");
Ok(())
}
#[test]
fn test_update_deletion_vectors_empty_inputs() -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_dv_enabled_table();
let mut txn = create_dv_transaction(snapshot, &engine)?;
let dv_map = HashMap::new();
let result = txn.update_deletion_vectors(dv_map, std::iter::empty());
assert!(
result.is_ok(),
"Empty DV updates should succeed as no-op, got error: {result:?}"
);
Ok(())
}
fn add_dummy_file<S>(txn: &mut Transaction<S>) {
let data = string_array_to_engine_data(StringArray::from(vec!["dummy"]));
txn.add_files(data);
}
fn create_existing_table_txn(
) -> DeltaResult<(Arc<dyn Engine>, Transaction, Option<tempfile::TempDir>)> {
let (engine, snapshot, tempdir) = load_test_table("table-without-dv-small")?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())?;
Ok((engine, txn, tempdir))
}
#[test]
fn test_validate_blind_append_success() -> DeltaResult<()> {
let (_engine, mut txn, _tempdir) = create_existing_table_txn()?;
txn = txn.with_blind_append();
add_dummy_file(&mut txn);
txn.validate_blind_append_semantics()?;
Ok(())
}
#[test]
fn test_validate_blind_append_requires_adds() -> DeltaResult<()> {
let (_engine, mut txn, _tempdir) = create_existing_table_txn()?;
txn = txn.with_blind_append();
let result = txn.validate_blind_append_semantics();
assert!(matches!(result, Err(Error::InvalidTransactionState(_))));
Ok(())
}
#[test]
fn test_validate_blind_append_requires_data_change() -> DeltaResult<()> {
let (_engine, mut txn, _tempdir) = create_existing_table_txn()?;
txn = txn.with_blind_append();
txn.set_data_change(false);
add_dummy_file(&mut txn);
let result = txn.validate_blind_append_semantics();
assert!(matches!(result, Err(Error::InvalidTransactionState(_))));
Ok(())
}
#[test]
fn test_validate_blind_append_rejects_removes() -> DeltaResult<()> {
let (_engine, mut txn, _tempdir) = create_existing_table_txn()?;
txn = txn.with_blind_append();
add_dummy_file(&mut txn);
let remove_data = FilteredEngineData::with_all_rows_selected(string_array_to_engine_data(
StringArray::from(vec!["remove"]),
));
txn.remove_files(remove_data);
let result = txn.validate_blind_append_semantics();
assert!(matches!(result, Err(Error::InvalidTransactionState(_))));
Ok(())
}
#[test]
fn test_validate_blind_append_rejects_dv_updates() -> DeltaResult<()> {
let (_engine, mut txn, _tempdir) = create_existing_table_txn()?;
txn = txn.with_blind_append();
add_dummy_file(&mut txn);
let dv_data = FilteredEngineData::with_all_rows_selected(string_array_to_engine_data(
StringArray::from(vec!["dv"]),
));
txn.dv_matched_files.push(dv_data);
let result = txn.validate_blind_append_semantics();
assert!(matches!(result, Err(Error::InvalidTransactionState(_))));
Ok(())
}
#[test]
fn test_validate_blind_append_rejects_create_table() -> DeltaResult<()> {
let tempdir = tempfile::tempdir()?;
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"id",
DataType::INTEGER,
)])?);
let store = Arc::new(LocalFileSystem::new());
let engine = Arc::new(crate::engine::default::DefaultEngineBuilder::new(store).build());
let mut txn = create_table(
tempdir.path().to_str().expect("valid temp path"),
schema,
"test_engine",
)
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
txn.is_blind_append = true;
add_dummy_file(&mut txn);
let result = txn.validate_blind_append_semantics();
assert!(matches!(result, Err(Error::InvalidTransactionState(_))));
Ok(())
}
#[test]
fn test_blind_append_sets_commit_info_flag() -> Result<(), Box<dyn std::error::Error>> {
let commit_info = CommitInfo::new(1, None, None, None, true);
assert_eq!(commit_info.is_blind_append, Some(true));
let commit_info_false = CommitInfo::new(1, None, None, None, false);
assert_eq!(commit_info_false.is_blind_append, None);
Ok(())
}
#[test]
fn test_blind_append_commit_rejects_no_adds() -> DeltaResult<()> {
let (_engine, mut txn, _tempdir) = create_existing_table_txn()?;
txn = txn.with_blind_append();
let err = txn
.commit(_engine.as_ref())
.expect_err("Blind append with no adds should fail");
assert!(
err.to_string()
.contains("Blind append requires at least one added data file"),
"Unexpected error: {err}"
);
Ok(())
}
#[test]
fn test_blind_append_commit_success() -> DeltaResult<()> {
let (engine, mut txn, _tempdir) = create_existing_table_txn()?;
txn = txn.with_blind_append();
add_dummy_file(&mut txn);
let result = txn.commit(engine.as_ref());
if let Err(e) = result {
assert!(
!matches!(e, Error::InvalidTransactionState(_)),
"Blind append validation should have passed, got: {e}"
);
}
Ok(())
}
#[test]
fn test_commit_io_error_returns_retryable_transaction() -> DeltaResult<()> {
let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
let mut txn = snapshot.transaction(Box::new(IoErrorCommitter), engine.as_ref())?;
add_dummy_file(&mut txn);
let result = txn.commit(engine.as_ref())?;
assert!(
matches!(result, CommitResult::RetryableTransaction(_)),
"Expected RetryableTransaction, got: {result:?}"
);
if let CommitResult::RetryableTransaction(retryable) = result {
assert!(
retryable.error.to_string().contains("simulated IO error"),
"Unexpected error: {}",
retryable.error
);
}
Ok(())
}
#[test]
fn test_existing_table_txn_debug() -> DeltaResult<()> {
let (_engine, txn, _tempdir) = create_existing_table_txn()?;
let debug_str = format!("{txn:?}");
assert!(
debug_str.contains("Transaction") && debug_str.contains("read_snapshot version"),
"Debug output should contain Transaction info: {debug_str}"
);
assert!(
!debug_str.contains("create_table"),
"Existing table debug should not contain create_table: {debug_str}"
);
Ok(())
}
#[rstest]
#[case::flat_none(test_schema_flat(), ColumnMappingMode::None)]
#[case::flat_name(test_schema_flat(), ColumnMappingMode::Name)]
#[case::flat_id(test_schema_flat(), ColumnMappingMode::Id)]
#[case::nested_none(test_schema_nested(), ColumnMappingMode::None)]
#[case::nested_name(test_schema_nested(), ColumnMappingMode::Name)]
#[case::nested_id(test_schema_nested(), ColumnMappingMode::Id)]
#[case::map_none(test_schema_with_map(), ColumnMappingMode::None)]
#[case::map_name(test_schema_with_map(), ColumnMappingMode::Name)]
#[case::map_id(test_schema_with_map(), ColumnMappingMode::Id)]
#[case::array_none(test_schema_with_array(), ColumnMappingMode::None)]
#[case::array_name(test_schema_with_array(), ColumnMappingMode::Name)]
#[case::array_id(test_schema_with_array(), ColumnMappingMode::Id)]
fn test_physical_schema_column_mapping(
#[case] schema: SchemaRef,
#[case] mode: ColumnMappingMode,
) -> DeltaResult<()> {
let (_engine, txn) = crate::utils::test_utils::setup_column_mapping_txn(schema, mode)?;
let write_context = txn.unpartitioned_write_context().unwrap();
crate::utils::test_utils::validate_physical_schema_column_mapping(
write_context.logical_schema(),
write_context.physical_schema(),
mode,
);
Ok(())
}
fn build_test_record_batch() -> DeltaResult<Box<dyn EngineData>> {
let schema = test_schema_nested();
let tag_type = MapType::new(DataType::STRING, DataType::STRING, true);
let score_type = ArrayType::new(DataType::INTEGER, true);
let info_fields = vec![
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER),
StructField::nullable("tags", tag_type.clone()),
StructField::nullable("scores", score_type.clone()),
];
let info1 = Scalar::Struct(StructData::try_new(
info_fields.clone(),
vec![
"alice".into(),
30i32.into(),
Scalar::Map(MapData::try_new(tag_type.clone(), [("k1", "v1")])?),
Scalar::Array(ArrayData::try_new(score_type.clone(), [10i32, 20i32])?),
],
)?);
let info2 = Scalar::Struct(StructData::try_new(
info_fields,
vec![
"bob".into(),
25i32.into(),
Scalar::Map(MapData::try_new(tag_type, [("k2", "v2")])?),
Scalar::Array(ArrayData::try_new(score_type, [30i32])?),
],
)?);
ArrowEvaluationHandler.create_many(schema, &[&[1i64.into(), info1], &[2i64.into(), info2]])
}
fn validate_logical_to_physical_transform(mode: ColumnMappingMode) -> DeltaResult<()> {
let schema = test_schema_nested();
let (_engine, txn) = crate::utils::test_utils::setup_column_mapping_txn(schema, mode)?;
let write_context = txn.unpartitioned_write_context().unwrap();
let logical_schema = write_context.logical_schema();
let physical_schema = write_context.physical_schema();
let logical_to_physical_expression = write_context.logical_to_physical();
if mode != ColumnMappingMode::None {
assert_ne!(
logical_schema, physical_schema,
"Physical schema should differ from logical schema when column mapping is enabled"
);
}
let data = build_test_record_batch()?;
let input_schema: SchemaRef = logical_schema.clone();
let handler = ArrowEvaluationHandler;
let evaluator = handler.new_expression_evaluator(
input_schema,
logical_to_physical_expression.clone(),
physical_schema.clone().into(),
)?;
let result = evaluator.evaluate(data.as_ref())?;
let result = ArrowEngineData::try_from_engine_data(result)?;
let result_batch = result.record_batch();
let expected_arrow_schema: ArrowSchema = physical_schema.as_ref().try_into_arrow()?;
assert_eq!(result_batch.schema().as_ref(), &expected_arrow_schema);
let id_col = result_batch
.column(0)
.as_any()
.downcast_ref::<Int64Array>()
.expect("id column should be Int64");
assert_eq!(id_col.values(), &[1i64, 2]);
Ok(())
}
#[rstest]
#[case::name_mode(ColumnMappingMode::Name)]
#[case::id_mode(ColumnMappingMode::Id)]
#[case::none_mode(ColumnMappingMode::None)]
fn test_logical_to_physical_transform(#[case] mode: ColumnMappingMode) -> DeltaResult<()> {
validate_logical_to_physical_transform(mode)
}
#[rstest]
#[case::dropped("./tests/data/basic_partitioned/", 2, &[])]
#[case::kept("./tests/data/partitioned_with_materialize_feature/", 3, &["letter"])]
fn test_partition_column_in_eval_output(
#[case] table_path: &str,
#[case] expected_cols: usize,
#[case] expected_partition_cols: &[&str],
) -> Result<(), Box<dyn std::error::Error>> {
use crate::arrow::array::Float64Array;
let (_snap, wc) = snapshot_and_partitioned_write_context(table_path)?;
let batch = RecordBatch::try_new(
Arc::new(wc.logical_schema().as_ref().try_into_arrow()?),
vec![
Arc::new(StringArray::from(vec!["x"])) as ArrayRef,
Arc::new(Int64Array::from(vec![42])),
Arc::new(Float64Array::from(vec![1.5])),
],
)?;
let rb = eval_logical_to_physical(&wc, batch)?;
assert_eq!(rb.num_columns(), expected_cols);
for col in expected_partition_cols {
assert!(rb.schema().fields().iter().any(|f| f.name() == *col));
}
Ok(())
}
enum TestFileStats {
None,
Present,
AllNull,
}
fn create_test_add_files(paths: Vec<&str>, stats: Vec<TestFileStats>) -> Box<dyn EngineData> {
let value_fields = vec![StructField::nullable("value", DataType::LONG)];
let value_struct_type = DataType::struct_type_unchecked(value_fields.clone());
let stats_type = DataType::struct_type_unchecked(vec![
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", value_struct_type.clone()),
StructField::nullable("minValues", value_struct_type.clone()),
StructField::nullable("maxValues", value_struct_type.clone()),
]);
let stats_fields = vec![
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", value_struct_type.clone()),
StructField::nullable("minValues", value_struct_type.clone()),
StructField::nullable("maxValues", value_struct_type),
];
let schema = Arc::new(StructType::new_unchecked(vec![
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::not_null("size", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
StructField::nullable("stats", stats_type.clone()),
]));
let empty_map = Scalar::Map(
MapData::try_new(
MapType::new(DataType::STRING, DataType::STRING, true),
Vec::<(&str, &str)>::new(),
)
.unwrap(),
);
let rows: Vec<Vec<Scalar>> = paths
.iter()
.zip(stats.iter())
.map(|(path, stat)| {
let stats_scalar = match stat {
TestFileStats::None => Scalar::Null(stats_type.clone()),
TestFileStats::Present | TestFileStats::AllNull => {
let value_struct = |v: Option<i64>| {
let scalar = v.map_or(Scalar::Null(DataType::LONG), |n| n.into());
Scalar::Struct(
StructData::try_new(value_fields.clone(), vec![scalar]).unwrap(),
)
};
let (null_count, min, max) = match stat {
TestFileStats::Present => (
value_struct(Some(0)),
value_struct(Some(1)),
value_struct(Some(100)),
),
_ => (
value_struct(Some(100)),
value_struct(None),
value_struct(None),
),
};
Scalar::Struct(
StructData::try_new(
stats_fields.clone(),
vec![100i64.into(), null_count, min, max],
)
.unwrap(),
)
}
};
vec![
(*path).into(),
empty_map.clone(),
1024i64.into(),
1000000i64.into(),
stats_scalar,
]
})
.collect();
let row_refs: Vec<&[Scalar]> = rows.iter().map(|r| r.as_slice()).collect();
ArrowEvaluationHandler
.create_many(schema, &row_refs)
.unwrap()
}
#[test]
fn test_stats_validation_allows_all_null_clustering_column() {
let (engine, snapshot) = setup_non_dv_table();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)
.unwrap()
.with_operation("WRITE".to_string())
.with_clustering_columns_for_test(vec![ColumnName::new(["value"])]);
let add_files = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::AllNull]);
let result = txn.validate_add_files_stats(&[add_files]);
assert!(
result.is_ok(),
"Stats validation should pass for all-null clustering columns, got: {result:?}",
);
}
#[test]
fn test_stats_validation_when_clustering_cols_missing_stats() {
let (engine, snapshot) = setup_non_dv_table();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)
.unwrap()
.with_operation("WRITE".to_string())
.with_clustering_columns_for_test(vec![ColumnName::new(["value"])]);
let add_files = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::None]);
let result = txn.validate_add_files_stats(&[add_files]);
assert!(
result.is_err(),
"Expected validation to fail when stats are missing for clustering columns"
);
let err_msg = result.unwrap_err().to_string();
assert!(
err_msg.contains("Stats validation error") || err_msg.contains("no stats"),
"Expected stats validation error, got: {err_msg}"
);
}
#[test]
fn test_stats_validation_when_clustering_stats_present() {
let (engine, snapshot) = setup_non_dv_table();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)
.unwrap()
.with_operation("WRITE".to_string())
.with_clustering_columns_for_test(vec![ColumnName::new(["value"])]);
let add_files = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::Present]);
let result = txn.validate_add_files_stats(&[add_files]);
assert!(
result.is_ok(),
"Stats validation should pass when stats are present, got: {result:?}"
);
}
#[test]
fn test_stats_validation_skipped_without_clustering() {
let (engine, snapshot) = setup_non_dv_table();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)
.unwrap()
.with_operation("WRITE".to_string());
let add_files = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::None]);
let result = txn.validate_add_files_stats(&[add_files]);
assert!(
result.is_ok(),
"Stats validation should be skipped without clustering, got: {result:?}"
);
}
#[test]
fn disallow_catalog_committer_for_non_catalog_managed_table() {
let storage = Arc::new(InMemory::new());
let table_root = url::Url::parse("memory:///").unwrap();
let engine = crate::engine::default::DefaultEngineBuilder::new(storage.clone()).build();
let actions = [
r#"{"commitInfo":{"timestamp":12345678900,"inCommitTimestamp":12345678900}}"#,
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":[],"writerFeatures":["inCommitTimestamp"]}}"#,
r#"{"metaData":{"id":"test-id","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[]}","partitionColumns":[],"configuration":{"delta.enableInCommitTimestamps":"true"},"createdTime":1234567890}}"#,
].join("\n");
let commit_path = Path::from("_delta_log/00000000000000000000.json");
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(storage.put(&commit_path, actions.into()))
.unwrap();
let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap();
let committer = Box::new(MockCatalogCommitter);
let err = snapshot
.transaction(committer, &engine)
.unwrap()
.commit(&engine)
.unwrap_err();
assert!(matches!(
err,
crate::Error::Generic(e) if e.contains("This table is path-based and cannot be committed to with a catalog committer")
));
}
#[test]
fn disallow_catalog_committer_for_non_catalog_managed_create_table() {
let storage = Arc::new(InMemory::new());
let engine = crate::engine::default::DefaultEngineBuilder::new(storage).build();
let schema = Arc::new(crate::schema::StructType::new_unchecked(vec![
crate::schema::StructField::new("id", crate::schema::DataType::INTEGER, true),
]));
let committer = Box::new(MockCatalogCommitter);
let err = create_table("memory:///", schema, "test-engine")
.build(&engine, committer)
.unwrap()
.commit(&engine)
.unwrap_err();
assert!(matches!(
err,
crate::Error::Generic(e) if e.contains("This table is path-based and cannot be committed to with a catalog committer")
));
}
struct CapturingCommitter {
captured: Arc<Mutex<Option<i64>>>,
}
impl CapturingCommitter {
fn new() -> (Self, Arc<Mutex<Option<i64>>>) {
let captured = Arc::new(Mutex::new(None));
(
Self {
captured: captured.clone(),
},
captured,
)
}
}
impl Committer for CapturingCommitter {
fn commit(
&self,
_engine: &dyn Engine,
_actions: Box<dyn Iterator<Item = DeltaResult<FilteredEngineData>> + Send + '_>,
commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
*self.captured.lock().unwrap() = Some(commit_metadata.in_commit_timestamp());
Ok(CommitResponse::Conflict {
version: commit_metadata.version(),
})
}
fn is_catalog_committer(&self) -> bool {
false
}
fn publish(
&self,
_engine: &dyn Engine,
_publish_metadata: PublishMetadata,
) -> DeltaResult<()> {
Ok(())
}
}
#[test]
fn test_commit_metadata_receives_ict_not_wall_time() -> DeltaResult<()> {
let tempdir = tempfile::tempdir().unwrap();
let log_dir = tempdir.path().join("_delta_log");
std::fs::create_dir_all(&log_dir).unwrap();
        let future_ict: i64 = 9_999_999_999_999;
        let commit_info = serde_json::json!({
"commitInfo": {
"timestamp": 1000,
"operation": "WRITE",
"inCommitTimestamp": future_ict
}
});
let protocol = serde_json::json!({
"protocol": {
"minReaderVersion": 3,
"minWriterVersion": 7,
"readerFeatures": [],
"writerFeatures": ["inCommitTimestamp"]
}
});
let schema_json = serde_json::json!({
"type": "struct",
"fields": [{
"name": "id",
"type": "integer",
"nullable": true,
"metadata": {}
}]
});
let metadata = serde_json::json!({
"metaData": {
"id": "test-id",
"format": {"provider": "parquet", "options": {}},
"schemaString": schema_json.to_string(),
"partitionColumns": [],
"configuration": {
"delta.enableInCommitTimestamps": "true"
}
}
});
let commit0 = format!("{commit_info}\n{protocol}\n{metadata}\n");
std::fs::write(log_dir.join("00000000000000000000.json"), commit0).unwrap();
let table_url = Url::from_directory_path(tempdir.path()).unwrap();
let engine = SyncEngine::new();
let snapshot = Snapshot::builder_for(table_url).build(&engine)?;
let prev_ict = snapshot.get_in_commit_timestamp(&engine)?;
assert_eq!(prev_ict, Some(future_ict));
let (committer, captured_ts) = CapturingCommitter::new();
let mut txn = snapshot.transaction(Box::new(committer), &engine)?;
add_dummy_file(&mut txn);
let result = txn.commit(&engine)?;
assert!(
matches!(result, CommitResult::ConflictedTransaction(_)),
"Expected ConflictedTransaction from capturing committer"
);
let captured = captured_ts
.lock()
.unwrap()
.expect("should have captured a timestamp");
assert_eq!(
captured,
future_ict + 1,
"CommitMetadata.in_commit_timestamp should be the computed ICT (prev_ict + 1), \
not the wall-clock time"
);
Ok(())
}
}