use std::collections::{HashMap, HashSet};
use std::iter;
use std::marker::PhantomData;
use std::ops::Deref;
use std::sync::{Arc, LazyLock};
use std::time::{Duration, Instant};
use delta_kernel_derive::internal_api;
use tracing::{info, instrument};
use crate::actions::{
as_log_add_schema, get_commit_schema, get_log_remove_schema, get_log_txn_schema, CommitInfo,
DomainMetadata, Metadata, Protocol, SetTransaction, MAX_VALUES, METADATA_NAME, MIN_VALUES,
NULL_COUNT, NUM_RECORDS, PROTOCOL_NAME, TIGHT_BOUNDS,
};
use crate::committer::{
CommitMetadata, CommitProtocolMetadata, CommitResponse, CommitType, Committer,
};
use crate::crc::{is_incremental_safe_operation, CrcDelta, FileStatsDelta};
use crate::engine_data::FilteredEngineData;
use crate::error::Error;
use crate::expressions::UnaryExpressionOp::ToJson;
use crate::expressions::{
col, lit, ArrayData, ColumnName, ExpressionStructPatch, ExpressionStructPatchBuilder, Scalar,
};
use crate::log_segment::LogSegment;
use crate::metrics::events::TRANSACTION_COMMIT_SPAN;
use crate::metrics::{CommitFailureReason, MetricId};
use crate::partition::serialization::serialize_partition_value;
use crate::partition::validation::validate_partition_values;
use crate::path::{LogRoot, ParsedLogPath};
use crate::row_tracking::{RowTrackingDomainMetadata, RowTrackingVisitor};
use crate::scan::data_skipping::stats_schema::schema_with_all_fields_nullable;
use crate::scan::log_replay::{
BASE_ROW_ID_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME, FILE_CONSTANT_VALUES_NAME,
PARTITION_VALUES_PARSED_NAME, STATS_PARSED_NAME, TAGS_NAME,
};
use crate::scan::scan_row_schema;
use crate::schema::void_utils::{add_void_stripping, validate_schema_for_write};
use crate::schema::{
ArrayType, MapType, SchemaRef, SchemaStructPatchBuilder, StructField, StructType,
StructTypeBuilder,
};
use crate::snapshot::{Snapshot, SnapshotRef};
use crate::struct_patch::ProjectionStructPatchBuilder;
use crate::table_configuration::TableConfiguration;
use crate::table_features::TableFeature;
use crate::utils::require;
use crate::{
DataType, DeltaResult, Engine, EngineData, Expression, FileMeta, IntoEngineData, RowVisitor,
Version,
};
#[cfg(feature = "internal-api")]
pub mod builder;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod builder;
#[cfg(feature = "internal-api")]
pub mod create_table;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod create_table;
#[cfg(feature = "internal-api")]
pub mod data_layout;
#[cfg(not(feature = "internal-api"))]
pub(crate) mod data_layout;
pub(crate) mod alter_table;
pub use alter_table::AlterTableTransaction;
mod commit_info;
mod domain_metadata;
pub(crate) mod schema_evolution;
#[cfg(feature = "internal-api")]
pub mod stats_verifier;
#[cfg(not(feature = "internal-api"))]
mod stats_verifier;
mod update;
mod write_context;
use stats_verifier::StatsColumnVerifier;
use write_context::SharedWriteState;
pub use write_context::WriteContext;
/// Boxed, `Send` iterator of fallible engine-data batches that may borrow for `'a`; used as
/// the type-erased add-action stream returned by the add generators in this module.
pub(crate) type EngineDataResultIterator<'a> =
Box<dyn Iterator<Item = DeltaResult<Box<dyn EngineData>>> + Send + 'a>;
/// Minimal schema of add-file metadata: non-nullable `path`, `partitionValues`
/// (string -> string map; the `true` flag presumably marks map values nullable — confirm
/// against `MapType::new`), `size`, and `modificationTime`.
/// `BASE_ADD_FILES_SCHEMA` extends this with a `stats` struct.
pub(crate) static MANDATORY_ADD_FILE_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
Arc::new(StructType::new_unchecked(vec![
StructField::not_null("path", DataType::STRING),
StructField::not_null(
"partitionValues",
MapType::new(DataType::STRING, DataType::STRING, true),
),
StructField::not_null("size", DataType::LONG),
StructField::not_null("modificationTime", DataType::LONG),
]))
});
pub(crate) fn mandatory_add_file_schema() -> &'static SchemaRef {
&MANDATORY_ADD_FILE_SCHEMA
}
/// Schema expected for add-file metadata passed to `Transaction::add_files`.
///
/// It is the mandatory add-file schema plus a nullable `stats` struct holding `numRecords`,
/// empty `nullCount`/`minValues`/`maxValues` structs, and `tightBounds`. The empty structs
/// are placeholders here; their per-column contents are not described by this schema.
pub(crate) static BASE_ADD_FILES_SCHEMA: LazyLock<SchemaRef> = LazyLock::new(|| {
let stats = StructField::nullable(
"stats",
DataType::struct_type_unchecked(vec![
StructField::nullable(NUM_RECORDS, DataType::LONG),
StructField::nullable(NULL_COUNT, DataType::struct_type_unchecked(vec![])),
StructField::nullable(MIN_VALUES, DataType::struct_type_unchecked(vec![])),
StructField::nullable(MAX_VALUES, DataType::struct_type_unchecked(vec![])),
StructField::nullable(TIGHT_BOUNDS, DataType::BOOLEAN),
]),
);
StructTypeBuilder::from_schema(mandatory_add_file_schema())
.add_field(stats)
.build_arc_unchecked()
});
/// Non-nullable boolean `dataChange` field that `build_add_action_projection` inserts
/// after `modificationTime` when shaping add actions.
static DATA_CHANGE_COLUMN: LazyLock<StructField> =
LazyLock::new(|| StructField::not_null("dataChange", DataType::BOOLEAN));
/// Returns `schema` extended with two nullable LONG columns, `baseRowId` followed by
/// `defaultRowCommitVersion`.
///
/// # Errors
/// Propagates any error from applying the schema patch to `schema`.
fn with_row_tracking_cols(schema: &SchemaRef) -> DeltaResult<SchemaRef> {
    let base_row_id = StructField::nullable("baseRowId", DataType::LONG);
    let default_row_commit_version =
        StructField::nullable("defaultRowCommitVersion", DataType::LONG);
    let extended = SchemaStructPatchBuilder::new()
        .append(base_row_id)
        .append(default_row_commit_version)
        .build(schema)?;
    Ok(Arc::new(extended))
}
/// Type-state marker for a transaction against an existing table (presumably one built from
/// a read snapshot — confirm at the constructor).
#[derive(Debug)]
pub struct ExistingTable;
/// Type-state marker for a create-table transaction (presumably no read snapshot, so it
/// commits version 0).
#[derive(Debug)]
pub struct CreateTable;
/// Type-state marker for an alter-table transaction. It does not implement
/// [`SupportsDataFiles`], so the data-file methods are unavailable for it.
#[derive(Debug)]
pub struct AlterTable;
/// Marker trait that gates the data-file APIs (`add_files`, write contexts, stats schema)
/// on `Transaction<S>`.
pub trait SupportsDataFiles {}
impl SupportsDataFiles for ExistingTable {}
impl SupportsDataFiles for CreateTable {}
/// A staged set of changes to a Delta table, committed by [`Transaction::commit`].
///
/// `S` is a type-state marker ([`ExistingTable`], [`CreateTable`], [`AlterTable`]); the
/// data-file methods exist only when `S: SupportsDataFiles`.
pub struct Transaction<S = ExistingTable> {
/// Parent span for the commit span (`parent = &self.span` on `commit`).
span: tracing::Span,
/// Recorded as `operation_id` on the commit span.
operation_id: MetricId,
/// Optional id recorded on the commit span; empty ids are stored as `None`.
correlation_id: Option<Arc<str>>,
/// Snapshot read by this transaction; `None` means a create-table transaction.
read_snapshot_opt: Option<SnapshotRef>,
/// Configuration the commit writes against: source of emitted protocol/metadata and of
/// schema, partitioning, and feature checks.
effective_table_config: TableConfiguration,
/// Whether a Protocol action (from `effective_table_config`) is written.
should_emit_protocol: bool,
/// Whether a Metadata action (from `effective_table_config`) is written.
should_emit_metadata: bool,
/// Receives the assembled actions and performs the commit.
committer: Box<dyn Committer>,
/// Operation name passed to commitInfo and recorded on the commit span.
operation: Option<String>,
/// Engine identifier passed to commitInfo.
engine_info: Option<String>,
/// Engine-supplied commit-info data plus the schema describing it.
engine_commit_info: Option<(Box<dyn EngineData>, SchemaRef)>,
/// Add-file metadata batches staged via `add_files`.
add_files_metadata: Vec<Box<dyn EngineData>>,
/// Scan-row batches (with selection vectors) that become remove actions.
remove_files_metadata: Vec<FilteredEngineData>,
/// Set-transaction actions; `app_id`s must be unique, checked at commit.
set_transactions: Vec<SetTransaction>,
/// Timestamp used for commitInfo, removes, set-transactions, and as the ICT floor.
commit_timestamp: i64,
/// Domain metadata additions requested by the user.
user_domain_metadata_additions: Vec<DomainMetadata>,
/// Domain metadata additions presumably produced by the kernel itself.
system_domain_metadata_additions: Vec<DomainMetadata>,
/// User domains to remove.
user_domain_removals: Vec<String>,
/// `dataChange` value written on add and remove actions.
data_change: bool,
/// Whether commitInfo flags this commit as a blind append (validated at commit).
is_blind_append: bool,
/// Files whose deletion vectors this commit updates (presumably; see DV update generation).
dv_matched_files: Vec<FilteredEngineData>,
/// Physical clustering columns; drive stats schema and stats verification.
physical_clustering_columns: Option<Vec<ColumnName>>,
/// Type-state marker; holds no data.
_state: PhantomData<S>,
}
impl<S> std::fmt::Debug for Transaction<S> {
    /// Compact summary: the read snapshot's version (or `create_table` when there is no read
    /// snapshot) and whether engine info has been set. Other fields are omitted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let engine_info = self.engine_info.is_some();
        // Write straight into the formatter instead of allocating intermediate Strings.
        match &self.read_snapshot_opt {
            Some(snap) => write!(
                f,
                "Transaction {{ read_snapshot version: {}, engine_info: {engine_info} }}",
                snap.version()
            ),
            None => write!(
                f,
                "Transaction {{ read_snapshot version: create_table, engine_info: {engine_info} }}"
            ),
        }
    }
}
/// Builds the projection that reshapes add-file metadata (laid out like `input_schema`)
/// into the add-action layout.
///
/// A `dataChange` literal is inserted after `modificationTime`, and the `stats` struct is
/// replaced by its JSON string encoding. Returns the projected schema together with the
/// patch wrapped via `Expression::struct_from`.
fn build_add_action_projection(
    input_schema: &StructType,
    data_change: bool,
) -> DeltaResult<(SchemaRef, Expression)> {
    let data_change_value = lit(data_change);
    let stats_json_field = StructField::nullable("stats", DataType::STRING);
    let stats_json_value = Expression::unary(ToJson, col!("stats"));
    let (projected_schema, projection) = ProjectionStructPatchBuilder::new(input_schema)
        .insert_after(
            "modificationTime",
            DATA_CHANGE_COLUMN.clone(),
            data_change_value,
        )
        .replace("stats", stats_json_field, stats_json_value)
        .build()?;
    Ok((projected_schema, Expression::struct_from([projection])))
}
/// Lazily converts add-file metadata batches into add-action engine data.
///
/// Every batch shares `input_schema`, so one expression evaluator is built up front from
/// the projection returned by `build_add_action_projection` and reused for each batch.
///
/// # Errors
/// Returns an error immediately if the projection or evaluator cannot be built. Per-batch
/// failures (from the input iterator or from evaluation) surface as `Err` items.
fn build_add_actions<'a, I, T>(
    engine: &dyn Engine,
    add_files_metadata: I,
    input_schema: SchemaRef,
    data_change: bool,
) -> DeltaResult<impl Iterator<Item = DeltaResult<Box<dyn EngineData>>> + 'a>
where
    I: Iterator<Item = DeltaResult<T>> + Send + 'a,
    T: Deref<Target = dyn EngineData> + Send + 'a,
{
    let (output_schema, adds_expr) = build_add_action_projection(&input_schema, data_change)?;
    // Loop-invariant: previously rebuilt for every batch with identical inputs.
    let adds_evaluator = engine.evaluation_handler().new_expression_evaluator(
        input_schema,
        Arc::new(adds_expr),
        as_log_add_schema(output_schema).into(),
    )?;
    Ok(add_files_metadata
        .map(move |add_files_batch| adds_evaluator.evaluate(add_files_batch?.deref())))
}
impl<S> Transaction<S> {
/// Validates the staged changes, assembles every action for the next table version, and
/// hands them to the configured [`Committer`].
///
/// Actions are passed in this order: commitInfo, protocol (when `should_emit_protocol`),
/// metadata (when `should_emit_metadata`), adds, set-transactions, domain metadata, then
/// removes and deletion-vector updates.
///
/// # Returns
/// - [`CommitResult::CommittedTransaction`] when the committer reports `Committed`.
/// - [`CommitResult::ConflictedTransaction`] when the committer reports `Conflict`.
/// - [`CommitResult::RetryableTransaction`] when the committer fails with `Error::IOError`.
///
/// # Errors
/// Validation failures (unsupported removes, duplicate `app_id`s, invalid blind append,
/// empty or unwritable schema, add+remove with Change Data Feed enabled, missing required
/// stats), failures while generating actions, and committer errors other than IO errors.
#[instrument(
parent = &self.span,
name = TRANSACTION_COMMIT_SPAN,
skip_all,
fields(
report,
operation_id = %self.operation_id,
is_catalog_managed = self.effective_table_config.is_catalog_managed(),
correlation_id = self.correlation_id.as_deref().unwrap_or(""),
commit_version = self.get_commit_version(),
num_add_files,
num_remove_files,
add_files_bytes,
remove_files_bytes,
is_blind_append,
data_change,
operation,
prepare_duration_ns,
committer_duration_ns,
failure_reason,
),
err
)]
pub fn commit(self, engine: &dyn Engine) -> DeltaResult<CommitResult<S>> {
let commit_start = Instant::now();
info!(
num_add_files = self.add_files_metadata.len(),
num_remove_files = self.remove_files_metadata.len(),
num_dv_updates = self.dv_matched_files.len(),
);
// Remove support is only checked when removes are actually staged.
if !self.remove_files_metadata.is_empty() {
self.effective_table_config
.validate_feature_support_for_remove()?;
}
// Reject duplicate app_ids: `HashSet::insert` returns false on a repeat.
let mut app_ids = HashSet::with_capacity(self.set_transactions.len());
if let Some(dup) = self
.set_transactions
.iter()
.find(|t| !app_ids.insert(&t.app_id))
{
return Err(Error::generic(format!(
"app_id {} already exists in transaction",
dup.app_id
)));
}
self.validate_blind_append_semantics()?;
self.ensure_schema_non_empty_for_data_writes()?;
// Schema writability only matters when new data files are being added.
if !self.add_files_metadata.is_empty() {
validate_schema_for_write(&self.effective_table_config.logical_schema())?;
}
// Adding and removing data in one commit on a CDF-enabled table would need CDC files,
// which this path does not write.
if !self.is_create_table()
&& !self.add_files_metadata.is_empty()
&& !self.remove_files_metadata.is_empty()
&& self.data_change
{
let cdf_enabled = self
.effective_table_config
.table_properties()
.enable_change_data_feed
.unwrap_or(false);
require!(
!cdf_enabled,
Error::generic(
"Cannot add and remove data in the same transaction when Change Data Feed is enabled (delta.enableChangeDataFeed = true). \
This would require writing CDC files for DML operations, which is not yet supported. \
Consider using separate transactions: one to add files, another to remove files."
)
);
}
self.validate_add_files_stats(&self.add_files_metadata)?;
// Lazily convert staged set-transactions into engine data.
let set_transaction_actions = self
.set_transactions
.clone()
.into_iter()
.map(|txn| txn.into_engine_data(get_log_txn_schema().clone(), engine));
// The ICT (when enabled) goes into commitInfo and becomes the commit-metadata timestamp.
let in_commit_timestamp = self.get_in_commit_timestamp(engine)?;
let kernel_commit_info = CommitInfo::new(
self.commit_timestamp,
in_commit_timestamp,
self.operation.clone(),
self.engine_info.clone(),
self.is_blind_append,
);
let commit_info_action = self.generate_commit_info(engine, kernel_commit_info);
// Protocol/metadata actions are emitted only when flagged; the values are also handed to
// the committer through CommitMetadata.
let (protocol_action, protocol) = if self.should_emit_protocol {
let protocol = self.effective_table_config.protocol().clone();
let schema = get_commit_schema().project(&[PROTOCOL_NAME])?;
let action = protocol.clone().into_engine_data(schema, engine)?;
(Some(action), Some(protocol))
} else {
(None, None)
};
let (metadata_action, metadata) = if self.should_emit_metadata {
let metadata = self.effective_table_config.metadata().clone();
let schema = get_commit_schema().project(&[METADATA_NAME])?;
let action = metadata.clone().into_engine_data(schema, engine)?;
(Some(action), Some(metadata))
} else {
(None, None)
};
let commit_version = self.get_commit_version();
// Row-tracking domain metadata (if any) is folded into the domain-metadata actions.
let (add_actions, row_tracking_domain_metadata) =
self.generate_adds(engine, commit_version)?;
let (domain_metadata_actions, dm_changes) =
self.generate_domain_metadata_actions(engine, row_tracking_domain_metadata)?;
let dv_update_actions = self.generate_dv_update_actions(engine)?;
let remove_actions =
self.generate_remove_actions(engine, self.remove_files_metadata.iter(), &[])?;
let actions = iter::once(commit_info_action)
.chain(protocol_action.map(Ok))
.chain(metadata_action.map(Ok))
.chain(add_actions)
.chain(set_transaction_actions)
.chain(domain_metadata_actions);
// Removes and DV updates are already filtered; everything else selects all rows.
let filtered_actions = actions
.map(|action_result| action_result.map(FilteredEngineData::with_all_rows_selected))
.chain(remove_actions)
.chain(dv_update_actions);
let commit_metadata = self.create_commit_metadata(
commit_version,
in_commit_timestamp,
protocol,
metadata,
dm_changes.clone(),
)?;
// The action iterators are lazy, so per-batch work done while the committer consumes
// them is counted in committer_duration rather than prepare_duration.
let prepare_duration = commit_start.elapsed();
let committer_start = Instant::now();
let commit_response =
self.committer
.commit(engine, Box::new(filtered_actions), commit_metadata);
let committer_duration = committer_start.elapsed();
match commit_response {
Ok(CommitResponse::Committed { file_meta }) => {
// File-size histogram bin boundaries from the read snapshot's stats, if present,
// are passed along so the file-stats delta can use the same bins (presumably).
let bin_boundaries = self
.read_snapshot_opt
.as_ref()
.and_then(|snap| snap.get_file_stats_if_present())
.and_then(|s| s.file_size_histogram)
.map(|h| h.sorted_bin_boundaries);
let file_stats = FileStatsDelta::try_compute_for_txn(
&self.add_files_metadata,
&self.remove_files_metadata,
bin_boundaries.as_deref(),
)?;
self.record_commit_success_metrics(
&file_stats,
prepare_duration,
committer_duration,
);
let crc_delta =
self.build_crc_delta(file_stats, in_commit_timestamp, dm_changes)?;
Ok(CommitResult::CommittedTransaction(
self.into_committed(file_meta, crc_delta)?,
))
}
// The committer reported a conflict at `version`; hand the transaction back.
Ok(CommitResponse::Conflict { version }) => {
tracing::Span::current()
.record("failure_reason", CommitFailureReason::Conflict.as_ref());
Ok(CommitResult::ConflictedTransaction(
self.into_conflicted(version),
))
}
// IO failures are returned as retryable rather than as hard errors.
Err(e @ Error::IOError(_)) => {
tracing::Span::current()
.record("failure_reason", CommitFailureReason::RetryableIo.as_ref());
Ok(CommitResult::RetryableTransaction(self.into_retryable(e)))
}
Err(e) => Err(e),
}
}
/// Records success metrics on the current tracing span (the commit span when called from
/// `commit`): gross add/remove file counts and bytes, the blind-append and data-change
/// flags, the operation name when set, and prepare/committer durations in nanoseconds.
fn record_commit_success_metrics(
    &self,
    file_stats: &FileStatsDelta,
    prepare_duration: Duration,
    committer_duration: Duration,
) {
    // `Duration::as_nanos` returns u128; saturate rather than silently truncate with `as`.
    let to_nanos = |d: Duration| u64::try_from(d.as_nanos()).unwrap_or(u64::MAX);
    let span = tracing::Span::current();
    span.record("num_add_files", file_stats.gross_add_files);
    span.record("num_remove_files", file_stats.gross_remove_files);
    span.record("add_files_bytes", file_stats.gross_add_bytes);
    span.record("remove_files_bytes", file_stats.gross_remove_bytes);
    span.record("is_blind_append", self.is_blind_append);
    span.record("data_change", self.data_change);
    if let Some(operation) = self.operation.as_deref() {
        span.record("operation", operation);
    }
    span.record("prepare_duration_ns", to_nanos(prepare_duration));
    span.record("committer_duration_ns", to_nanos(committer_duration));
}
/// Sets the `dataChange` flag written on this commit's add and remove actions, returning
/// the transaction for chaining.
pub fn with_data_change(mut self, data_change: bool) -> Self {
    self.set_data_change(data_change);
    self
}
/// In-place counterpart of `with_data_change`: sets the `dataChange` flag for this commit.
#[internal_api]
// NOTE(review): `dead_code` is presumably allowed because non-internal-api builds may not
// call this; confirm before removing.
#[allow(dead_code)]
pub(crate) fn set_data_change(&mut self, data_change: bool) {
    let flag = &mut self.data_change;
    *flag = data_change;
}
/// Records an engine identifier that is passed into this commit's `CommitInfo`.
pub fn with_engine_info(mut self, engine_info: impl Into<String>) -> Self {
    let info: String = engine_info.into();
    self.engine_info = Some(info);
    self
}
/// Attaches a correlation id that is recorded on the commit span. An empty id clears it.
pub fn with_correlation_id(mut self, correlation_id: impl Into<Arc<str>>) -> Self {
    let id: Arc<str> = correlation_id.into();
    self.correlation_id = if id.is_empty() { None } else { Some(id) };
    self
}
/// Stores engine-supplied commit-info data together with the schema describing it,
/// replacing any previously supplied value.
pub fn with_commit_info(
    mut self,
    engine_commit_info: Box<dyn EngineData>,
    commit_info_schema: SchemaRef,
) -> Self {
    let supplied = (engine_commit_info, commit_info_schema);
    self.engine_commit_info = Some(supplied);
    self
}
/// Stages a set-transaction action for `app_id` at `version`, passing this transaction's
/// commit timestamp as its timestamp. Duplicate `app_id`s are rejected at commit.
pub fn with_transaction_id(mut self, app_id: String, version: i64) -> Self {
    let timestamp = Some(self.commit_timestamp);
    let txn = SetTransaction::new(app_id, version, timestamp);
    self.set_transactions.push(txn);
    self
}
/// Stages a user domain-metadata addition for `domain` with the given `configuration`.
pub fn with_domain_metadata(mut self, domain: String, configuration: String) -> Self {
    let addition = DomainMetadata::new(domain, configuration);
    self.user_domain_metadata_additions.push(addition);
    self
}
/// Classifies the commit by whether it creates the table and whether the table is
/// catalog-managed.
fn determine_commit_type(
    is_create: bool,
    table_config: &crate::table_configuration::TableConfiguration,
) -> CommitType {
    let catalog_managed = table_config.is_catalog_managed();
    if is_create {
        if catalog_managed {
            CommitType::CatalogManagedCreate
        } else {
            CommitType::PathBasedCreate
        }
    } else if catalog_managed {
        CommitType::CatalogManagedWrite
    } else {
        CommitType::PathBasedWrite
    }
}
fn validate_commit_type(
is_catalog_committer: bool,
commit_type: &CommitType,
) -> DeltaResult<()> {
match (
is_catalog_committer,
commit_type.requires_catalog_committer(),
) {
(true, true) | (false, false) => Ok(()),
(false, true) => Err(Error::generic(
"This table is catalog-managed and requires a catalog committer. \
Please provide a catalog committer via Snapshot::transaction().",
)),
(true, false) => Err(Error::generic(
"This table is path-based and cannot be committed to with a catalog committer.",
)),
}
}
/// Assembles the [`CommitMetadata`] handed to the committer for `commit_version`.
///
/// Derives the commit type (create vs. write, catalog-managed vs. path-based) and rejects a
/// committer of the wrong kind. For an existing table, the read snapshot's protocol,
/// metadata, and max published version are packaged alongside `new_protocol` and
/// `new_metadata` via `CommitProtocolMetadata::try_new`. The commit timestamp is the ICT
/// when present, otherwise `self.commit_timestamp`.
///
/// # Errors
/// Fails if the log root cannot be built, the committer kind does not match the commit
/// type, the read snapshot is missing for a non-create transaction, or the protocol and
/// metadata pairing is rejected.
fn create_commit_metadata(
&self,
commit_version: Version,
in_commit_timestamp: Option<i64>,
new_protocol: Option<Protocol>,
new_metadata: Option<Metadata>,
domain_metadata_changes: Vec<crate::actions::DomainMetadata>,
) -> DeltaResult<CommitMetadata> {
let log_root = LogRoot::new(self.effective_table_config.table_root().clone())?;
let is_create = self.is_create_table();
let commit_type = Self::determine_commit_type(is_create, &self.effective_table_config);
Self::validate_commit_type(self.committer.is_catalog_committer(), &commit_type)?;
// A create-table commit has no prior protocol, metadata, or published version.
let (read_protocol, read_metadata, max_published_version) = if is_create {
(None, None, None)
} else {
let snap = self.read_snapshot()?;
let read_config = snap.table_configuration();
(
Some(read_config.protocol().clone()),
Some(read_config.metadata().clone()),
snap.log_segment().listed.max_published_version,
)
};
let protocol_metadata = CommitProtocolMetadata::try_new(
read_protocol,
read_metadata,
new_protocol,
new_metadata,
)?;
Ok(CommitMetadata::new(
log_root,
commit_version,
commit_type,
in_commit_timestamp.unwrap_or(self.commit_timestamp),
max_published_version,
protocol_metadata,
domain_metadata_changes,
))
}
fn validate_blind_append_semantics(&self) -> DeltaResult<()> {
if !self.is_blind_append {
return Ok(());
}
require!(
!self.is_create_table(),
Error::invalid_transaction_state(
"Blind append is not supported for create-table transactions",
)
);
require!(
!self.add_files_metadata.is_empty(),
Error::invalid_transaction_state("Blind append requires at least one added data file")
);
require!(
self.data_change,
Error::invalid_transaction_state("Blind append requires data_change to be true")
);
require!(
self.remove_files_metadata.is_empty(),
Error::invalid_transaction_state("Blind append cannot remove files")
);
require!(
self.dv_matched_files.is_empty(),
Error::invalid_transaction_state("Blind append cannot update deletion vectors")
);
Ok(())
}
/// Commit-time guard: an existing table that stages any data-file change must have at least
/// one column. Create-table transactions are exempt.
fn ensure_schema_non_empty_for_data_writes(&self) -> DeltaResult<()> {
    if !self.is_create_table() && self.has_data_file_actions() {
        self.ensure_schema_non_empty_for_write_context()
    } else {
        Ok(())
    }
}
fn ensure_schema_non_empty_for_write_context(&self) -> DeltaResult<()> {
if self.is_create_table() {
return Ok(());
}
if self.effective_table_config.logical_schema().num_fields() == 0 {
return Err(Error::generic(
"Cannot write data files to a Delta table with empty schema; \
use `snapshot.alter_table().add_column(...)` to add at least one \
column before writing data",
));
}
Ok(())
}
/// Returns `true` when the transaction has no read snapshot, i.e. it creates the table.
///
/// In debug builds, also asserts that an operation named `"CREATE TABLE"` has no read
/// snapshot.
fn is_create_table(&self) -> bool {
    let creating = self.read_snapshot_opt.is_none();
    debug_assert!(
        creating || self.operation.as_deref() != Some("CREATE TABLE"),
        "CREATE TABLE operation should not have a read snapshot"
    );
    creating
}
/// Whether any data-file change is staged: adds, removes, or deletion-vector updates.
fn has_data_file_actions(&self) -> bool {
    let nothing_staged = self.add_files_metadata.is_empty()
        && self.remove_files_metadata.is_empty()
        && self.dv_matched_files.is_empty();
    !nothing_staged
}
fn read_snapshot(&self) -> DeltaResult<&Snapshot> {
self.read_snapshot_opt.as_deref().ok_or_else(|| {
Error::internal_error("read_snapshot() called on create-table transaction")
})
}
/// Computes the in-commit timestamp (ICT) to write, or `None` when the effective table
/// configuration does not enable `InCommitTimestamp`.
///
/// A create-table transaction uses its own commit timestamp. Otherwise the result is
/// `max(commit_timestamp, previous_ict + 1)`, keeping ICTs strictly increasing. It is
/// `None` if the read snapshot reports no ICT.
///
/// NOTE(review): if ICT becomes enabled in this very transaction, the read snapshot may
/// report no ICT, and then no ICT is written; confirm that is intended.
fn get_in_commit_timestamp(&self, engine: &dyn Engine) -> DeltaResult<Option<i64>> {
    let ict_enabled = self
        .effective_table_config
        .is_feature_enabled(&TableFeature::InCommitTimestamp);
    if !ict_enabled {
        return Ok(None);
    }
    if self.is_create_table() {
        return Ok(Some(self.commit_timestamp));
    }
    let previous_ict = self.read_snapshot()?.get_in_commit_timestamp(engine)?;
    let Some(previous) = previous_ict else {
        return Ok(None);
    };
    Ok(Some(self.commit_timestamp.max(previous + 1)))
}
/// Version this transaction commits: one past the read snapshot's version, or 0 when there
/// is no read snapshot (create table).
fn get_commit_version(&self) -> Version {
    self.read_snapshot_opt
        .as_ref()
        .map_or(0, |snapshot| snapshot.version() + 1)
}
pub fn add_files_schema(&self) -> &'static SchemaRef {
&BASE_ADD_FILES_SCHEMA
}
}
impl<S: SupportsDataFiles> Transaction<S> {
/// Physical schema of the statistics expected on added files, accounting for this
/// transaction's physical clustering columns (if any).
// NOTE(review): `unused` is presumably allowed for builds where only external callers use
// this; confirm before removing.
#[allow(unused)]
pub fn stats_schema(&self) -> DeltaResult<SchemaRef> {
    let clustering_columns = self.physical_clustering_columns.as_deref();
    let expected = self
        .effective_table_config
        .build_expected_stats_schemas(clustering_columns, None)?;
    Ok(expected.physical)
}
/// Physical column names for which statistics should be collected, accounting for this
/// transaction's physical clustering columns (if any).
// NOTE(review): `unused` is presumably allowed for builds where only external callers use
// this; confirm before removing.
#[allow(unused)]
pub fn stats_columns(&self) -> Vec<ColumnName> {
    let clustering_columns = self.physical_clustering_columns.as_deref();
    self.effective_table_config
        .physical_stats_column_names(clustering_columns)
}
/// Builds the struct-patch expression that maps a logical data batch to its physical
/// layout for writing.
///
/// When the configuration requires materialized partition columns, each partition column's
/// literal value (from `partition_values`) is inserted after the nearest preceding
/// non-partition, non-VOID field, or prepended if there is none. The patch is then passed
/// through `add_void_stripping` (presumably removing VOID-typed columns).
///
/// # Errors
/// Returns an internal error if a partition column has no value in `partition_values`
/// (including when it is `None`) while materialization is required.
fn generate_logical_to_physical(
&self,
partition_values: Option<&HashMap<String, Scalar>>,
) -> DeltaResult<Expression> {
let logical_schema = self.effective_table_config.logical_schema();
let mut patch = ExpressionStructPatchBuilder::new();
if self
.effective_table_config
.should_materialize_partition_columns()
{
let partition_cols: HashSet<&str> = self
.effective_table_config
.partition_columns()
.iter()
.map(String::as_str)
.collect();
// Anchor for the next insertion. It only advances past kept data columns, so
// consecutive partition columns share one anchor; their relative order depends on
// `insert_after` semantics (presumably call order).
let mut predecessor: Option<&str> = None;
for field in logical_schema.fields() {
let name = field.name().as_str();
if partition_cols.contains(name) {
let value = partition_values.and_then(|m| m.get(name)).ok_or_else(|| {
Error::internal_error(format!(
"partition column '{name}' missing while building logical-to-physical \
expression"
))
})?;
let literal = lit(value.clone());
patch = match predecessor {
Some(predecessor) => patch.insert_after(predecessor, literal),
None => patch.prepend(literal),
};
} else if *field.data_type() != DataType::VOID {
predecessor = Some(name);
}
}
}
let patch = add_void_stripping(patch, &logical_schema);
Expression::struct_patch(patch)
}
/// Logical names of the table's partition columns, as configured.
pub fn logical_partition_columns(&self) -> &[String] {
    let config = &self.effective_table_config;
    config.partition_columns()
}
/// Checks that the table's logical schema can be written to, via `validate_schema_for_write`.
fn validate_for_data_write(&self) -> DeltaResult<()> {
    let logical_schema = self.effective_table_config.logical_schema();
    validate_schema_for_write(&logical_schema)
}
/// Captures the table-level state shared by every write context built from this
/// transaction: table root, schemas, column-mapping mode, stats columns, partition
/// columns, and file-prefix randomization settings. Currently always succeeds.
fn shared_write_state(&self) -> DeltaResult<Arc<SharedWriteState>> {
    let config = &self.effective_table_config;
    let properties = config.table_properties();
    let state = SharedWriteState {
        table_root: config.table_root().clone(),
        logical_schema: config.logical_schema_without_partition_columns(),
        physical_schema: config.physical_write_schema(),
        column_mapping_mode: config.column_mapping_mode(),
        stats_columns: self.stats_columns(),
        logical_partition_columns: config.partition_columns().to_vec(),
        randomize_file_prefixes: properties.should_randomize_file_prefixes(),
        random_prefix_length: properties.random_prefix_length(),
    };
    Ok(Arc::new(state))
}
/// Builds a [`WriteContext`] for writing files into one partition of a partitioned table.
///
/// `partition_values` (keyed by logical column name) are validated and normalized against
/// the table schema. Each is serialized and keyed by its physical column name under the
/// table's column-mapping mode. The normalized values are also baked into the
/// logical-to-physical expression.
///
/// # Errors
/// Fails if the schema is empty or cannot be written, the table is not partitioned (use
/// `unpartitioned_write_context`), or the values fail validation or serialization.
pub fn partitioned_write_context(
    &self,
    partition_values: HashMap<String, Scalar>,
) -> DeltaResult<WriteContext> {
    self.ensure_schema_non_empty_for_write_context()?;
    self.validate_for_data_write()?;
    let shared = self.shared_write_state()?;
    require!(
        !shared.logical_partition_columns.is_empty(),
        Error::generic("table is not partitioned; use unpartitioned_write_context() instead")
    );
    let full_logical_schema = self.effective_table_config.logical_schema();
    let normalized = validate_partition_values(
        &shared.logical_partition_columns,
        &full_logical_schema,
        partition_values,
    )?;
    let physical_partition_values = shared
        .logical_partition_columns
        .iter()
        .map(|logical_name| -> DeltaResult<_> {
            let scalar = normalized.get(logical_name).ok_or_else(|| {
                Error::internal_error(format!(
                    "partition column '{logical_name}' missing after validation"
                ))
            })?;
            let serialized_value = serialize_partition_value(scalar)?;
            let field = full_logical_schema.field(logical_name).ok_or_else(|| {
                Error::internal_error(format!(
                    "partition column '{logical_name}' not found in schema after validation"
                ))
            })?;
            let physical_name = field.physical_name(shared.column_mapping_mode).to_string();
            Ok((physical_name, serialized_value))
        })
        .collect::<DeltaResult<HashMap<_, _>>>()?;
    let logical_to_physical = Arc::new(self.generate_logical_to_physical(Some(&normalized))?);
    Ok(WriteContext {
        shared,
        logical_to_physical,
        physical_partition_values,
    })
}
/// Builds a [`WriteContext`] for an unpartitioned table (no partition values).
///
/// # Errors
/// Fails if the schema is empty or cannot be written, or if the table is partitioned (use
/// `partitioned_write_context`).
pub fn unpartitioned_write_context(&self) -> DeltaResult<WriteContext> {
    self.ensure_schema_non_empty_for_write_context()?;
    self.validate_for_data_write()?;
    let shared = self.shared_write_state()?;
    require!(
        shared.logical_partition_columns.is_empty(),
        Error::generic("table is partitioned; use partitioned_write_context() instead")
    );
    let expression = self.generate_logical_to_physical(None)?;
    Ok(WriteContext {
        logical_to_physical: Arc::new(expression),
        physical_partition_values: HashMap::new(),
        shared,
    })
}
/// Stages a batch of add-file metadata (shaped like `add_files_schema()`) to be written as
/// add actions at commit.
pub fn add_files(&mut self, add_metadata: Box<dyn EngineData>) {
    let staged = &mut self.add_files_metadata;
    staged.push(add_metadata);
}
}
impl<S> Transaction<S> {
/// Validates statistics on added-file batches before commit. Does nothing when there are
/// no added files.
///
/// Checks that `numRecords` is present when the table requires it. When physical
/// clustering columns are set, also runs `StatsColumnVerifier` over the batches, giving it
/// each clustering column with its physical data type.
///
/// # Errors
/// Verification failures, or an internal error if a clustering column is missing from the
/// physical schema.
fn validate_add_files_stats(&self, add_files: &[Box<dyn EngineData>]) -> DeltaResult<()> {
    if add_files.is_empty() {
        return Ok(());
    }
    if self.effective_table_config.requires_stats_num_records() {
        stats_verifier::verify_num_records_present(add_files)?;
    }
    let clustering_cols = match self.physical_clustering_columns.as_deref() {
        Some(cols) if !cols.is_empty() => cols,
        _ => return Ok(()),
    };
    let physical_schema = self.effective_table_config.physical_schema();
    let mut columns_with_types: Vec<(ColumnName, DataType)> =
        Vec::with_capacity(clustering_cols.len());
    for col in clustering_cols {
        let data_type = physical_schema
            .fields_of_path(col)?
            .last()
            .map(|field| field.data_type().clone())
            .ok_or_else(|| {
                Error::internal_error(format!(
                    "Required column '{col}' not found in table schema"
                ))
            })?;
        columns_with_types.push((col.clone(), data_type));
    }
    StatsColumnVerifier::new(columns_with_types).verify(add_files)?;
    Ok(())
}
/// Produces this commit's add actions plus any row-tracking domain metadata to write.
///
/// With no added files, returns an empty iterator; a create-table transaction on a table
/// that writes row tracking still gets the initial row-tracking domain metadata. Otherwise
/// dispatches to `generate_adds_with_row_tracking` when row tracking is written, or
/// projects the add-file batches directly with no domain metadata.
///
/// # Errors
/// Fails if `commit_version` does not fit in an `i64`, or if building the add-action
/// projection or the row-tracking data fails.
#[instrument(name = "txn.gen_adds", skip_all, err)]
fn generate_adds<'a>(
&'a self,
engine: &dyn Engine,
commit_version: u64,
) -> DeltaResult<(
EngineDataResultIterator<'a>,
Option<RowTrackingDomainMetadata>,
)> {
let row_tracking_supported = self.effective_table_config.should_write_row_tracking();
if self.add_files_metadata.is_empty() {
let row_tracking_dm = (row_tracking_supported && self.is_create_table())
.then(RowTrackingDomainMetadata::initial);
return Ok((Box::new(iter::empty()), row_tracking_dm));
}
// Row-tracking columns store the commit version as a signed LONG.
let commit_version = i64::try_from(commit_version)
.map_err(|_| Error::generic("Commit version too large to fit in i64"))?;
if row_tracking_supported {
self.generate_adds_with_row_tracking(engine, commit_version)
} else {
let add_actions = build_add_actions(
engine,
self.add_files_metadata.iter().map(|a| Ok(a.deref())),
self.add_files_schema().clone(),
self.data_change,
)?;
Ok((Box::new(add_actions), None))
}
}
/// Builds add actions for a table that writes row-tracking fields.
///
/// Base row IDs are assigned by `RowTrackingVisitor`, starting from the read snapshot's
/// row-ID high-water mark (none for create-table). Each add-file batch is extended with
/// `baseRowId` and `defaultRowCommitVersion` (set to `commit_version`) before being
/// projected into add actions. Also returns domain metadata carrying the new high-water
/// mark.
///
/// # Errors
/// Fails if the high-water mark cannot be read, a batch cannot be visited, or the
/// projection cannot be built. Per-batch column-append failures surface as `Err` items.
fn generate_adds_with_row_tracking<'a>(
    &'a self,
    engine: &dyn Engine,
    commit_version: i64,
) -> DeltaResult<(
    EngineDataResultIterator<'a>,
    Option<RowTrackingDomainMetadata>,
)> {
    let row_id_high_water_mark = if self.is_create_table() {
        None
    } else {
        RowTrackingDomainMetadata::get_high_water_mark(self.read_snapshot()?, engine)?
    };
    // First pass: assign base row IDs to every batch and advance the high-water mark.
    let mut row_tracking_visitor =
        RowTrackingVisitor::new(row_id_high_water_mark, Some(self.add_files_metadata.len()));
    for add_files_batch in &self.add_files_metadata {
        row_tracking_visitor.visit_rows_of(add_files_batch.deref())?;
    }
    let RowTrackingVisitor {
        base_row_id_batches,
        row_id_high_water_mark,
    } = row_tracking_visitor;
    // The appended-columns schema is the same for every batch; build it once, not per batch.
    let row_tracking_schema =
        with_row_tracking_cols(&Arc::new(StructType::new_unchecked(vec![])))?;
    let extended_add_files = self.add_files_metadata.iter().zip(base_row_id_batches).map(
        move |(add_files_batch, base_row_ids)| {
            let commit_versions = vec![commit_version; base_row_ids.len()];
            let base_row_ids_array =
                ArrayData::try_new(ArrayType::new(DataType::LONG, true), base_row_ids)?;
            let commit_versions_array =
                ArrayData::try_new(ArrayType::new(DataType::LONG, true), commit_versions)?;
            add_files_batch.append_columns(
                row_tracking_schema.clone(),
                vec![base_row_ids_array, commit_versions_array],
            )
        },
    );
    let add_actions = build_add_actions(
        engine,
        extended_add_files,
        with_row_tracking_cols(self.add_files_schema())?,
        self.data_change,
    )?;
    let row_tracking_domain_metadata = RowTrackingDomainMetadata::new(row_id_high_water_mark);
    Ok((Box::new(add_actions), Some(row_tracking_domain_metadata)))
}
/// Converts a successful commit into a [`CommittedTransaction`] with post-commit stats
/// and a post-commit snapshot.
///
/// For an existing table, the snapshot comes from `new_post_commit` on the read snapshot,
/// and both counters are the read snapshot's counts plus one. For a create-table
/// transaction, a version-0 log segment is built under `<table_root>/_delta_log/`, the CRC
/// delta is turned into a full version-0 CRC, and both counters are 1.
///
/// # Errors
/// Fails if `file_meta` does not parse as a commit path, the log root or snapshot cannot be
/// built, or (create-table) the CRC delta lacks protocol or metadata.
fn into_committed(
self,
file_meta: FileMeta,
crc_delta: CrcDelta,
) -> DeltaResult<CommittedTransaction> {
let parsed_commit = ParsedLogPath::parse_commit(file_meta)?;
let commit_version = parsed_commit.version;
let (post_commit_stats, post_commit_snapshot) = match &self.read_snapshot_opt {
Some(snap) => {
// This commit adds one more commit on top of the read snapshot's counts.
let stats = PostCommitStats {
commits_since_checkpoint: snap.log_segment().commits_since_checkpoint() + 1,
commits_since_log_compaction: snap
.log_segment()
.commits_since_log_compaction_or_checkpoint()
+ 1,
};
let snapshot = snap.new_post_commit(parsed_commit, crc_delta)?;
(stats, Arc::new(snapshot))
}
None => {
let log_root = self
.effective_table_config
.table_root()
.join("_delta_log/")?;
let log_segment = LogSegment::new_for_version_zero(log_root, parsed_commit)?;
let crc = crc_delta.into_crc_for_version_zero().ok_or_else(|| {
Error::internal_error("CREATE TABLE CRC delta is missing protocol or metadata")
})?;
let stats = PostCommitStats {
commits_since_checkpoint: 1,
commits_since_log_compaction: 1,
};
let snapshot = Snapshot::new_with_crc(
log_segment,
self.effective_table_config,
Some(Arc::new(crc)),
)?;
(stats, Arc::new(snapshot))
}
};
Ok(CommittedTransaction {
commit_version,
post_commit_stats,
post_commit_snapshot: Some(post_commit_snapshot),
})
}
fn build_crc_delta(
&self,
file_stats: FileStatsDelta,
in_commit_timestamp: Option<i64>,
dm_changes: Vec<DomainMetadata>,
) -> DeltaResult<CrcDelta> {
let domain_metadata = dm_changes
.into_iter()
.map(|dm| (dm.domain().to_string(), dm))
.collect();
let set_transactions = self
.set_transactions
.iter()
.map(|txn| (txn.app_id.clone(), txn.clone()))
.collect();
let is_incremental_safe = self
.operation
.as_deref()
.is_some_and(is_incremental_safe_operation);
Ok(CrcDelta {
file_stats,
protocol: self
.should_emit_protocol
.then(|| self.effective_table_config.protocol().clone()),
metadata: self
.should_emit_metadata
.then(|| self.effective_table_config.metadata().clone()),
domain_metadata,
set_transactions,
in_commit_timestamp,
is_incremental_safe,
})
}
/// Wraps this transaction in a [`ConflictedTransaction`] with the version at which the
/// committer reported a conflict.
fn into_conflicted(self, conflict_version: Version) -> ConflictedTransaction<S> {
    let transaction = self;
    ConflictedTransaction {
        conflict_version,
        transaction,
    }
}
fn into_retryable(self, error: Error) -> RetryableTransaction<S> {
RetryableTransaction {
transaction: self,
error,
}
}
/// Lazily turns scan-row batches into remove actions, keeping each batch's selection vector.
///
/// Each batch is projected through the patch from `build_remove_struct_patch`. That patch
/// stamps the commit timestamp and `data_change` and drops `columns_to_drop`. Two
/// evaluators are built up front. Batches that carry the parsed-stats column use the one
/// that fills a null `stats` with the JSON encoding of the parsed stats; all other batches
/// use the base evaluator.
///
/// # Errors
/// Returns an internal error for a create-table transaction with staged removes, and
/// propagates patch/evaluator construction errors. Per-batch failures surface as `Err`
/// items.
#[instrument(name = "txn.gen_removes", skip_all, err)]
fn generate_remove_actions<'a>(
&'a self,
engine: &dyn Engine,
remove_files_metadata: impl Iterator<Item = &'a FilteredEngineData> + Send + 'a,
columns_to_drop: &'a [&str],
) -> DeltaResult<impl Iterator<Item = DeltaResult<FilteredEngineData>> + Send + 'a> {
// NOTE(review): this guard inspects `self.remove_files_metadata`, not the
// `remove_files_metadata` argument; confirm callers passing other iterators are covered.
if self.is_create_table() && !self.remove_files_metadata.is_empty() {
return Err(Error::internal_error(
"CREATE TABLE transaction cannot have remove actions",
));
}
let input_schema = scan_row_schema();
let target_schema = schema_with_all_fields_nullable(get_log_remove_schema());
let evaluation_handler = engine.evaluation_handler();
let make_eval = |coalesce_stats_with_parsed: bool| -> DeltaResult<_> {
let patch = build_remove_struct_patch(
self.commit_timestamp,
self.data_change,
columns_to_drop,
coalesce_stats_with_parsed,
)?;
let expr = Arc::new(Expression::struct_from([Expression::struct_patch(patch)?]));
evaluation_handler.new_expression_evaluator(
input_schema.clone(),
expr,
target_schema.clone().into(),
)
};
let base_eval = Arc::new(make_eval(false)?);
let stats_parsed_eval = Arc::new(make_eval(true)?);
let stats_parsed_col = ColumnName::new([STATS_PARSED_NAME]);
Ok(remove_files_metadata.map(move |file_metadata_batch| {
let data = file_metadata_batch.data();
// Choose the evaluator per batch: only batches with parsed stats can coalesce them.
let evaluator = if data.has_field(&stats_parsed_col) {
&stats_parsed_eval
} else {
&base_eval
};
let updated_engine_data = evaluator.evaluate(data)?;
FilteredEngineData::try_new(
updated_engine_data,
file_metadata_batch.selection_vector().to_vec(),
)
}))
}
}
/// Builds the struct patch that reshapes a scan-row batch into the remove-action layout.
///
/// After `path` it inserts `commit_timestamp` (presumably `deletionTimestamp`),
/// `data_change`, a literal `true` (presumably `extendedFileMetadata`), and
/// `partitionValues` from the file-constant values. After `stats` it inserts `tags`; after
/// `deletionVector` it inserts the base row id and default row commit version. It then
/// drops the file-constant-values struct, `modificationTime`, the parsed partition values
/// (if present), and every column in `columns_to_drop`.
///
/// When `coalesce_stats_with_parsed` is set, `stats` becomes
/// `coalesce(stats, to_json(stats_parsed))` and the parsed-stats column is dropped.
///
/// NOTE(review): several inserts share one anchor, so the output order relies on
/// `ExpressionStructPatchBuilder::insert_after` keeping call order. Do not reorder these
/// calls without confirming that.
fn build_remove_struct_patch(
commit_timestamp: i64,
data_change: bool,
columns_to_drop: &[&str],
coalesce_stats_with_parsed: bool,
) -> DeltaResult<ExpressionStructPatch> {
let mut patch = ExpressionStructPatchBuilder::new()
.insert_after("path", lit(commit_timestamp))
.insert_after("path", lit(data_change))
.insert_after("path", lit(true))
.insert_after("path", col!(FILE_CONSTANT_VALUES_NAME, "partitionValues"));
if coalesce_stats_with_parsed {
// Prefer the existing JSON stats; fall back to serializing the parsed stats.
let coalesce_stats = Expression::coalesce([
col!("stats"),
Expression::unary(ToJson, col!(STATS_PARSED_NAME)),
]);
patch = patch
.replace("stats", coalesce_stats)
.drop(STATS_PARSED_NAME);
}
patch = patch
.insert_after("stats", col!(FILE_CONSTANT_VALUES_NAME, TAGS_NAME))
.insert_after(
"deletionVector",
col!(FILE_CONSTANT_VALUES_NAME, BASE_ROW_ID_NAME),
)
.insert_after(
"deletionVector",
col!(FILE_CONSTANT_VALUES_NAME, DEFAULT_ROW_COMMIT_VERSION_NAME),
)
.drop(FILE_CONSTANT_VALUES_NAME)
.drop("modificationTime")
.drop_if_exists(PARTITION_VALUES_PARSED_NAME);
for column_to_drop in columns_to_drop {
patch = patch.drop(*column_to_drop);
}
patch.build()
}
/// Counters reported alongside a successful commit.
///
/// NOTE(review): meanings are inferred from the field names; the values are computed
/// elsewhere — confirm exact semantics at the producer.
#[derive(Debug)]
pub struct PostCommitStats {
/// Commits written since the most recent checkpoint (per field name).
pub commits_since_checkpoint: u64,
/// Commits written since the most recent log compaction (per field name).
pub commits_since_log_compaction: u64,
}
/// Outcome of attempting to commit a transaction.
///
/// Marked `#[must_use]`: dropping a result without inspecting it triggers a compiler warning.
/// `S` is the transaction state type and defaults to `ExistingTable`.
#[derive(Debug)]
#[must_use]
pub enum CommitResult<S = ExistingTable> {
    /// The commit succeeded.
    CommittedTransaction(CommittedTransaction),
    /// The commit lost to a conflicting commit; carries the original transaction.
    ConflictedTransaction(ConflictedTransaction<S>),
    /// The commit failed with an error; carries the transaction and that error.
    RetryableTransaction(RetryableTransaction<S>),
}

