use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use itertools::Itertools;
use url::Url;
use uuid::Uuid;
use crate::actions::{DomainMetadata, Metadata, Protocol};
use crate::clustering::{create_clustering_domain_metadata, validate_clustering_columns};
use crate::committer::Committer;
use crate::expressions::ColumnName;
use crate::schema::validation::validate_schema_for_create;
use crate::schema::variant_utils::schema_contains_variant_type;
use crate::schema::{normalize_column_names_to_schema_casing, DataType, SchemaRef, StructType};
use crate::table_configuration::TableConfiguration;
use crate::table_features::{
assign_column_mapping_metadata, get_any_level_column_physical_name,
get_column_mapping_mode_from_properties, schema_contains_timestamp_ntz, ColumnMappingMode,
EnablementCheck, FeatureType, TableFeature, SET_TABLE_FEATURE_SUPPORTED_PREFIX,
SET_TABLE_FEATURE_SUPPORTED_VALUE,
};
use crate::table_properties::{
TableProperties, APPEND_ONLY, CHECKPOINT_WRITE_STATS_AS_JSON, CHECKPOINT_WRITE_STATS_AS_STRUCT,
COLUMN_MAPPING_MAX_COLUMN_ID, COLUMN_MAPPING_MODE, DELTA_PROPERTY_PREFIX,
ENABLE_CHANGE_DATA_FEED, ENABLE_DELETION_VECTORS, ENABLE_IN_COMMIT_TIMESTAMPS,
ENABLE_ROW_TRACKING, ENABLE_TYPE_WIDENING, MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME,
MATERIALIZED_ROW_ID_COLUMN_NAME, PARQUET_FORMAT_VERSION, SET_TRANSACTION_RETENTION_DURATION,
};
use crate::transaction::create_table::CreateTableTransaction;
use crate::transaction::data_layout::DataLayout;
use crate::transaction::Transaction;
use crate::utils::{current_time_ms, try_parse_uri};
use crate::{DeltaResult, Engine, Error, StorageHandler};
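/// Table features that may be enabled during CREATE TABLE, either explicitly via a
/// `delta.feature.<name>=supported` signal or automatically when a table property or the schema
/// requires them.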
const ALLOWED_DELTA_FEATURES: &[TableFeature] = &[
TableFeature::DomainMetadata,
TableFeature::ColumnMapping,
TableFeature::InCommitTimestamp,
TableFeature::VacuumProtocolCheck,
TableFeature::CatalogManaged,
TableFeature::DeletionVectors,
TableFeature::V2Checkpoint,
TableFeature::AppendOnly,
TableFeature::ChangeDataFeed,
TableFeature::TypeWidening,
TableFeature::RowTracking,
];
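/// `delta.`-prefixed table properties that may be set during CREATE TABLE. Any other
/// `delta.`-prefixed property is rejected; properties without the prefix are passed through
/// untouched.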
const ALLOWED_DELTA_PROPERTIES: &[&str] = &[
COLUMN_MAPPING_MODE,
ENABLE_IN_COMMIT_TIMESTAMPS,
CHECKPOINT_WRITE_STATS_AS_JSON,
CHECKPOINT_WRITE_STATS_AS_STRUCT,
ENABLE_DELETION_VECTORS,
ENABLE_CHANGE_DATA_FEED,
ENABLE_TYPE_WIDENING,
APPEND_ONLY,
ENABLE_ROW_TRACKING,
SET_TRANSACTION_RETENTION_DURATION,
PARQUET_FORMAT_VERSION,
];
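/// Returns an error if the `_delta_log` directory at `delta_log_url` already contains any file.
/// A missing directory or an empty listing means no table exists yet, so creation may proceed.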
fn ensure_table_does_not_exist(
storage: &dyn StorageHandler,
delta_log_url: &Url,
table_path: &str,
) -> DeltaResult<()> {
    match storage.list_from(delta_log_url) {
        Ok(mut files) => match files.next() {
            Some(Ok(_)) => Err(Error::generic(format!(
                "Table already exists at path: {table_path}"
            ))),
            Some(Err(Error::FileNotFound(_))) | None => Ok(()),
            Some(Err(e)) => Err(e),
        },
        Err(Error::FileNotFound(_)) => Ok(()),
        Err(e) => Err(e),
    }
}
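/// The outcome of validating user-supplied table properties: the surviving key/value properties
/// (with `delta.feature.*` signals removed) plus the reader and writer features they imply.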
struct ValidatedTableProperties {
properties: HashMap<String, String>,
reader_features: Vec<TableFeature>,
writer_features: Vec<TableFeature>,
}
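/// Adds `feature` to the writer feature list and, for reader-writer features, to the reader
/// feature list as well, skipping features that are already present.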
fn add_feature_to_lists(
feature: TableFeature,
reader_features: &mut Vec<TableFeature>,
writer_features: &mut Vec<TableFeature>,
) {
match feature.feature_type() {
FeatureType::ReaderWriter => {
if !reader_features.contains(&feature) {
reader_features.push(feature.clone());
}
if !writer_features.contains(&feature) {
writer_features.push(feature);
}
}
FeatureType::WriterOnly | FeatureType::Unknown => {
if !writer_features.contains(&feature) {
writer_features.push(feature);
}
}
}
}
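/// Test-only helper: validates the clustering columns against the schema, enables the
/// `DomainMetadata` and `ClusteredTable` features, and builds the clustering domain metadata.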
#[cfg(test)]
fn validate_clustering_and_make_domain_metadata(
logical_schema: &SchemaRef,
logical_columns: &[ColumnName],
reader_features: &mut Vec<TableFeature>,
writer_features: &mut Vec<TableFeature>,
) -> DeltaResult<DomainMetadata> {
validate_clustering_columns(logical_schema, logical_columns)?;
add_feature_to_lists(
TableFeature::DomainMetadata,
reader_features,
writer_features,
);
add_feature_to_lists(
TableFeature::ClusteredTable,
reader_features,
writer_features,
);
Ok(create_clustering_domain_metadata(logical_columns))
}
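/// The result of applying a [`DataLayout`]: system domain metadata to commit, plus the physical
/// clustering columns or the partition columns (at most one of the two is set).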
#[derive(Debug, Default)]
struct DataLayoutResult {
system_domain_metadata: Vec<DomainMetadata>,
clustering_columns: Option<Vec<ColumnName>>,
partition_columns: Option<Vec<ColumnName>>,
}
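/// Validates partition columns for CREATE TABLE: the list must be non-empty, must leave at least
/// one non-partition column, and every entry must be a distinct, top-level column of primitive
/// type that exists in the schema.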
fn validate_partition_columns(
schema: &StructType,
partition_columns: &[ColumnName],
) -> DeltaResult<()> {
if partition_columns.is_empty() {
return Err(Error::generic("Partitioning requires at least one column"));
}
if partition_columns.len() >= schema.fields().len() {
return Err(Error::generic(
"Table must have at least one non-partition column",
));
}
let mut seen = HashSet::new();
for col in partition_columns {
let path = col.path();
if path.len() != 1 {
return Err(Error::generic(format!(
"Partition column '{}' must be a top-level column (nested paths are not supported)",
col
)));
}
if !seen.insert(col) {
return Err(Error::generic(format!(
"Duplicate partition column: '{col}'"
)));
}
let col_name = &path[0];
let field = schema.field(col_name).ok_or_else(|| {
Error::generic(format!("Partition column '{col}' not found in schema"))
})?;
if !matches!(field.data_type(), DataType::Primitive(_)) {
return Err(Error::generic(format!(
"Partition column '{col}' has non-primitive type '{}'. \
Partition columns must have primitive types.",
field.data_type()
)));
}
}
Ok(())
}
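/// Applies the requested [`DataLayout`] to the table being created. Clustering validates the
/// columns, maps them to physical names, enables the required features, and produces clustering
/// domain metadata; partitioning validates and records the columns; `DataLayout::None` is a
/// no-op.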
fn apply_data_layout(
data_layout: &DataLayout,
effective_schema: &SchemaRef,
column_mapping_mode: ColumnMappingMode,
validated: &mut ValidatedTableProperties,
) -> DeltaResult<DataLayoutResult> {
match data_layout {
DataLayout::None => Ok(DataLayoutResult::default()),
DataLayout::Clustered { columns } => {
let normalized = normalize_column_names_to_schema_casing(effective_schema, columns);
validate_clustering_columns(effective_schema, &normalized)?;
let physical_columns: Vec<ColumnName> = normalized
.iter()
.map(|c| {
get_any_level_column_physical_name(effective_schema, c, column_mapping_mode)
})
.try_collect()?;
add_feature_to_lists(
TableFeature::DomainMetadata,
&mut validated.reader_features,
&mut validated.writer_features,
);
add_feature_to_lists(
TableFeature::ClusteredTable,
&mut validated.reader_features,
&mut validated.writer_features,
);
let dm = create_clustering_domain_metadata(&physical_columns);
Ok(DataLayoutResult {
system_domain_metadata: vec![dm],
clustering_columns: Some(physical_columns),
partition_columns: None,
})
}
DataLayout::Partitioned { columns } => {
let normalized = normalize_column_names_to_schema_casing(effective_schema, columns);
validate_partition_columns(effective_schema, &normalized)?;
Ok(DataLayoutResult {
system_domain_metadata: vec![],
clustering_columns: None,
partition_columns: Some(normalized),
})
}
}
}
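/// Enables [`TableFeature::VariantType`] if the schema contains a variant column anywhere.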
fn maybe_enable_variant_type(schema: &SchemaRef, validated: &mut ValidatedTableProperties) {
if schema_contains_variant_type(schema) {
add_feature_to_lists(
TableFeature::VariantType,
&mut validated.reader_features,
&mut validated.writer_features,
);
}
}
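/// Enables [`TableFeature::TimestampWithoutTimezone`] if the schema contains a `TIMESTAMP_NTZ`
/// column anywhere.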
fn maybe_enable_timestamp_ntz(schema: &SchemaRef, validated: &mut ValidatedTableProperties) {
if schema_contains_timestamp_ntz(schema) {
add_feature_to_lists(
TableFeature::TimestampWithoutTimezone,
&mut validated.reader_features,
&mut validated.writer_features,
);
}
}
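/// Enables any allowed feature whose enablement check is satisfied by the current table
/// properties (e.g. `delta.enableRowTracking=true`), additionally pulling in `DomainMetadata`
/// when row tracking is enabled, since row tracking depends on domain metadata.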
fn maybe_auto_enable_property_driven_features(validated: &mut ValidatedTableProperties) {
let table_properties = TableProperties::from(validated.properties.iter());
for feature in ALLOWED_DELTA_FEATURES {
if let EnablementCheck::EnabledIf(check) = feature.info().enablement_check {
if check(&table_properties) {
add_feature_to_lists(
feature.clone(),
&mut validated.reader_features,
&mut validated.writer_features,
);
if *feature == TableFeature::RowTracking {
add_feature_to_lists(
TableFeature::DomainMetadata,
&mut validated.reader_features,
&mut validated.writer_features,
);
}
}
}
}
}
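/// When row tracking is enabled via `delta.enableRowTracking=true`, generates unique materialized
/// column names for the row id and row commit version columns and records them as table
/// properties.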
fn maybe_set_materialized_row_tracking_column_name_properties(
validated: &mut ValidatedTableProperties,
) {
if validated
.properties
.get(ENABLE_ROW_TRACKING)
.is_none_or(|v| v != "true")
{
return;
}
validated.properties.insert(
MATERIALIZED_ROW_ID_COLUMN_NAME.to_string(),
format!("_row-id-col-{}", Uuid::new_v4()),
);
validated.properties.insert(
MATERIALIZED_ROW_COMMIT_VERSION_COLUMN_NAME.to_string(),
format!("_row-commit-version-col-{}", Uuid::new_v4()),
);
}
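/// Catalog-managed tables require in-commit timestamps: if the `CatalogManaged` feature is
/// requested, enable the `InCommitTimestamp` feature and default
/// `delta.enableInCommitTimestamps` to `true`, erroring if the property was explicitly set to
/// anything other than `true`.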
fn maybe_enable_ict_for_catalog_managed(
validated: &mut ValidatedTableProperties,
) -> DeltaResult<()> {
let has_catalog_managed = validated
.writer_features
.contains(&TableFeature::CatalogManaged);
if has_catalog_managed {
if validated
.properties
.get(ENABLE_IN_COMMIT_TIMESTAMPS)
.is_some_and(|v| v != "true")
{
return Err(Error::generic(format!(
"Catalog-managed tables require '{ENABLE_IN_COMMIT_TIMESTAMPS}=true', \
but it was explicitly set to '{}'",
validated.properties[ENABLE_IN_COMMIT_TIMESTAMPS]
)));
}
add_feature_to_lists(
TableFeature::InCommitTimestamp,
&mut validated.reader_features,
&mut validated.writer_features,
);
validated
.properties
.entry(ENABLE_IN_COMMIT_TIMESTAMPS.to_string())
.or_insert_with(|| "true".to_string());
}
Ok(())
}
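/// If column mapping is requested (`name` or `id` mode), enables the `ColumnMapping` feature,
/// assigns column mapping metadata to the schema, and records the max column id property.
/// Returns the (possibly transformed) schema together with the resolved column mapping mode.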
fn maybe_apply_column_mapping_for_table_create(
schema: &SchemaRef,
validated: &mut ValidatedTableProperties,
) -> DeltaResult<(SchemaRef, ColumnMappingMode)> {
let column_mapping_mode = get_column_mapping_mode_from_properties(&validated.properties)?;
let effective_schema = match column_mapping_mode {
ColumnMappingMode::Name | ColumnMappingMode::Id => {
add_feature_to_lists(
TableFeature::ColumnMapping,
&mut validated.reader_features,
&mut validated.writer_features,
);
let mut max_id = 0i64;
let transformed_schema = assign_column_mapping_metadata(schema, &mut max_id)?;
validated
.properties
.insert(COLUMN_MAPPING_MAX_COLUMN_ID.to_string(), max_id.to_string());
Arc::new(transformed_schema)
}
ColumnMappingMode::None => schema.clone(),
};
Ok((effective_schema, column_mapping_mode))
}
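/// Splits user-supplied properties into `delta.feature.*` signals and regular properties.
/// Feature signals must have the value `supported` and name an allowed feature; remaining
/// `delta.`-prefixed properties must be on the allowlist. Returns the surviving properties along
/// with the reader/writer features implied by the signals.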
fn validate_extract_table_features_and_properties(
properties: HashMap<String, String>,
) -> DeltaResult<ValidatedTableProperties> {
let mut reader_features = Vec::new();
let mut writer_features = Vec::new();
let (feature_signals, properties): (HashMap<_, _>, HashMap<_, _>) = properties
.into_iter()
.partition(|(k, _)| k.starts_with(SET_TABLE_FEATURE_SUPPORTED_PREFIX));
for (key, value) in &feature_signals {
let Some(feature_name) = key.strip_prefix(SET_TABLE_FEATURE_SUPPORTED_PREFIX) else {
continue;
};
if value != SET_TABLE_FEATURE_SUPPORTED_VALUE {
return Err(Error::generic(format!(
"Invalid value '{value}' for '{key}'. Only '{SET_TABLE_FEATURE_SUPPORTED_VALUE}' is allowed."
)));
}
let feature: TableFeature = feature_name
.parse()
.unwrap_or_else(|_| TableFeature::Unknown(feature_name.to_string()));
if !ALLOWED_DELTA_FEATURES.contains(&feature) {
return Err(Error::generic(format!(
"Enabling feature '{feature_name}' via '{key}' is not supported during CREATE TABLE"
)));
}
let needs_domain_metadata = feature == TableFeature::RowTracking;
add_feature_to_lists(feature, &mut reader_features, &mut writer_features);
if needs_domain_metadata {
add_feature_to_lists(
TableFeature::DomainMetadata,
&mut reader_features,
&mut writer_features,
);
}
}
for key in properties.keys() {
if key.starts_with(DELTA_PROPERTY_PREFIX)
&& !ALLOWED_DELTA_PROPERTIES.contains(&key.as_str())
{
return Err(Error::generic(format!(
"Setting delta property '{key}' is not supported during CREATE TABLE"
)));
}
}
Ok(ValidatedTableProperties {
properties,
reader_features,
writer_features,
})
}
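/// Builder for a [`CreateTableTransaction`] that creates a new Delta table at a given path with a
/// schema, optional table properties, and an optional data layout (partitioning or clustering).
///
/// A minimal usage sketch (not compiled as a doctest, since it assumes an `engine: &dyn Engine`
/// and a `committer: Box<dyn Committer>` supplied by the caller):
///
/// ```ignore
/// let schema: SchemaRef = Arc::new(StructType::new_unchecked(vec![
///     StructField::new("id", DataType::INTEGER, false),
///     StructField::new("date", DataType::DATE, true),
/// ]));
/// let txn = CreateTableTransactionBuilder::new("/path/to/table", schema, "MyApp/1.0")
///     .with_table_properties([("delta.enableDeletionVectors", "true")])
///     .with_data_layout(DataLayout::partitioned(["date"]))
///     .build(engine, committer)?;
/// ```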
pub struct CreateTableTransactionBuilder {
path: String,
schema: SchemaRef,
engine_info: String,
table_properties: HashMap<String, String>,
data_layout: DataLayout,
}
impl CreateTableTransactionBuilder {
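    /// Creates a builder for a new table at `path` with the given logical `schema`;
    /// `engine_info` is a free-form string identifying the engine performing the commit.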
pub fn new(path: impl AsRef<str>, schema: SchemaRef, engine_info: impl Into<String>) -> Self {
Self {
path: path.as_ref().to_string(),
schema,
engine_info: engine_info.into(),
table_properties: HashMap::new(),
data_layout: DataLayout::None,
}
}
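    /// Adds table properties for the new table. May be called multiple times; later calls extend
    /// the earlier ones and overwrite duplicate keys.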
pub fn with_table_properties<I, K, V>(mut self, properties: I) -> Self
where
I: IntoIterator<Item = (K, V)>,
K: Into<String>,
V: Into<String>,
{
self.table_properties
.extend(properties.into_iter().map(|(k, v)| (k.into(), v.into())));
self
}
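    /// Sets the data layout of the new table: partitioned, clustered, or none (the default).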
pub fn with_data_layout(mut self, layout: DataLayout) -> Self {
self.data_layout = layout;
self
}
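    /// Validates the request and builds the [`CreateTableTransaction`]: checks that no table
    /// exists at the path, validates properties and features, applies column mapping and the data
    /// layout, auto-enables schema- and property-driven features, and assembles the initial
    /// protocol and metadata for version 0.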
pub fn build(
self,
engine: &dyn Engine,
committer: Box<dyn Committer>,
) -> DeltaResult<CreateTableTransaction> {
let table_url = try_parse_uri(&self.path)?;
let delta_log_url = table_url.join("_delta_log/")?;
let storage = engine.storage_handler();
ensure_table_does_not_exist(storage.as_ref(), &delta_log_url, &self.path)?;
let mut validated = validate_extract_table_features_and_properties(self.table_properties)?;
let (effective_schema, column_mapping_mode) =
maybe_apply_column_mapping_for_table_create(&self.schema, &mut validated)?;
validate_schema_for_create(
&effective_schema,
column_mapping_mode,
validated
.writer_features
.contains(&TableFeature::Invariants),
)?;
let data_layout_result = apply_data_layout(
&self.data_layout,
&effective_schema,
column_mapping_mode,
&mut validated,
)?;
maybe_enable_variant_type(&effective_schema, &mut validated);
maybe_enable_timestamp_ntz(&effective_schema, &mut validated);
maybe_auto_enable_property_driven_features(&mut validated);
maybe_enable_ict_for_catalog_managed(&mut validated)?;
maybe_set_materialized_row_tracking_column_name_properties(&mut validated);
let protocol =
Protocol::try_new_modern(validated.reader_features, validated.writer_features)?;
let partition_columns: Vec<String> = data_layout_result
.partition_columns
.map(|cols| cols.into_iter().map(|c| c.into_inner().remove(0)).collect())
.unwrap_or_default();
let metadata = Metadata::try_new(
        None,
        None,
        effective_schema.clone(),
partition_columns,
current_time_ms()?,
validated.properties,
)?;
let table_configuration = TableConfiguration::try_new(metadata, protocol, table_url, 0)?;
Transaction::try_new_create_table(
table_configuration,
self.engine_info,
committer,
data_layout_result.system_domain_metadata,
data_layout_result.clustering_columns,
)
}
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use super::*;
use crate::expressions::ColumnName;
use crate::schema::{DataType, StructField, StructType};
use crate::table_features::FeatureType;
use crate::table_properties::{ENABLE_ICEBERG_COMPAT_V1, PARQUET_FORMAT_VERSION};
use crate::utils::test_utils::assert_result_error_with_message;
fn test_schema() -> SchemaRef {
Arc::new(StructType::new_unchecked(vec![StructField::new(
"id",
DataType::INTEGER,
false,
)]))
}
#[test]
fn test_basic_builder_creation() {
let schema = test_schema();
let builder =
CreateTableTransactionBuilder::new("/path/to/table", schema.clone(), "TestApp/1.0");
assert_eq!(builder.path, "/path/to/table");
assert_eq!(builder.engine_info, "TestApp/1.0");
assert!(builder.table_properties.is_empty());
}
#[test]
fn test_nested_path_builder_creation() {
let schema = test_schema();
let builder = CreateTableTransactionBuilder::new(
"/path/to/table/nested",
schema.clone(),
"TestApp/1.0",
);
assert_eq!(builder.path, "/path/to/table/nested");
}
#[test]
fn test_with_table_properties() {
let schema = test_schema();
let builder = CreateTableTransactionBuilder::new("/path/to/table", schema, "TestApp/1.0")
.with_table_properties([("key1", "value1")]);
assert_eq!(
builder.table_properties.get("key1"),
Some(&"value1".to_string())
);
}
#[test]
fn test_with_multiple_table_properties() {
let schema = test_schema();
let builder = CreateTableTransactionBuilder::new("/path/to/table", schema, "TestApp/1.0")
.with_table_properties([("key1", "value1")])
.with_table_properties([("key2", "value2")]);
assert_eq!(
builder.table_properties.get("key1"),
Some(&"value1".to_string())
);
assert_eq!(
builder.table_properties.get("key2"),
Some(&"value2".to_string())
);
}
#[test]
fn test_validate_supported_properties() {
let properties = HashMap::new();
let result = validate_extract_table_features_and_properties(properties);
assert!(result.is_ok());
let validated = result.unwrap();
assert!(validated.properties.is_empty());
assert!(validated.reader_features.is_empty());
assert!(validated.writer_features.is_empty());
let mut properties = HashMap::new();
properties.insert("myapp.version".to_string(), "1.0".to_string());
properties.insert("custom.setting".to_string(), "value".to_string());
let result = validate_extract_table_features_and_properties(properties);
assert!(result.is_ok());
let validated = result.unwrap();
assert_eq!(validated.properties.len(), 2);
assert_eq!(
validated.properties.get("myapp.version"),
Some(&"1.0".to_string())
);
assert_eq!(
validated.properties.get("custom.setting"),
Some(&"value".to_string())
);
}
#[test]
fn test_parquet_format_version_accepted() {
let properties =
HashMap::from([(PARQUET_FORMAT_VERSION.to_string(), "2.12.0".to_string())]);
let validated = validate_extract_table_features_and_properties(properties).unwrap();
assert_eq!(
validated.properties.get(PARQUET_FORMAT_VERSION),
Some(&"2.12.0".to_string()),
);
assert!(validated.reader_features.is_empty());
assert!(validated.writer_features.is_empty());
}
#[test]
fn test_validate_unsupported_properties() {
let mut properties = HashMap::new();
properties.insert(ENABLE_ICEBERG_COMPAT_V1.to_string(), "true".to_string());
assert_result_error_with_message(
validate_extract_table_features_and_properties(properties),
"Setting delta property 'delta.enableIcebergCompatV1' is not supported",
);
let properties = HashMap::from([(
"delta.feature.identityColumns".to_string(),
"supported".to_string(),
)]);
assert_result_error_with_message(
validate_extract_table_features_and_properties(properties),
"Enabling feature 'identityColumns' via 'delta.feature.identityColumns' is not supported",
);
let properties = HashMap::from([(
"delta.feature.clustering".to_string(),
"supported".to_string(),
)]);
assert_result_error_with_message(
validate_extract_table_features_and_properties(properties),
"Enabling feature 'clustering' via 'delta.feature.clustering' is not supported",
);
let mut properties = HashMap::new();
properties.insert("myapp.version".to_string(), "1.0".to_string());
properties.insert(ENABLE_ICEBERG_COMPAT_V1.to_string(), "true".to_string());
assert_result_error_with_message(
validate_extract_table_features_and_properties(properties),
"Setting delta property 'delta.enableIcebergCompatV1' is not supported",
);
}
#[test]
fn test_clustering_support_valid() {
use crate::clustering::CLUSTERING_DOMAIN_NAME;
use crate::expressions::ColumnName;
let schema = Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("name", DataType::STRING, true),
]));
let mut reader_features = vec![];
let mut writer_features = vec![];
let dm = validate_clustering_and_make_domain_metadata(
&schema,
&[ColumnName::new(["id"])],
&mut reader_features,
&mut writer_features,
)
.unwrap();
assert_eq!(dm.domain(), CLUSTERING_DOMAIN_NAME);
assert!(writer_features.contains(&TableFeature::DomainMetadata));
assert!(writer_features.contains(&TableFeature::ClusteredTable));
assert!(reader_features.is_empty());
}
#[test]
fn test_clustering_support_multiple_columns() {
use crate::expressions::ColumnName;
let schema = Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("date", DataType::STRING, true),
StructField::new("region", DataType::STRING, true),
]));
let mut reader_features = vec![];
let mut writer_features = vec![];
let dm = validate_clustering_and_make_domain_metadata(
&schema,
&[ColumnName::new(["id"]), ColumnName::new(["date"])],
&mut reader_features,
&mut writer_features,
)
.unwrap();
let config: serde_json::Value = serde_json::from_str(dm.configuration()).unwrap();
let clustering_cols = config["clusteringColumns"].as_array().unwrap();
assert_eq!(clustering_cols.len(), 2);
assert_eq!(clustering_cols[0], serde_json::json!(["id"]));
assert_eq!(clustering_cols[1], serde_json::json!(["date"]));
}
#[test]
fn test_clustering_column_not_in_schema() {
use crate::expressions::ColumnName;
let schema = Arc::new(StructType::new_unchecked(vec![StructField::new(
"id",
DataType::INTEGER,
false,
)]));
let mut reader_features = vec![];
let mut writer_features = vec![];
let result = validate_clustering_and_make_domain_metadata(
&schema,
&[ColumnName::new(["nonexistent"])],
&mut reader_features,
&mut writer_features,
);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("not found in schema"));
}
#[test]
fn test_clustering_nested_column_accepted() {
use crate::clustering::CLUSTERING_DOMAIN_NAME;
use crate::expressions::ColumnName;
let address_struct = StructType::new_unchecked(vec![
StructField::new("city", DataType::STRING, true),
StructField::new("zip", DataType::STRING, true),
]);
let schema = Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("address", DataType::Struct(Box::new(address_struct)), true),
]));
let mut reader_features = vec![];
let mut writer_features = vec![];
let nested_col = ColumnName::new(["address", "city"]);
let dm = validate_clustering_and_make_domain_metadata(
&schema,
&[nested_col],
&mut reader_features,
&mut writer_features,
)
.unwrap();
assert_eq!(dm.domain(), CLUSTERING_DOMAIN_NAME);
assert!(writer_features.contains(&TableFeature::ClusteredTable));
}
#[rstest::rstest]
#[case::clustered(DataLayout::clustered(["id"]), true, false)]
#[case::partitioned(DataLayout::partitioned(["id"]), false, true)]
#[case::none(DataLayout::default(), false, false)]
fn test_with_data_layout(
#[case] layout: DataLayout,
#[case] expect_clustered: bool,
#[case] expect_partitioned: bool,
) {
let schema = test_schema();
let builder = CreateTableTransactionBuilder::new("/path/to/table", schema, "TestApp/1.0")
.with_data_layout(layout);
assert_eq!(builder.data_layout.is_clustered(), expect_clustered);
assert_eq!(builder.data_layout.is_partitioned(), expect_partitioned);
}
#[rstest::rstest]
#[case::variant_top_level(
Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("v", DataType::unshredded_variant(), true),
])),
&[TableFeature::VariantType],
)]
#[case::variant_nested(
Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new(
"nested",
DataType::Struct(Box::new(StructType::new_unchecked(vec![
StructField::new("inner_v", DataType::unshredded_variant(), true),
]))),
true,
),
])),
&[TableFeature::VariantType],
)]
#[case::ntz_top_level(
Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("ts", DataType::TIMESTAMP_NTZ, true),
])),
&[TableFeature::TimestampWithoutTimezone],
)]
#[case::ntz_nested(
Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new(
"nested",
DataType::Struct(Box::new(StructType::new_unchecked(vec![
StructField::new("inner_ts", DataType::TIMESTAMP_NTZ, true),
]))),
true,
),
])),
&[TableFeature::TimestampWithoutTimezone],
)]
#[case::both_variant_and_ntz(
Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("v", DataType::unshredded_variant(), true),
StructField::new("ts", DataType::TIMESTAMP_NTZ, true),
])),
&[TableFeature::VariantType, TableFeature::TimestampWithoutTimezone],
)]
#[case::no_special_types(
test_schema(),
&[],
)]
fn test_schema_driven_feature_auto_enablement(
#[case] schema: SchemaRef,
#[case] expected_features: &[TableFeature],
) {
let mut validated = ValidatedTableProperties {
properties: HashMap::new(),
reader_features: vec![],
writer_features: vec![],
};
maybe_enable_variant_type(&schema, &mut validated);
maybe_enable_timestamp_ntz(&schema, &mut validated);
for feature in expected_features {
assert!(
validated.reader_features.contains(feature),
"Expected {feature:?} in reader_features"
);
assert!(
validated.writer_features.contains(feature),
"Expected {feature:?} in writer_features"
);
}
assert_eq!(
validated.reader_features.len(),
expected_features.len(),
"Unexpected extra reader features: {:?}",
validated.reader_features
);
assert_eq!(
validated.writer_features.len(),
expected_features.len(),
"Unexpected extra writer features: {:?}",
validated.writer_features
);
}
#[rstest::rstest]
#[case::property_true(&[("delta.enableInCommitTimestamps", "true")], true, true)]
#[case::property_false(&[("delta.enableInCommitTimestamps", "false")], false, true)]
#[case::property_absent(&[], false, false)]
#[case::feature_signal(&[("delta.feature.inCommitTimestamp", "supported")], true, false)]
fn test_ict_support_and_enablement(
#[case] properties: &[(&str, &str)],
#[case] expect_in_writer_features: bool,
#[case] expect_property_preserved: bool,
) {
let properties: HashMap<String, String> = properties
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();
let mut validated = validate_extract_table_features_and_properties(properties).unwrap();
maybe_auto_enable_property_driven_features(&mut validated);
assert_eq!(
validated
.writer_features
.contains(&TableFeature::InCommitTimestamp),
expect_in_writer_features,
);
assert_eq!(
validated
.properties
.contains_key(ENABLE_IN_COMMIT_TIMESTAMPS),
expect_property_preserved,
);
assert!(
validated.reader_features.is_empty(),
"InCommitTimestamp is writer-only, reader_features should always be empty"
);
}
#[rstest::rstest]
#[case::vacuum_protocol_check(TableFeature::VacuumProtocolCheck, "vacuumProtocolCheck")]
#[case::domain_metadata(TableFeature::DomainMetadata, "domainMetadata")]
#[case::column_mapping(TableFeature::ColumnMapping, "columnMapping")]
#[case::in_commit_timestamp(TableFeature::InCommitTimestamp, "inCommitTimestamp")]
#[case::deletion_vectors(TableFeature::DeletionVectors, "deletionVectors")]
#[case::v2_checkpoint(TableFeature::V2Checkpoint, "v2Checkpoint")]
#[case::append_only(TableFeature::AppendOnly, "appendOnly")]
#[case::change_data_feed(TableFeature::ChangeDataFeed, "changeDataFeed")]
#[case::type_widening(TableFeature::TypeWidening, "typeWidening")]
#[case::catalog_managed(TableFeature::CatalogManaged, "catalogManaged")]
fn test_feature_signal_accepted(#[case] feature: TableFeature, #[case] feature_name: &str) {
let key = format!("delta.feature.{feature_name}");
let properties = HashMap::from([(key, "supported".to_string())]);
let validated = validate_extract_table_features_and_properties(properties).unwrap();
assert!(
validated.properties.is_empty(),
"Feature signal should be removed from properties"
);
assert!(
validated.writer_features.contains(&feature),
"{feature:?} should be in writer_features"
);
match feature.feature_type() {
FeatureType::ReaderWriter => assert!(
validated.reader_features.contains(&feature),
"{feature:?} is ReaderWriter but missing from reader_features"
),
            _ => assert!(
                validated.reader_features.is_empty(),
                "{feature:?} is not ReaderWriter but reader_features is not empty"
            ),
}
}
fn multi_column_schema() -> SchemaRef {
Arc::new(StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("name", DataType::STRING, true),
StructField::new("date", DataType::DATE, true),
]))
}
struct DataLayoutExpectation {
layout: DataLayout,
has_domain_metadata: bool,
has_clustering_columns: bool,
expected_partition_columns: Option<Vec<ColumnName>>,
expected_writer_features: Vec<TableFeature>,
}
#[rstest::rstest]
#[case::none(DataLayoutExpectation {
layout: DataLayout::default(),
has_domain_metadata: false,
has_clustering_columns: false,
expected_partition_columns: None,
expected_writer_features: vec![],
})]
#[case::clustered(DataLayoutExpectation {
layout: DataLayout::clustered(["id"]),
has_domain_metadata: true,
has_clustering_columns: true,
expected_partition_columns: None,
expected_writer_features: vec![TableFeature::DomainMetadata, TableFeature::ClusteredTable],
})]
#[case::partitioned_single(DataLayoutExpectation {
layout: DataLayout::partitioned(["date"]),
has_domain_metadata: false,
has_clustering_columns: false,
expected_partition_columns: Some(vec![ColumnName::new(["date"])]),
expected_writer_features: vec![],
})]
#[case::partitioned_multiple(DataLayoutExpectation {
layout: DataLayout::partitioned(["id", "date"]),
has_domain_metadata: false,
has_clustering_columns: false,
expected_partition_columns: Some(vec![ColumnName::new(["id"]), ColumnName::new(["date"])]),
expected_writer_features: vec![],
})]
fn test_apply_data_layout(#[case] expectation: DataLayoutExpectation) {
let schema = multi_column_schema();
let mut validated = ValidatedTableProperties {
properties: HashMap::new(),
reader_features: vec![],
writer_features: vec![],
};
let result = apply_data_layout(
&expectation.layout,
&schema,
ColumnMappingMode::None,
&mut validated,
)
.unwrap();
assert_eq!(
!result.system_domain_metadata.is_empty(),
expectation.has_domain_metadata
);
assert_eq!(
result.clustering_columns.is_some(),
expectation.has_clustering_columns
);
assert_eq!(
result.partition_columns,
expectation.expected_partition_columns
);
for feature in &expectation.expected_writer_features {
assert!(
validated.writer_features.contains(feature),
"Expected {feature:?} in writer_features"
);
}
}
#[rstest::rstest]
#[case::clustered_invalid_col(DataLayout::clustered(["nonexistent"]), "not found in schema")]
#[case::partitioned_invalid_col(DataLayout::partitioned(["nonexistent"]), "not found in schema")]
#[case::partitioned_duplicate(DataLayout::partitioned(["id", "id"]), "Duplicate partition column")]
#[case::partitioned_empty(DataLayout::Partitioned { columns: vec![] }, "at least one column")]
#[case::partitioned_all_columns(DataLayout::partitioned(["id", "name", "date"]), "at least one non-partition column")]
fn test_apply_data_layout_validation_errors(
#[case] layout: DataLayout,
#[case] expected_error: &str,
) {
let schema = multi_column_schema();
let mut validated = ValidatedTableProperties {
properties: HashMap::new(),
reader_features: vec![],
writer_features: vec![],
};
let result = apply_data_layout(&layout, &schema, ColumnMappingMode::None, &mut validated);
assert!(result.is_err());
assert!(
result.unwrap_err().to_string().contains(expected_error),
"Expected error containing '{expected_error}'"
);
}
#[test]
fn test_validate_partition_columns_nested_rejected() {
let address_struct =
StructType::new_unchecked(vec![StructField::new("city", DataType::STRING, true)]);
let schema = StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("address", DataType::Struct(Box::new(address_struct)), true),
]);
let columns = vec![ColumnName::new(["address", "city"])];
let result = validate_partition_columns(&schema, &columns);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("must be a top-level column"));
}
#[rstest::rstest]
#[case::struct_type(
"struct_col",
DataType::Struct(Box::new(StructType::new_unchecked(vec![
StructField::new("inner", DataType::STRING, false),
]))),
)]
#[case::array_type(
"array_col",
DataType::Array(Box::new(crate::schema::ArrayType::new(DataType::INTEGER, false)))
)]
#[case::map_type(
"map_col",
DataType::Map(Box::new(crate::schema::MapType::new(
DataType::STRING,
DataType::INTEGER,
false
)))
)]
fn test_validate_partition_columns_complex_types_rejected(
#[case] col_name: &str,
#[case] data_type: DataType,
) {
let schema = StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new(col_name, data_type, false),
]);
let columns = vec![ColumnName::new([col_name])];
let result = validate_partition_columns(&schema, &columns);
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("non-primitive type"));
}
#[rstest::rstest]
#[case::integer(DataType::INTEGER)]
#[case::string(DataType::STRING)]
#[case::date(DataType::DATE)]
#[case::timestamp(DataType::TIMESTAMP)]
#[case::boolean(DataType::BOOLEAN)]
#[case::long(DataType::LONG)]
fn test_validate_partition_columns_primitive_types_accepted(#[case] data_type: DataType) {
let schema = StructType::new_unchecked(vec![
StructField::new("id", DataType::INTEGER, false),
StructField::new("col", data_type, false),
]);
let columns = vec![ColumnName::new(["col"])];
assert!(validate_partition_columns(&schema, &columns).is_ok());
}
#[test]
fn test_catalog_managed_auto_enables_ict() {
let properties = HashMap::from([(
"delta.feature.catalogManaged".to_string(),
"supported".to_string(),
)]);
let mut validated = validate_extract_table_features_and_properties(properties).unwrap();
maybe_auto_enable_property_driven_features(&mut validated);
maybe_enable_ict_for_catalog_managed(&mut validated).unwrap();
assert!(
validated
.writer_features
.contains(&TableFeature::InCommitTimestamp),
"ICT should be auto-added to writer_features"
);
assert_eq!(
validated.properties.get(ENABLE_IN_COMMIT_TIMESTAMPS),
Some(&"true".to_string()),
"delta.enableInCommitTimestamps should be set to true"
);
}
#[test]
fn test_catalog_managed_with_ict_true_succeeds() {
let properties = HashMap::from([
(
"delta.feature.catalogManaged".to_string(),
"supported".to_string(),
),
(
"delta.enableInCommitTimestamps".to_string(),
"true".to_string(),
),
]);
let mut validated = validate_extract_table_features_and_properties(properties).unwrap();
maybe_auto_enable_property_driven_features(&mut validated);
maybe_enable_ict_for_catalog_managed(&mut validated).unwrap();
assert!(validated
.writer_features
.contains(&TableFeature::InCommitTimestamp));
assert_eq!(
validated.properties.get(ENABLE_IN_COMMIT_TIMESTAMPS),
Some(&"true".to_string()),
);
}
#[rstest::rstest]
#[case::enablement_property(
HashMap::from([(ENABLE_ROW_TRACKING.to_string(), "true".to_string())]),
true, // enablement property is set
)]
#[case::feature_signal(
HashMap::from([("delta.feature.rowTracking".to_string(), "supported".to_string())]),
false, // enablement property is NOT set
)]
fn test_row_tracking_activation_adds_required_features(
#[case] properties: HashMap<String, String>,
#[case] expect_enablement_property: bool,
) {
let mut validated = validate_extract_table_features_and_properties(properties).unwrap();
maybe_auto_enable_property_driven_features(&mut validated);
assert!(
validated
.writer_features
.contains(&TableFeature::RowTracking),
"Expected RowTracking in writer_features"
);
assert!(
validated
.writer_features
.contains(&TableFeature::DomainMetadata),
"Expected DomainMetadata in writer_features"
);
assert_eq!(
validated.properties.contains_key(ENABLE_ROW_TRACKING),
expect_enablement_property,
"delta.enableRowTracking presence mismatch"
);
}
#[test]
fn test_catalog_managed_with_ict_false_fails() {
let properties = HashMap::from([
(
"delta.feature.catalogManaged".to_string(),
"supported".to_string(),
),
(
"delta.enableInCommitTimestamps".to_string(),
"false".to_string(),
),
]);
let mut validated = validate_extract_table_features_and_properties(properties).unwrap();
maybe_auto_enable_property_driven_features(&mut validated);
let err = maybe_enable_ict_for_catalog_managed(&mut validated).unwrap_err();
assert!(
err.to_string().contains("enableInCommitTimestamps"),
"expected ICT conflict error, got: {err}"
);
}
}