impl<S> CommitResult<S> {
    /// Returns `true` only for [`CommitResult::CommittedTransaction`].
    pub fn is_committed(&self) -> bool {
        // Listed exhaustively so that adding a variant forces this answer to be revisited.
        match self {
            CommitResult::CommittedTransaction(_) => true,
            CommitResult::ConflictedTransaction(_) | CommitResult::RetryableTransaction(_) => {
                false
            }
        }
    }
}
impl<S: std::fmt::Debug> CommitResult<S> {
    /// Test helper: returns the committed transaction.
    ///
    /// # Panics
    /// Panics, printing the actual outcome, if the commit did not succeed.
    #[cfg(any(test, feature = "test-utils"))]
    #[allow(clippy::panic)]
    pub fn unwrap_committed(self) -> CommittedTransaction {
        if let CommitResult::CommittedTransaction(committed) = self {
            return committed;
        }
        panic!("Expected CommittedTransaction, got: {self:?}")
    }

    /// Test helper: returns the snapshot produced by a successful commit.
    ///
    /// # Panics
    /// Panics if the commit did not succeed or no post-commit snapshot was recorded.
    #[cfg(any(test, feature = "test-utils"))]
    #[allow(clippy::panic, clippy::expect_used)]
    pub fn unwrap_post_commit_snapshot(self) -> SnapshotRef {
        let committed = self.unwrap_committed();
        committed
            .post_commit_snapshot()
            .cloned()
            .expect("expected post-commit snapshot")
    }
}
/// A transaction that was successfully written to the log.
#[derive(Debug)]
pub struct CommittedTransaction {
    commit_version: Version,
    post_commit_stats: PostCommitStats,
    post_commit_snapshot: Option<SnapshotRef>,
}

impl CommittedTransaction {
    /// The table version this transaction was committed as.
    pub fn commit_version(&self) -> Version {
        let Self { commit_version, .. } = *self;
        commit_version
    }

    /// Counters gathered at commit time.
    pub fn post_commit_stats(&self) -> &PostCommitStats {
        &self.post_commit_stats
    }

    /// The snapshot after the commit, if one was recorded.
    pub fn post_commit_snapshot(&self) -> Option<&SnapshotRef> {
        Option::as_ref(&self.post_commit_snapshot)
    }
}
/// A transaction whose commit was rejected because of a conflicting commit.
#[derive(Debug)]
pub struct ConflictedTransaction<S = ExistingTable> {
    // No accessor reads this field here, hence the lint allowance; it keeps the rejected
    // transaction alive inside the result.
    #[allow(dead_code)]
    transaction: Transaction<S>,
    conflict_version: Version,
}

impl<S> ConflictedTransaction<S> {
    /// The version at which the conflict was reported.
    pub fn conflict_version(&self) -> Version {
        let Self {
            conflict_version, ..
        } = *self;
        conflict_version
    }
}
/// A transaction whose commit attempt failed with an error, handed back to the caller
/// together with that error.
#[derive(Debug)]
pub struct RetryableTransaction<S = ExistingTable> {
/// The transaction that failed to commit.
pub transaction: Transaction<S>,
/// The error raised by the failed commit attempt.
pub error: Error,
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Mutex;
use ::test_utils::get_column;
use rstest::rstest;
use url::Url;
use super::*;
use crate::actions::deletion_vector::DeletionVectorDescriptor;
use crate::actions::CommitInfo;
use crate::arrow::array::{
ArrayRef, Float64Array, Int32Array, Int64Array, NullArray, StringArray,
};
use crate::arrow::datatypes::{
DataType as ArrowDataType, Field as ArrowField, Schema as ArrowSchema,
};
use crate::arrow::record_batch::RecordBatch;
use crate::committer::{FileSystemCommitter, PublishMetadata};
use crate::engine::arrow_conversion::{TryFromArrow, TryIntoArrow};
use crate::engine::arrow_data::ArrowEngineData;
use crate::engine::arrow_expression::ArrowEvaluationHandler;
use crate::engine::sync::SyncEngine;
use crate::expressions::{MapData, Scalar, StructData};
use crate::metrics::{MetricEvent, TableType, TransactionCommitFailure};
use crate::object_store::memory::InMemory;
use crate::object_store::path::Path;
use crate::object_store::ObjectStoreExt as _;
use crate::schema::MapType;
use crate::table_features::ColumnMappingMode;
use crate::transaction::create_table::create_table;
use crate::transaction::data_layout::DataLayout;
use crate::utils::test_utils::{
install_thread_local_metrics_reporter, load_test_table, string_array_to_engine_data,
test_schema_flat, test_schema_nested, test_schema_with_array, test_schema_with_map,
CapturingReporter,
};
use crate::{DeltaResultIterator, EvaluationHandler, Snapshot};
impl Transaction {
    /// Test-only builder: overrides the physical clustering columns on this transaction.
    fn with_clustering_columns_for_test(self, columns: Vec<ColumnName>) -> Self {
        let mut txn = self;
        txn.physical_clustering_columns = Some(columns);
        txn
    }
}
struct IoErrorCommitter;
impl Committer for IoErrorCommitter {
fn commit(
&self,
_engine: &dyn Engine,
_actions: DeltaResultIterator<'_, FilteredEngineData>,
_commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
Err(Error::IOError(std::io::Error::other("simulated IO error")))
}
fn is_catalog_committer(&self) -> bool {
false
}
fn publish(
&self,
_engine: &dyn Engine,
_publish_metadata: PublishMetadata,
) -> DeltaResult<()> {
Ok(())
}
}
struct GenericErrorCommitter;
impl Committer for GenericErrorCommitter {
fn commit(
&self,
_engine: &dyn Engine,
_actions: DeltaResultIterator<'_, FilteredEngineData>,
_commit_metadata: CommitMetadata,
) -> DeltaResult<CommitResponse> {
Err(Error::generic("simulated commit error"))
}
fn is_catalog_committer(&self) -> bool {
false
}
fn publish(
&self,
_engine: &dyn Engine,
_publish_metadata: PublishMetadata,
) -> DeltaResult<()> {
Ok(())
}
}
/// Catalog committer that always reports a conflict at version 0.
struct MockCatalogCommitter;

impl Committer for MockCatalogCommitter {
    fn commit(
        &self,
        _: &dyn Engine,
        _: DeltaResultIterator<'_, FilteredEngineData>,
        _: CommitMetadata,
    ) -> DeltaResult<CommitResponse> {
        let response = CommitResponse::Conflict { version: 0 };
        Ok(response)
    }

    fn is_catalog_committer(&self) -> bool {
        true
    }

    fn publish(&self, _: &dyn Engine, _: PublishMetadata) -> DeltaResult<()> {
        Ok(())
    }
}
/// Loads `tests/data/table-with-dv-small` at version 1 with a fresh [`SyncEngine`].
fn setup_dv_enabled_table() -> (SyncEngine, Arc<Snapshot>) {
    let engine = SyncEngine::new();
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/"))
        .unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let snapshot = Snapshot::builder_for(table_url)
        .at_version(1)
        .build(&engine)
        .unwrap();
    (engine, snapshot)
}
/// Loads the latest version of `tests/data/table-without-dv-small` with a fresh [`SyncEngine`].
fn setup_non_dv_table() -> (SyncEngine, Arc<Snapshot>) {
    let engine = SyncEngine::new();
    let table_dir = std::fs::canonicalize(PathBuf::from("./tests/data/table-without-dv-small/"))
        .unwrap();
    let table_url = url::Url::from_directory_path(table_dir).unwrap();
    let snapshot = Snapshot::builder_for(table_url).build(&engine).unwrap();
    (engine, snapshot)
}
/// Writes a single-commit table into an in-memory store and returns an engine over that store
/// plus a snapshot of it.
///
/// The protocol lists `deletionVectors` as both a reader and writer feature, but the metadata
/// `configuration` is empty, so no enablement table property is set.
fn setup_dv_supported_but_disabled_table() -> DeltaResult<(Arc<dyn Engine>, Arc<Snapshot>)> {
let storage = Arc::new(InMemory::new());
let table_root = url::Url::parse("memory:///").unwrap();
let engine = Arc::new(SyncEngine::new_with_store(storage.clone()));
// One nullable integer column `id`.
let schema_json = serde_json::json!({
"type": "struct",
"fields": [{
"name": "id",
"type": "integer",
"nullable": true,
"metadata": {}
}]
});
// Commit 0: protocol + metadata actions, newline-delimited.
let actions = [
r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":["deletionVectors"],"writerFeatures":["deletionVectors"]}}"#.to_string(),
serde_json::json!({
"metaData": {
"id": "test-id",
"format": {"provider": "parquet", "options": {}},
"schemaString": schema_json.to_string(),
"partitionColumns": [],
"configuration": {},
"createdTime": 1234567890
}
})
.to_string(),
]
.join("\n");
let commit_path = Path::from("_delta_log/00000000000000000000.json");
// The object-store `put` is async; drive it to completion on a dedicated runtime.
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(storage.put(&commit_path, actions.into()))?;
let engine: Arc<dyn Engine> = engine;
let snapshot = Snapshot::builder_for(table_root).build(engine.as_ref())?;
Ok((engine, snapshot))
}
fn create_test_dv_descriptor(path_suffix: &str) -> DeletionVectorDescriptor {
use crate::actions::deletion_vector::{
DeletionVectorDescriptor, DeletionVectorStorageType,
};
DeletionVectorDescriptor {
storage_type: DeletionVectorStorageType::PersistedRelative,
path_or_inline_dv: format!("dv_{path_suffix}"),
offset: Some(0),
size_in_bytes: 100,
cardinality: 1,
}
}
/// Starts a file-system-committed `DELETE` transaction tagged with engine info `test_engine`.
fn create_dv_transaction(snapshot: Arc<Snapshot>, engine: &dyn Engine) -> DeltaResult<Transaction> {
    let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), engine)?;
    let txn = txn.with_operation("DELETE".to_string());
    Ok(txn.with_engine_info("test_engine"))
}
/// The add-files schema exposes `path`, `partitionValues`, `size`, `modificationTime` and a
/// nullable `stats` struct whose per-column sub-structs are empty.
#[test]
fn test_add_files_schema() -> Result<(), Box<dyn std::error::Error>> {
    // Reuse the shared fixture loader (table-with-dv-small at version 1) instead of
    // duplicating its setup inline.
    let (engine, snapshot) = setup_dv_enabled_table();
    let txn = snapshot
        .transaction(Box::new(FileSystemCommitter::new()), &engine)?
        .with_engine_info("default engine");
    let schema = txn.add_files_schema();
    let expected = StructType::new_unchecked(vec![
        StructField::not_null("path", DataType::STRING),
        StructField::not_null(
            "partitionValues",
            MapType::new(DataType::STRING, DataType::STRING, true),
        ),
        StructField::not_null("size", DataType::LONG),
        StructField::not_null("modificationTime", DataType::LONG),
        StructField::nullable(
            "stats",
            DataType::struct_type_unchecked(vec![
                StructField::nullable(NUM_RECORDS, DataType::LONG),
                StructField::nullable(NULL_COUNT, DataType::struct_type_unchecked(vec![])),
                StructField::nullable(MIN_VALUES, DataType::struct_type_unchecked(vec![])),
                StructField::nullable(MAX_VALUES, DataType::struct_type_unchecked(vec![])),
                StructField::nullable(TIGHT_BOUNDS, DataType::BOOLEAN),
            ]),
        ),
    ]);
    assert_eq!(*schema, expected.into());
    Ok(())
}
/// The add-action projection inserts `dataChange` after `modificationTime`, exposes `stats` as
/// a STRING, and (with row tracking) keeps `baseRowId` / `defaultRowCommitVersion` at the end.
#[rstest]
#[case::base(false)]
#[case::row_tracking(true)]
fn test_add_action_projection_schema(#[case] row_tracking: bool) -> DeltaResult<()> {
let input_schema = if row_tracking {
with_row_tracking_cols(&BASE_ADD_FILES_SCHEMA)?
} else {
BASE_ADD_FILES_SCHEMA.clone()
};
// NOTE(review): meaning of the `true` flag is defined by `build_add_action_projection`.
let (schema, _) = build_add_action_projection(input_schema.as_ref(), true)?;
let field_names: Vec<_> = schema.fields().map(|f| f.name().as_str()).collect();
let expected_field_names = if row_tracking {
vec![
"path",
"partitionValues",
"size",
"modificationTime",
"dataChange",
"stats",
"baseRowId",
"defaultRowCommitVersion",
]
} else {
vec![
"path",
"partitionValues",
"size",
"modificationTime",
"dataChange",
"stats",
]
};
assert_eq!(field_names, expected_field_names);
assert_eq!(schema.field("dataChange"), Some(&*DATA_CHANGE_COLUMN));
assert_eq!(
schema.field("stats").unwrap().data_type(),
&DataType::STRING
);
Ok(())
}
/// Deletion-vector paths resolve under the table URL, embed the requested prefix, and are
/// unique across calls with the same prefix.
#[test]
fn test_new_deletion_vector_path() -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path =
std::fs::canonicalize(PathBuf::from("./tests/data/table-with-dv-small/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
// `url` is kept for the containment assertions below.
let snapshot = Snapshot::builder_for(url.clone())
.at_version(1)
.build(&engine)
.unwrap();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let write_context = txn.unpartitioned_write_context().unwrap();
// Empty prefix.
let dv_path1 = write_context.new_deletion_vector_path(String::from(""));
let abs_path1 = dv_path1.absolute_path()?;
assert!(abs_path1.as_str().contains(url.as_str()));
// Non-empty prefix appears in the resolved path.
let prefix = String::from("dv_test");
let dv_path2 = write_context.new_deletion_vector_path(prefix.clone());
let abs_path2 = dv_path2.absolute_path()?;
assert!(abs_path2.as_str().contains(url.as_str()));
assert!(abs_path2.as_str().contains(&prefix));
// Same prefix twice must still yield distinct paths.
let dv_path3 = write_context.new_deletion_vector_path(prefix.clone());
let abs_path3 = dv_path3.absolute_path()?;
assert_ne!(abs_path2, abs_path3);
Ok(())
}
/// A write context built after `effective_table_config` changes sees the new schema, while a
/// write context built earlier keeps the old one.
#[test]
fn write_context_reflects_updated_effective_table_config(
) -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_non_dv_table();
let mut txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let initial_write_context = txn.unpartitioned_write_context()?;
assert!(!initial_write_context
.logical_schema()
.contains("fresh_column"));
// Evolve the schema in place: append a nullable `fresh_column`.
let mut evolved_fields: Vec<StructField> = txn
.effective_table_config
.logical_schema()
.fields()
.cloned()
.collect();
evolved_fields.push(StructField::nullable("fresh_column", DataType::INTEGER));
let evolved_schema = Arc::new(StructType::new_unchecked(evolved_fields));
let evolved_metadata = txn
.effective_table_config
.metadata()
.clone()
.with_schema(evolved_schema.clone())?;
txn.effective_table_config = TableConfiguration::try_new_with_schema(
&txn.effective_table_config,
evolved_metadata,
evolved_schema,
)?;
let updated_write_context = txn.unpartitioned_write_context()?;
assert!(updated_write_context
.logical_schema()
.contains("fresh_column"));
assert!(updated_write_context
.physical_schema()
.contains("fresh_column"));
// The earlier context is unaffected by the config change.
assert!(!initial_write_context
.logical_schema()
.contains("fresh_column"));
Ok(())
}
/// On a table without materialized partition columns, both write-context schemas omit the
/// partition column `letter` and keep the data column `number`.
#[test]
fn test_write_context_schemas_exclude_partition_columns(
) -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path = std::fs::canonicalize(PathBuf::from("./tests/data/basic_partitioned/")).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine).unwrap();
let txn = snapshot
.transaction(Box::new(FileSystemCommitter::new()), &engine)?
.with_engine_info("default engine");
let write_context = txn.partitioned_write_context(HashMap::from([(
"letter".to_string(),
Scalar::String("a".into()),
)]))?;
let logical_schema = write_context.logical_schema();
let physical_schema = write_context.physical_schema();
assert!(
!logical_schema.contains("letter"),
"Logical schema should not contain partition column 'letter'"
);
assert!(
!physical_schema.contains("letter"),
"Physical schema should not contain partition column 'letter' (stored in path)"
);
assert!(
logical_schema.contains("number"),
"Logical schema should contain data column 'number'"
);
assert!(
physical_schema.contains("number"),
"Physical schema should contain data column 'number'"
);
Ok(())
}
fn snapshot_and_partitioned_write_context(
table_path: &str,
partition_values: HashMap<String, Scalar>,
) -> Result<(Arc<Snapshot>, WriteContext), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path = std::fs::canonicalize(PathBuf::from(table_path)).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine)?;
let txn = snapshot
.clone()
.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let wc = txn.partitioned_write_context(partition_values)?;
Ok((snapshot, wc))
}
/// Applies the write context's logical-to-physical expression to `batch` using the Arrow
/// evaluation handler and returns the resulting record batch.
fn eval_logical_to_physical(
    wc: &WriteContext,
    batch: RecordBatch,
) -> Result<RecordBatch, Box<dyn std::error::Error>> {
    let logical_input = StructType::try_from_arrow(batch.schema())?;
    let evaluator = ArrowEvaluationHandler.new_expression_evaluator(
        logical_input.into(),
        wc.logical_to_physical(),
        wc.physical_schema().clone().into(),
    )?;
    let evaluated = evaluator.evaluate(&ArrowEngineData::new(batch))?;
    let arrow_data = ArrowEngineData::try_from_engine_data(evaluated)?;
    Ok(arrow_data.record_batch().clone())
}
/// The logical-to-physical transform writes the partition column `letter` into the data only
/// when the table has the `MaterializePartitionColumns` feature.
#[rstest]
#[case::not_materialized("./tests/data/basic_partitioned/", false)]
#[case::materialized("./tests/data/partitioned_with_materialize_feature/", true)]
fn test_partition_columns_materialized_in_logical_to_physical(
#[case] table_path: &str,
#[case] materialized: bool,
) -> Result<(), Box<dyn std::error::Error>> {
let (snapshot, wc) = snapshot_and_partitioned_write_context(
table_path,
HashMap::from([("letter".to_string(), Scalar::String("a".into()))]),
)?;
// Guard that the fixture actually has (or lacks) the feature this case expects.
assert_eq!(
snapshot
.table_configuration()
.protocol()
.has_table_feature(&TableFeature::MaterializePartitionColumns),
materialized
);
// Input batch carries data columns only; the partition value comes from the write context.
let input_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("number", ArrowDataType::Int64, true),
ArrowField::new("a_float", ArrowDataType::Float64, true),
]));
let batch = RecordBatch::try_new(
input_schema,
vec![
Arc::new(Int64Array::from(vec![42])) as ArrayRef,
Arc::new(Float64Array::from(vec![1.5])),
],
)?;
let rb = eval_logical_to_physical(&wc, batch)?;
let rb_schema = rb.schema();
let names: Vec<&str> = rb_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
if materialized {
assert_eq!(names, vec!["letter", "number", "a_float"]);
assert_eq!(get_column!(rb, "letter", StringArray).value(0), "a");
} else {
assert_eq!(names, vec!["number", "a_float"]);
}
Ok(())
}
#[rstest]
#[case::cm_none(ColumnMappingMode::None)]
#[case::cm_name(ColumnMappingMode::Name)]
#[case::cm_id(ColumnMappingMode::Id)]
fn test_materialized_partition_column_insert(
#[case] cm_mode: ColumnMappingMode,
) -> Result<(), Box<dyn std::error::Error>> {
let cm = match cm_mode {
ColumnMappingMode::None => "none",
ColumnMappingMode::Name => "name",
ColumnMappingMode::Id => "id",
};
let engine: Arc<dyn Engine> =
Arc::new(SyncEngine::new_with_store(Arc::new(InMemory::new())));
let schema = Arc::new(StructType::try_new(vec![
StructField::nullable("p1", DataType::STRING),
StructField::nullable("p2", DataType::INTEGER),
StructField::nullable("d1", DataType::INTEGER),
StructField::nullable("v", DataType::VOID),
StructField::nullable("p3", DataType::STRING),
StructField::nullable("p4", DataType::INTEGER),
StructField::nullable("d2", DataType::INTEGER),
])?);
let txn = create_table("memory:///t", schema, "DefaultEngine")
.with_data_layout(DataLayout::partitioned(["p1", "p2", "p3", "p4"]))
.with_table_properties([
("delta.feature.materializePartitionColumns", "supported"),
("delta.columnMapping.mode", cm),
])
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
let wc = txn.partitioned_write_context(HashMap::from([
("p1".to_string(), Scalar::String("aa".into())),
("p2".to_string(), Scalar::Integer(7)),
("p3".to_string(), Scalar::String("cc".into())),
("p4".to_string(), Scalar::Integer(9)),
]))?;
let input_schema = Arc::new(ArrowSchema::new(vec![
ArrowField::new("d1", ArrowDataType::Int32, true),
ArrowField::new("v", ArrowDataType::Null, true),
ArrowField::new("d2", ArrowDataType::Int32, true),
]));
let batch = RecordBatch::try_new(
input_schema,
vec![
Arc::new(Int32Array::from(vec![10])) as ArrayRef,
Arc::new(NullArray::new(1)),
Arc::new(Int32Array::from(vec![20])),
],
)?;
let rb = eval_logical_to_physical(&wc, batch)?;
let rb_schema = rb.schema();
let names: Vec<&str> = rb_schema
.fields()
.iter()
.map(|f| f.name().as_str())
.collect();
let physical_schema = wc.physical_schema();
let expected_names: Vec<&str> = physical_schema
.fields()
.map(|f| f.name().as_str())
.collect();
assert_eq!(names, expected_names);
assert_eq!(get_column!(rb, names[0], StringArray).value(0), "aa"); assert_eq!(get_column!(rb, names[1], Int32Array).value(0), 7); assert_eq!(get_column!(rb, names[2], Int32Array).value(0), 10); assert_eq!(get_column!(rb, names[3], StringArray).value(0), "cc"); assert_eq!(get_column!(rb, names[4], Int32Array).value(0), 9); assert_eq!(get_column!(rb, names[5], Int32Array).value(0), 20); Ok(())
}
/// On a table with materialized partition columns, the physical schema keeps both the
/// partition column `letter` and the data column `number`.
#[test]
fn test_physical_schema_includes_partition_columns_when_materialized(
) -> Result<(), Box<dyn std::error::Error>> {
let (_snapshot, write_context) = snapshot_and_partitioned_write_context(
"./tests/data/partitioned_with_materialize_feature/",
HashMap::from([("letter".to_string(), Scalar::String("a".into()))]),
)?;
let physical_schema = write_context.physical_schema();
assert!(
physical_schema.contains("letter"),
"Partition column 'letter' should be in physical schema when materialized"
);
assert!(
physical_schema.contains("number"),
"Non-partition column 'number' should be in physical schema"
);
Ok(())
}
/// Calling the partitioned write-context constructor on an unpartitioned table (or the reverse)
/// fails with a descriptive error.
#[rstest]
#[case::partitioned_on_unpartitioned(
"./tests/data/table-without-dv-small/",
true,
"not partitioned"
)]
#[case::unpartitioned_on_partitioned(
"./tests/data/basic_partitioned/",
false,
"table is partitioned"
)]
fn test_wrong_write_context_method_returns_error(
#[case] table_path: &str,
#[case] call_partitioned: bool,
#[case] expected_msg: &str,
) -> Result<(), Box<dyn std::error::Error>> {
let engine = SyncEngine::new();
let path = std::fs::canonicalize(PathBuf::from(table_path)).unwrap();
let url = url::Url::from_directory_path(path).unwrap();
let snapshot = Snapshot::builder_for(url).build(&engine)?;
let txn = snapshot.transaction(Box::new(FileSystemCommitter::new()), &engine)?;
let result = if call_partitioned {
txn.partitioned_write_context(HashMap::from([("x".to_string(), Scalar::Integer(1))]))
} else {
txn.unpartitioned_write_context()
};
let err = result.unwrap_err().to_string();
assert!(
err.contains(expected_msg),
"expected '{expected_msg}' in error, got: {err}"
);
Ok(())
}
/// DV updates on a table whose protocol lacks deletion-vector support are rejected with an
/// error mentioning the DV requirement.
#[test]
fn test_update_deletion_vectors_unsupported_table() -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_non_dv_table();
let mut txn = create_dv_transaction(snapshot, &engine)?;
let dv_map = HashMap::new();
let result = txn.update_deletion_vectors(dv_map, std::iter::empty());
let err = result.expect_err("Should fail on table without DV support");
let err_msg = err.to_string();
// Message wording is matched loosely ("require" or "version").
assert!(
err_msg.contains("Deletion vector")
&& (err_msg.contains("require") || err_msg.contains("version")),
"Expected protocol error about DV requirements, got: {err_msg}"
);
Ok(())
}
/// When the protocol supports deletion vectors but the table property is not set, DV updates
/// fail with `Error::Unsupported` naming `delta.enableDeletionVectors`.
#[test]
fn test_update_deletion_vectors_requires_enablement_property(
) -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_dv_supported_but_disabled_table()?;
let mut txn = create_dv_transaction(snapshot, engine.as_ref())?;
let err = txn
.update_deletion_vectors(HashMap::new(), std::iter::empty())
.expect_err("DV updates should require delta.enableDeletionVectors=true");
assert!(
matches!(err, Error::Unsupported(_)),
"unexpected error: {err}"
);
assert!(
err.to_string().contains("delta.enableDeletionVectors"),
"error should mention the enablement property, got: {err}"
);
Ok(())
}
/// A DV descriptor for a path absent from the scan files makes `update_deletion_vectors` fail
/// with a count-mismatch error.
#[test]
fn test_update_deletion_vectors_mismatch_count() -> Result<(), Box<dyn std::error::Error>> {
    let (engine, snapshot) = setup_dv_enabled_table();
    let mut txn = create_dv_transaction(snapshot, &engine)?;
    // One descriptor, but no scan metadata at all, so zero descriptors can match.
    let mut dv_map = HashMap::new();
    dv_map.insert(
        "non_existent_file.parquet".to_string(),
        create_test_dv_descriptor("non_existent"),
    );
    let err_msg = txn
        .update_deletion_vectors(dv_map, std::iter::empty())
        .expect_err("Should fail when DV descriptors don't match scan files")
        .to_string();
    assert!(
        err_msg.contains("matched") && err_msg.contains("does not match"),
        "Expected error about mismatched count (expected 1 descriptor, 0 matched files), got: {err_msg}"
    );
    Ok(())
}
/// When only some DV descriptors match scan files, the call fails and nothing is staged in
/// `dv_matched_files`.
#[test]
fn test_update_deletion_vectors_mismatch_does_not_mutate_transaction(
) -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_dv_enabled_table();
let mut txn = create_dv_transaction(snapshot.clone(), &engine)?;
let scan = snapshot.scan_builder().build()?;
let scan_metadata = scan
.scan_metadata(&engine)?
.collect::<DeltaResult<Vec<_>>>()?;
// Collect every scan-file path so one real path can be targeted.
let mut paths = Vec::new();
for metadata in &scan_metadata {
paths =
metadata.visit_scan_files(paths, |paths, scan_file| paths.push(scan_file.path))?;
}
let existing_path = paths
.into_iter()
.next()
.ok_or_else(|| Error::generic("expected at least one scan file"))?;
// One descriptor matches a real file; the other targets a missing file.
let mut dv_map = HashMap::new();
dv_map.insert(existing_path, create_test_dv_descriptor("matched"));
dv_map.insert(
"non_existent_file.parquet".to_string(),
create_test_dv_descriptor("missing"),
);
let result = txn.update_deletion_vectors(
dv_map,
scan_metadata
.into_iter()
.map(|metadata| Ok(metadata.scan_files)),
);
assert!(
result.is_err(),
"Should fail when only some DV descriptors match scan files"
);
assert!(
txn.dv_matched_files.is_empty(),
"Failed DV update should not leave staged file updates"
);
Ok(())
}
/// An error yielded by the scan-metadata iterator propagates out of `update_deletion_vectors`
/// and leaves previously staged `dv_matched_files` unchanged.
#[test]
fn test_update_deletion_vectors_iter_error_does_not_mutate_transaction(
) -> Result<(), Box<dyn std::error::Error>> {
let (engine, snapshot) = setup_dv_enabled_table();
let mut txn = create_dv_transaction(snapshot.clone(), &engine)?;
// Pre-stage a sentinel entry so "unchanged" is distinguishable from "cleared".
txn.dv_matched_files
.push(FilteredEngineData::with_all_rows_selected(
string_array_to_engine_data(StringArray::from(vec!["sentinel"])),
));
let staged_len_before = txn.dv_matched_files.len();
let scan = snapshot.scan_builder().build()?;
let scan_metadata = scan
.scan_metadata(&engine)?
.collect::<DeltaResult<Vec<_>>>()?;
let mut paths = Vec::new();
for metadata in &scan_metadata {
paths =
metadata.visit_scan_files(paths, |paths, scan_file| paths.push(scan_file.path))?;
}
let existing_path = paths
.into_iter()
.next()
.ok_or_else(|| Error::generic("expected at least one scan file"))?;
let mut dv_map = HashMap::new();
dv_map.insert(existing_path, create_test_dv_descriptor("matched"));
// Real scan files followed by a trailing iterator error.
let result = txn.update_deletion_vectors(
dv_map,
scan_metadata
.into_iter()
.map(|metadata| Ok(metadata.scan_files))
.chain(std::iter::once(Err(Error::generic(
"simulated scan metadata failure",
)))),
);
assert!(result.is_err(), "iterator error should propagate");
assert_eq!(
txn.dv_matched_files.len(),
staged_len_before,
"Failed DV update should not stage additional file updates"
);
Ok(())
}
/// With no descriptors and no scan files there is nothing to match, so the update succeeds.
#[test]
fn test_update_deletion_vectors_empty_inputs() -> Result<(), Box<dyn std::error::Error>> {
    let (engine, snapshot) = setup_dv_enabled_table();
    let mut txn = create_dv_transaction(snapshot, &engine)?;
    let result = txn.update_deletion_vectors(HashMap::new(), std::iter::empty());
    assert!(
        result.is_ok(),
        "Empty DV updates should succeed as no-op, got error: {result:?}"
    );
    Ok(())
}
/// Stages a one-row placeholder batch (a single `"dummy"` string) as added files on `txn`.
fn add_dummy_file<S: SupportsDataFiles>(txn: &mut Transaction<S>) {
    txn.add_files(string_array_to_engine_data(StringArray::from(vec!["dummy"])));
}
/// Loads the `table-without-dv-small` fixture and opens a file-system-committed transaction on
/// it; the returned temp dir (if any) must outlive the test.
fn create_existing_table_txn(
) -> DeltaResult<(Arc<dyn Engine>, Transaction, Option<tempfile::TempDir>)> {
    let (engine, snapshot, tempdir) = load_test_table("table-without-dv-small")?;
    snapshot
        .transaction(Box::new(FileSystemCommitter::new()), engine.as_ref())
        .map(|txn| (engine, txn, tempdir))
}
/// A blind append with one added file and default data-change passes validation.
#[test]
fn test_validate_blind_append_success() -> DeltaResult<()> {
    let (_engine, txn, _tempdir) = create_existing_table_txn()?;
    let mut txn = txn.with_blind_append();
    add_dummy_file(&mut txn);
    txn.validate_blind_append_semantics()?;
    Ok(())
}
/// A blind append with no added files is an invalid transaction state.
#[test]
fn test_validate_blind_append_requires_adds() -> DeltaResult<()> {
    let (_engine, txn, _tempdir) = create_existing_table_txn()?;
    let txn = txn.with_blind_append();
    assert!(matches!(
        txn.validate_blind_append_semantics(),
        Err(Error::InvalidTransactionState(_))
    ));
    Ok(())
}
/// A blind append with `data_change` disabled is an invalid transaction state.
#[test]
fn test_validate_blind_append_requires_data_change() -> DeltaResult<()> {
    let (_engine, txn, _tempdir) = create_existing_table_txn()?;
    let mut txn = txn.with_blind_append();
    txn.set_data_change(false);
    add_dummy_file(&mut txn);
    assert!(matches!(
        txn.validate_blind_append_semantics(),
        Err(Error::InvalidTransactionState(_))
    ));
    Ok(())
}
/// A blind append that also stages removed files is an invalid transaction state.
#[test]
fn test_validate_blind_append_rejects_removes() -> DeltaResult<()> {
    let (_engine, txn, _tempdir) = create_existing_table_txn()?;
    let mut txn = txn.with_blind_append();
    add_dummy_file(&mut txn);
    txn.remove_files(FilteredEngineData::with_all_rows_selected(
        string_array_to_engine_data(StringArray::from(vec!["remove"])),
    ));
    assert!(matches!(
        txn.validate_blind_append_semantics(),
        Err(Error::InvalidTransactionState(_))
    ));
    Ok(())
}
/// A blind append that also stages deletion-vector file updates is an invalid transaction state.
#[test]
fn test_validate_blind_append_rejects_dv_updates() -> DeltaResult<()> {
    let (_engine, txn, _tempdir) = create_existing_table_txn()?;
    let mut txn = txn.with_blind_append();
    add_dummy_file(&mut txn);
    txn.dv_matched_files
        .push(FilteredEngineData::with_all_rows_selected(
            string_array_to_engine_data(StringArray::from(vec!["dv"])),
        ));
    assert!(matches!(
        txn.validate_blind_append_semantics(),
        Err(Error::InvalidTransactionState(_))
    ));
    Ok(())
}
/// A CREATE TABLE transaction flagged as a blind append fails validation even with an added
/// file.
#[test]
fn test_validate_blind_append_rejects_create_table() -> DeltaResult<()> {
let tempdir = tempfile::tempdir()?;
let schema = Arc::new(StructType::try_new(vec![StructField::nullable(
"id",
DataType::INTEGER,
)])?);
let engine = Arc::new(crate::engine::sync::SyncEngine::new());
let mut txn = create_table(
tempdir.path().to_str().expect("valid temp path"),
schema,
"test_engine",
)
.build(engine.as_ref(), Box::new(FileSystemCommitter::new()))?;
// Set the flag directly on the create-table transaction.
txn.is_blind_append = true;
add_dummy_file(&mut txn);
let result = txn.validate_blind_append_semantics();
assert!(matches!(result, Err(Error::InvalidTransactionState(_))));
Ok(())
}
/// `CommitInfo::new` records `is_blind_append` only when the flag is `true`; `false` leaves the
/// field unset.
#[test]
fn test_blind_append_sets_commit_info_flag() -> Result<(), Box<dyn std::error::Error>> {
    let blind = CommitInfo::new(1, None, None, None, true);
    assert_eq!(blind.is_blind_append, Some(true));
    let not_blind = CommitInfo::new(1, None, None, None, false);
    assert_eq!(not_blind.is_blind_append, None);
    Ok(())
}
/// Committing a blind append with no added files fails with the blind-append validation error.
#[test]
fn test_blind_append_commit_rejects_no_adds() -> DeltaResult<()> {
    // `engine` is used for the commit, so it must not carry the `_` "unused" prefix.
    let (engine, mut txn, _tempdir) = create_existing_table_txn()?;
    txn = txn.with_blind_append();
    let err = txn
        .commit(engine.as_ref())
        .expect_err("Blind append with no adds should fail");
    assert!(
        err.to_string()
            .contains("Blind append requires at least one added data file"),
        "Unexpected error: {err}"
    );
    Ok(())
}
/// A well-formed blind append gets past blind-append validation at commit time.
///
/// Any error other than `InvalidTransactionState` is tolerated here; only the validation
/// outcome is under test.
#[test]
fn test_blind_append_commit_success() -> DeltaResult<()> {
    let (engine, txn, _tempdir) = create_existing_table_txn()?;
    let mut txn = txn.with_blind_append();
    add_dummy_file(&mut txn);
    if let Err(e) = txn.commit(engine.as_ref()) {
        assert!(
            !matches!(e, Error::InvalidTransactionState(_)),
            "Blind append validation should have passed, got: {e}"
        );
    }
    Ok(())
}
/// An I/O error from the committer surfaces as `CommitResult::RetryableTransaction` carrying
/// that error, rather than as an `Err` from `commit`.
#[test]
fn test_commit_io_error_returns_retryable_transaction() -> DeltaResult<()> {
let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
let mut txn = snapshot.transaction(Box::new(IoErrorCommitter), engine.as_ref())?;
add_dummy_file(&mut txn);
let result = txn.commit(engine.as_ref())?;
assert!(
matches!(result, CommitResult::RetryableTransaction(_)),
"Expected RetryableTransaction, got: {result:?}"
);
// The variant is guaranteed by the assert above; inspect the carried error.
if let CommitResult::RetryableTransaction(retryable) = result {
assert!(
retryable.error.to_string().contains("simulated IO error"),
"Unexpected error: {}",
retryable.error
);
}
Ok(())
}
/// `Debug` output of an existing-table transaction includes the read snapshot version and does
/// not mention `create_table`.
#[test]
fn test_existing_table_txn_debug() -> DeltaResult<()> {
let (_engine, txn, _tempdir) = create_existing_table_txn()?;
let debug_str = format!("{txn:?}");
assert!(
debug_str.contains("Transaction") && debug_str.contains("read_snapshot version"),
"Debug output should contain Transaction info: {debug_str}"
);
assert!(
!debug_str.contains("create_table"),
"Existing table debug should not contain create_table: {debug_str}"
);
Ok(())
}
/// For flat, nested, map and array schemas under every column mapping mode, the write
/// context's physical schema passes the shared column-mapping validation against its logical
/// schema.
#[rstest]
#[case::flat_none(test_schema_flat(), ColumnMappingMode::None)]
#[case::flat_name(test_schema_flat(), ColumnMappingMode::Name)]
#[case::flat_id(test_schema_flat(), ColumnMappingMode::Id)]
#[case::nested_none(test_schema_nested(), ColumnMappingMode::None)]
#[case::nested_name(test_schema_nested(), ColumnMappingMode::Name)]
#[case::nested_id(test_schema_nested(), ColumnMappingMode::Id)]
#[case::map_none(test_schema_with_map(), ColumnMappingMode::None)]
#[case::map_name(test_schema_with_map(), ColumnMappingMode::Name)]
#[case::map_id(test_schema_with_map(), ColumnMappingMode::Id)]
#[case::array_none(test_schema_with_array(), ColumnMappingMode::None)]
#[case::array_name(test_schema_with_array(), ColumnMappingMode::Name)]
#[case::array_id(test_schema_with_array(), ColumnMappingMode::Id)]
fn test_physical_schema_column_mapping(
#[case] schema: SchemaRef,
#[case] mode: ColumnMappingMode,
) -> DeltaResult<()> {
let (_engine, txn) = crate::utils::test_utils::setup_column_mapping_txn(schema, mode)?;
let write_context = txn.unpartitioned_write_context().unwrap();
crate::utils::test_utils::validate_physical_schema_column_mapping(
write_context.logical_schema(),
write_context.physical_schema(),
mode,
);
Ok(())
}
/// Builds a two-row batch matching `test_schema_nested()`.
///
/// Each row is an `i64` id plus an `info` struct of `name`, `age`, a one-entry string map
/// `tags`, and an integer array `scores`:
/// - row 1: id 1, "alice";
/// - row 2: id 2, "bob".
fn build_test_record_batch() -> DeltaResult<Box<dyn EngineData>> {
let schema = test_schema_nested();
let tag_type = MapType::new(DataType::STRING, DataType::STRING, true);
let score_type = ArrayType::new(DataType::INTEGER, true);
// NOTE(review): these fields must match the nested struct in `test_schema_nested()`.
let info_fields = vec![
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER),
StructField::nullable("tags", tag_type.clone()),
StructField::nullable("scores", score_type.clone()),
];
let info1 = Scalar::Struct(StructData::try_new(
info_fields.clone(),
vec![
"alice".into(),
30i32.into(),
Scalar::Map(MapData::try_new(tag_type.clone(), [("k1", "v1")])?),
Scalar::Array(ArrayData::try_new(score_type.clone(), [10i32, 20i32])?),
],
)?);
let info2 = Scalar::Struct(StructData::try_new(
info_fields,
vec![
"bob".into(),
25i32.into(),
Scalar::Map(MapData::try_new(tag_type, [("k2", "v2")])?),
Scalar::Array(ArrayData::try_new(score_type, [30i32])?),
],
)?);
ArrowEvaluationHandler.create_many(schema, &[&[1i64.into(), info1], &[2i64.into(), info2]])
}
/// Evaluates the write context's logical-to-physical expression over `build_test_record_batch()`
/// for the given column mapping `mode`.
///
/// Asserts that:
/// - the output batch has exactly the physical schema (converted to Arrow);
/// - the `id` column (index 0) keeps its values `[1, 2]`;
/// - when column mapping is enabled, the logical and physical schemas differ.
fn validate_logical_to_physical_transform(mode: ColumnMappingMode) -> DeltaResult<()> {
    let (_engine, txn) =
        crate::utils::test_utils::setup_column_mapping_txn(test_schema_nested(), mode)?;
    let ctx = txn.unpartitioned_write_context().unwrap();
    let logical_schema = ctx.logical_schema();
    let physical_schema = ctx.physical_schema();
    let transform = ctx.logical_to_physical();

    if mode != ColumnMappingMode::None {
        assert_ne!(
            logical_schema, physical_schema,
            "Physical schema should differ from logical schema when column mapping is enabled"
        );
    }

    let input = build_test_record_batch()?;
    let evaluator = ArrowEvaluationHandler.new_expression_evaluator(
        logical_schema.clone(),
        transform.clone(),
        physical_schema.clone().into(),
    )?;
    let output = ArrowEngineData::try_from_engine_data(evaluator.evaluate(input.as_ref())?)?;
    let batch = output.record_batch();

    let expected_schema: ArrowSchema = physical_schema.as_ref().try_into_arrow()?;
    assert_eq!(batch.schema().as_ref(), &expected_schema);

    let ids = batch
        .column(0)
        .as_any()
        .downcast_ref::<Int64Array>()
        .expect("id column should be Int64");
    assert_eq!(ids.values(), &[1i64, 2]);
    Ok(())
}
/// Runs `validate_logical_to_physical_transform` once per column mapping mode.
#[rstest]
#[case::name_mode(ColumnMappingMode::Name)]
#[case::id_mode(ColumnMappingMode::Id)]
#[case::none_mode(ColumnMappingMode::None)]
fn test_logical_to_physical_transform(#[case] mode: ColumnMappingMode) -> DeltaResult<()> {
    validate_logical_to_physical_transform(mode)?;
    Ok(())
}
/// Shape of the `stats` value that `create_test_add_files` writes for one add-file row.
///
/// Derives `Debug` so values can be printed in assertion messages. Derives `Copy`/`PartialEq`
/// so test helpers can pass and compare values freely.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TestFileStats {
    /// The whole `stats` struct is null (the file carries no statistics).
    None,
    /// Stats are present with non-null values for the `value` column.
    Present,
    /// Stats are present, but every `value` is null: `nullCount` equals the record count and
    /// `minValues`/`maxValues` are null.
    AllNull,
}
/// Builds an `EngineData` batch of add-file rows for stats-validation tests.
///
/// Each row has the columns `path`, `partitionValues` (always an empty map), `size` (1024),
/// `modificationTime` (1000000), and `stats`. `stats` is either null or a struct with
/// `numRecords` (100) plus `nullCount` / `minValues` / `maxValues` for a single nullable
/// `value: long` column, as selected by the matching `TestFileStats` entry. `paths[i]` is
/// paired with `stats[i]`.
///
/// # Panics
///
/// Panics in either of these cases:
/// - `paths` and `stats` have different lengths. A silent `zip` truncation would drop rows
///   and could make a test pass for the wrong reason.
/// - Any scalar or batch construction fails.
fn create_test_add_files(paths: Vec<&str>, stats: Vec<TestFileStats>) -> Box<dyn EngineData> {
    assert_eq!(
        paths.len(),
        stats.len(),
        "create_test_add_files: every path needs exactly one TestFileStats entry"
    );

    let value_fields = vec![StructField::nullable("value", DataType::LONG)];
    let value_struct_type = DataType::struct_type_unchecked(value_fields.clone());
    // Single source of truth for the stats struct. It is used both for the `stats` column type
    // and for the per-row stats scalars, so the two cannot drift apart.
    let stats_fields = vec![
        StructField::nullable(NUM_RECORDS, DataType::LONG),
        StructField::nullable(NULL_COUNT, value_struct_type.clone()),
        StructField::nullable(MIN_VALUES, value_struct_type.clone()),
        StructField::nullable(MAX_VALUES, value_struct_type),
    ];
    let stats_type = DataType::struct_type_unchecked(stats_fields.clone());
    let partition_values_type = MapType::new(DataType::STRING, DataType::STRING, true);

    let schema = Arc::new(StructType::new_unchecked(vec![
        StructField::not_null("path", DataType::STRING),
        StructField::not_null("partitionValues", partition_values_type.clone()),
        StructField::not_null("size", DataType::LONG),
        StructField::not_null("modificationTime", DataType::LONG),
        StructField::nullable("stats", stats_type.clone()),
    ]));

    let empty_map = Scalar::Map(
        MapData::try_new(partition_values_type, Vec::<(&str, &str)>::new()).unwrap(),
    );

    // Wraps an optional long into the single-field `{ value }` struct; `None` becomes a null long.
    let value_struct = |v: Option<i64>| {
        let scalar = v.map_or(Scalar::Null(DataType::LONG), |n| n.into());
        Scalar::Struct(StructData::try_new(value_fields.clone(), vec![scalar]).unwrap())
    };
    // Builds a non-null stats struct for a file with 100 records.
    let stats_struct = |null_count: Scalar, min: Scalar, max: Scalar| {
        Scalar::Struct(
            StructData::try_new(
                stats_fields.clone(),
                vec![100i64.into(), null_count, min, max],
            )
            .unwrap(),
        )
    };

    let rows: Vec<Vec<Scalar>> = paths
        .iter()
        .zip(&stats)
        .map(|(path, stat)| {
            // Exhaustive on purpose: adding a variant must force a decision here.
            let stats_scalar = match stat {
                TestFileStats::None => Scalar::Null(stats_type.clone()),
                TestFileStats::Present => stats_struct(
                    value_struct(Some(0)),
                    value_struct(Some(1)),
                    value_struct(Some(100)),
                ),
                // Every one of the 100 records is null, so there is no min/max.
                TestFileStats::AllNull => stats_struct(
                    value_struct(Some(100)),
                    value_struct(None),
                    value_struct(None),
                ),
            };
            vec![
                (*path).into(),
                empty_map.clone(),
                1024i64.into(),
                1_000_000i64.into(),
                stats_scalar,
            ]
        })
        .collect();

    let row_refs: Vec<&[Scalar]> = rows.iter().map(Vec::as_slice).collect();
    ArrowEvaluationHandler
        .create_many(schema, &row_refs)
        .unwrap()
}
/// With `value` as a clustering column, an add file whose `value` stats are all null
/// (`TestFileStats::AllNull`) must still pass `validate_add_files_stats`.
#[test]
fn test_stats_validation_allows_all_null_clustering_column() {
    let (engine, snapshot) = setup_non_dv_table();
    let committer = Box::new(FileSystemCommitter::new());
    let clustering_columns = vec![ColumnName::new(["value"])];
    let txn = snapshot
        .transaction(committer, &engine)
        .unwrap()
        .with_operation("WRITE".to_string())
        .with_clustering_columns_for_test(clustering_columns);

    let batch = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::AllNull]);
    let result = txn.validate_add_files_stats(&[batch]);
    assert!(
        result.is_ok(),
        "Stats validation should pass for all-null clustering columns, got: {result:?}",
    );
}
/// With `value` as a clustering column, an add file with no stats at all must be rejected
/// by `validate_add_files_stats` with a stats-related error message.
#[test]
fn test_stats_validation_when_clustering_cols_missing_stats() {
    let (engine, snapshot) = setup_non_dv_table();
    let committer = Box::new(FileSystemCommitter::new());
    let clustering_columns = vec![ColumnName::new(["value"])];
    let txn = snapshot
        .transaction(committer, &engine)
        .unwrap()
        .with_operation("WRITE".to_string())
        .with_clustering_columns_for_test(clustering_columns);

    let batch = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::None]);
    let err_msg = match txn.validate_add_files_stats(&[batch]) {
        Ok(_) => {
            panic!("Expected validation to fail when stats are missing for clustering columns")
        }
        Err(e) => e.to_string(),
    };

    let mentions_stats =
        err_msg.contains("Stats validation error") || err_msg.contains("no stats");
    assert!(
        mentions_stats,
        "Expected stats validation error, got: {err_msg}"
    );
}
/// With `value` as a clustering column, an add file with fully populated stats passes
/// `validate_add_files_stats`.
#[test]
fn test_stats_validation_when_clustering_stats_present() {
    let (engine, snapshot) = setup_non_dv_table();
    let committer = Box::new(FileSystemCommitter::new());
    let clustering_columns = vec![ColumnName::new(["value"])];
    let txn = snapshot
        .transaction(committer, &engine)
        .unwrap()
        .with_operation("WRITE".to_string())
        .with_clustering_columns_for_test(clustering_columns);

    let batch = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::Present]);
    let result = txn.validate_add_files_stats(&[batch]);
    assert!(
        result.is_ok(),
        "Stats validation should pass when stats are present, got: {result:?}"
    );
}
/// Without clustering columns, stats are not required: a stats-less add file must pass
/// `validate_add_files_stats`.
#[test]
fn test_stats_validation_skipped_without_clustering() {
    let (engine, snapshot) = setup_non_dv_table();
    let committer = Box::new(FileSystemCommitter::new());
    let txn = snapshot
        .transaction(committer, &engine)
        .unwrap()
        .with_operation("WRITE".to_string());

    let batch = create_test_add_files(vec!["file1.parquet"], vec![TestFileStats::None]);
    let result = txn.validate_add_files_stats(&[batch]);
    assert!(
        result.is_ok(),
        "Stats validation should be skipped without clustering, got: {result:?}"
    );
}
/// Committing through a catalog committer to an existing path-based table must fail.
///
/// The table's commit 0 enables only `inCommitTimestamp`, with no catalog-managed feature.
/// The commit must fail with the "path-based" generic error.
#[test]
fn disallow_catalog_committer_for_non_catalog_managed_table() {
    let store = Arc::new(InMemory::new());
    let table_root = url::Url::parse("memory:///").unwrap();
    let engine = crate::engine::sync::SyncEngine::new_with_store(store.clone());

    let commit0 = [
        r#"{"commitInfo":{"timestamp":12345678900,"inCommitTimestamp":12345678900}}"#,
        r#"{"protocol":{"minReaderVersion":3,"minWriterVersion":7,"readerFeatures":[],"writerFeatures":["inCommitTimestamp"]}}"#,
        r#"{"metaData":{"id":"test-id","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[]}","partitionColumns":[],"configuration":{"delta.enableInCommitTimestamps":"true"},"createdTime":1234567890}}"#,
    ]
    .join("\n");
    let log_file = Path::from("_delta_log/00000000000000000000.json");

    // The object store API is async; the runtime is kept alive for the rest of the test.
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime
        .block_on(store.put(&log_file, commit0.into()))
        .unwrap();

    let snapshot = Snapshot::builder_for(table_root).build(&engine).unwrap();
    let catalog_committer = Box::new(MockCatalogCommitter);
    let err = snapshot
        .transaction(catalog_committer, &engine)
        .unwrap()
        .commit(&engine)
        .unwrap_err();

    let expected =
        "This table is path-based and cannot be committed to with a catalog committer";
    assert!(matches!(err, crate::Error::Generic(e) if e.contains(expected)));
}
/// Creating a new path-based table through a catalog committer must fail at commit time with
/// the "path-based" generic error.
#[test]
fn disallow_catalog_committer_for_non_catalog_managed_create_table() {
    use crate::schema::{DataType, StructField, StructType};

    let engine = crate::engine::sync::SyncEngine::new_with_store(Arc::new(InMemory::new()));
    let id_column = StructField::new("id", DataType::INTEGER, true);
    let schema = Arc::new(StructType::new_unchecked(vec![id_column]));

    let catalog_committer = Box::new(MockCatalogCommitter);
    let err = create_table("memory:///", schema, "test-engine")
        .build(&engine, catalog_committer)
        .unwrap()
        .commit(&engine)
        .unwrap_err();

    let expected =
        "This table is path-based and cannot be committed to with a catalog committer";
    assert!(matches!(err, crate::Error::Generic(e) if e.contains(expected)));
}
/// Test committer that records the in-commit timestamp passed in its `CommitMetadata`.
struct CapturingCommitter {
    /// Slot the observed in-commit timestamp is written into; shared with the test.
    captured: Arc<Mutex<Option<i64>>>,
}
impl CapturingCommitter {
    /// Returns the committer together with a handle to its (initially empty) capture slot.
    fn new() -> (Self, Arc<Mutex<Option<i64>>>) {
        let slot: Arc<Mutex<Option<i64>>> = Arc::default();
        let committer = Self {
            captured: Arc::clone(&slot),
        };
        (committer, slot)
    }
}
impl Committer for CapturingCommitter {
    /// Stores `commit_metadata.in_commit_timestamp()` in the shared slot.
    ///
    /// Returns `CommitResponse::Conflict` at the requested version without consuming or
    /// writing any actions.
    fn commit(
        &self,
        _engine: &dyn Engine,
        _actions: DeltaResultIterator<'_, FilteredEngineData>,
        commit_metadata: CommitMetadata,
    ) -> DeltaResult<CommitResponse> {
        let ict = commit_metadata.in_commit_timestamp();
        self.captured.lock().unwrap().replace(ict);
        let version = commit_metadata.version();
        Ok(CommitResponse::Conflict { version })
    }

    /// This committer does not use a catalog.
    fn is_catalog_committer(&self) -> bool {
        false
    }

    /// No-op: nothing is ever committed, so there is nothing to publish.
    fn publish(
        &self,
        _engine: &dyn Engine,
        _publish_metadata: PublishMetadata,
    ) -> DeltaResult<()> {
        Ok(())
    }
}
/// The in-commit timestamp handed to the committer must be the computed ICT, not the current
/// wall-clock time.
///
/// Commit 0 records an ICT far in the future. The next commit's `CommitMetadata` must
/// therefore carry `prev_ict + 1`.
#[test]
fn test_commit_metadata_receives_ict_not_wall_time() -> DeltaResult<()> {
    let tempdir = tempfile::tempdir().unwrap();
    let log_dir = tempdir.path().join("_delta_log");
    std::fs::create_dir_all(&log_dir).unwrap();

    // Far-future ICT (~year 2286 in ms) so `prev_ict + 1` cannot be confused with "now".
    let future_ict: i64 = 9_999_999_999_999;

    let schema_string = serde_json::json!({
        "type": "struct",
        "fields": [{
            "name": "id",
            "type": "integer",
            "nullable": true,
            "metadata": {}
        }]
    })
    .to_string();
    let actions = [
        serde_json::json!({
            "commitInfo": {
                "timestamp": 1000,
                "operation": "WRITE",
                "inCommitTimestamp": future_ict
            }
        }),
        serde_json::json!({
            "protocol": {
                "minReaderVersion": 3,
                "minWriterVersion": 7,
                "readerFeatures": [],
                "writerFeatures": ["inCommitTimestamp"]
            }
        }),
        serde_json::json!({
            "metaData": {
                "id": "test-id",
                "format": {"provider": "parquet", "options": {}},
                "schemaString": schema_string,
                "partitionColumns": [],
                "configuration": {
                    "delta.enableInCommitTimestamps": "true"
                }
            }
        }),
    ];
    // One JSON action per line, newline-terminated.
    let commit0: String = actions.iter().map(|action| format!("{action}\n")).collect();
    std::fs::write(log_dir.join("00000000000000000000.json"), commit0).unwrap();

    let table_url = Url::from_directory_path(tempdir.path()).unwrap();
    let engine = SyncEngine::new();
    let snapshot = Snapshot::builder_for(table_url).build(&engine)?;
    assert_eq!(snapshot.get_in_commit_timestamp(&engine)?, Some(future_ict));

    let (committer, captured_ts) = CapturingCommitter::new();
    let mut txn = snapshot.transaction(Box::new(committer), &engine)?;
    add_dummy_file(&mut txn);
    let result = txn.commit(&engine)?;
    assert!(
        matches!(result, CommitResult::ConflictedTransaction(_)),
        "Expected ConflictedTransaction from capturing committer"
    );

    let captured = captured_ts
        .lock()
        .unwrap()
        .expect("should have captured a timestamp");
    assert_eq!(
        captured,
        future_ict + 1,
        "CommitMetadata.in_commit_timestamp should be the computed ICT (prev_ict + 1), \
         not the wall-clock time"
    );
    Ok(())
}
/// Returns the first `TransactionCommitFailure` event recorded by `reporter`, or `None` if
/// there is no such event.
fn commit_failure_event(reporter: &CapturingReporter) -> Option<TransactionCommitFailure> {
    for event in reporter.events() {
        if let MetricEvent::TransactionCommitFailure(failure) = event {
            return Some(failure);
        }
    }
    None
}
/// Committing through `IoErrorCommitter` must yield `CommitResult::RetryableTransaction`.
///
/// It must also emit a commit-failure metric with reason `RetryableIo` and table type
/// `PathBased`.
#[test]
fn test_commit_io_error_emits_retryable_io_failure_metric() -> DeltaResult<()> {
    let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
    let reporter = Arc::new(CapturingReporter::default());
    // Keep the guard alive so the reporter stays installed for the whole test.
    let _guard = install_thread_local_metrics_reporter(reporter.clone());

    let mut txn = snapshot.transaction(Box::new(IoErrorCommitter), engine.as_ref())?;
    add_dummy_file(&mut txn);
    let outcome = txn.commit(engine.as_ref())?;
    assert!(matches!(outcome, CommitResult::RetryableTransaction(_)));

    let Some(failure) = commit_failure_event(&reporter) else {
        panic!("commit failure event");
    };
    assert_eq!(failure.reason, CommitFailureReason::RetryableIo);
    assert_eq!(failure.table_type, TableType::PathBased);
    Ok(())
}
/// Committing through `GenericErrorCommitter` must return an error.
///
/// It must also emit a commit-failure metric with reason `Error` and table type `PathBased`.
#[test]
fn test_commit_terminal_error_emits_error_failure_metric() -> DeltaResult<()> {
    let (engine, snapshot, _tempdir) = load_test_table("table-without-dv-small")?;
    let reporter = Arc::new(CapturingReporter::default());
    // Keep the guard alive so the reporter stays installed for the whole test.
    let _guard = install_thread_local_metrics_reporter(reporter.clone());

    let mut txn = snapshot.transaction(Box::new(GenericErrorCommitter), engine.as_ref())?;
    add_dummy_file(&mut txn);
    let commit_failed = txn.commit(engine.as_ref()).is_err();
    assert!(commit_failed);

    let Some(failure) = commit_failure_event(&reporter) else {
        panic!("commit failure event");
    };
    assert_eq!(failure.reason, CommitFailureReason::Error);
    assert_eq!(failure.table_type, TableType::PathBased);
    Ok(())
}
}