use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::{Arc, OnceLock};
use url::Url;
use crate::actions::{Metadata, Protocol};
use crate::expressions::ColumnName;
use crate::scan::data_skipping::stats_schema::{
expected_stats_schema, stats_column_names, StatsConfig, StripFieldMetadataTransform,
};
pub(crate) use crate::schema::variant_utils::validate_variant_type_feature_support;
use crate::schema::{schema_has_invariants, SchemaRef, StructField, StructType};
use crate::table_features::{
column_mapping_mode, get_any_level_column_physical_name,
validate_timestamp_ntz_feature_support, ColumnMappingMode, EnablementCheck, FeatureRequirement,
FeatureType, KernelSupport, Operation, TableFeature, LEGACY_READER_FEATURES,
LEGACY_WRITER_FEATURES, MAX_VALID_READER_VERSION, MAX_VALID_WRITER_VERSION,
MIN_VALID_RW_VERSION, TABLE_FEATURES_MIN_READER_VERSION, TABLE_FEATURES_MIN_WRITER_VERSION,
};
use crate::table_properties::TableProperties;
use crate::transforms::SchemaTransform as _;
use crate::utils::require;
use crate::{DeltaResult, Error, Version};
use delta_kernel_derive::internal_api;
use tracing::warn;
/// Stats schemas expected when reading file statistics for data skipping.
#[allow(unused)]
#[derive(Debug, Clone)]
#[internal_api]
pub(crate) struct ExpectedStatsSchemas {
    // Expected stats schema expressed in *physical* column names.
    pub physical: SchemaRef,
}
/// Whether the in-commit-timestamp feature is enabled for a table, and — if
/// it was enabled after table creation — at which version/timestamp.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) enum InCommitTimestampEnablement {
    /// Feature is not enabled (or not supported) on this table.
    NotEnabled,
    Enabled {
        // `None` means enabled since table creation (no enablement
        // version/timestamp properties recorded); otherwise the
        // (version, timestamp) pair at which it was turned on.
        enablement: Option<(Version, i64)>,
    },
}
/// Strips field metadata from `schema` via [`StripFieldMetadataTransform`].
/// Reuses the input `Arc` when the transform reports no change.
fn strip_metadata(schema: SchemaRef) -> SchemaRef {
    if let Some(Cow::Owned(stripped)) = StripFieldMetadataTransform.transform_struct(&schema) {
        Arc::new(stripped)
    } else {
        schema
    }
}
/// The table's physical schema plus a lazily computed projection of it
/// without partition columns.
#[derive(Debug, Clone, Eq)]
struct PhysicalSchemas {
    // Full physical schema (all columns, physical names).
    full: SchemaRef,
    // Cache for the partition-column-free projection; filled on first use by
    // `physical_data_schema_without_partition_columns`.
    without_partition: OnceLock<SchemaRef>,
}
impl PhysicalSchemas {
    /// Wraps a full physical schema; the partition-free projection is
    /// computed lazily on first access.
    fn new(full: SchemaRef) -> Self {
        Self {
            full,
            without_partition: OnceLock::new(),
        }
    }
}
// Manual impl: equality considers only the full schema. The
// `without_partition` field is a derived cache, so it is deliberately
// excluded (and `OnceLock` comparison would depend on initialization state).
impl PartialEq for PhysicalSchemas {
    fn eq(&self, other: &Self) -> bool {
        self.full == other.full
    }
}
/// Consolidated view of a table's state at a version: its `Metadata` and
/// `Protocol` actions plus values derived from them (schemas, parsed table
/// properties, column mapping mode).
#[internal_api]
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct TableConfiguration {
    metadata: Metadata,
    protocol: Protocol,
    // Schema as declared in metadata (logical column names).
    logical_schema: SchemaRef,
    // Physical schema derived from the logical one via column mapping.
    physical_schemas: PhysicalSchemas,
    // Parsed from metadata's configuration map.
    table_properties: TableProperties,
    column_mapping_mode: ColumnMappingMode,
    table_root: Url,
    // The table version this configuration was built from.
    version: Version,
}
impl TableConfiguration {
    /// Builds a [`TableConfiguration`] from a table's `Metadata` and
    /// `Protocol` actions at the given `version`.
    ///
    /// Parses the logical schema and table properties out of the metadata,
    /// derives the physical schema for the resulting column mapping mode, and
    /// finally validates feature support for TIMESTAMP_NTZ / VARIANT columns.
    ///
    /// # Errors
    /// Fails if the schema cannot be parsed, the physical schema cannot be
    /// derived, or a required type feature is missing from the protocol.
    #[internal_api]
    pub(crate) fn try_new(
        metadata: Metadata,
        protocol: Protocol,
        table_root: Url,
        version: Version,
    ) -> DeltaResult<Self> {
        let logical_schema = Arc::new(metadata.parse_schema()?);
        let table_properties = metadata.parse_table_properties();
        // Column mapping mode depends on both protocol and properties.
        let column_mapping_mode = column_mapping_mode(&protocol, &table_properties);
        let physical_schema = Arc::new(logical_schema.make_physical(column_mapping_mode)?);
        let physical_schemas = PhysicalSchemas::new(physical_schema);
        let table_config = Self {
            logical_schema,
            physical_schemas,
            metadata,
            protocol,
            table_properties,
            column_mapping_mode,
            table_root,
            version,
        };
        // These validators take the fully-built config, so run them last.
        validate_timestamp_ntz_feature_support(&table_config)?;
        validate_variant_type_feature_support(&table_config)?;
        Ok(table_config)
    }
pub(crate) fn try_new_from(
table_configuration: &Self,
new_metadata: Option<Metadata>,
new_protocol: Option<Protocol>,
new_version: Version,
) -> DeltaResult<Self> {
if new_metadata.is_none() && new_protocol.is_none() {
return Ok(Self {
version: new_version,
..table_configuration.clone()
});
}
Self::try_new(
new_metadata.unwrap_or_else(|| table_configuration.metadata.clone()),
new_protocol.unwrap_or_else(|| table_configuration.protocol.clone()),
table_configuration.table_root.clone(),
new_version,
)
}
pub(crate) fn new_post_commit(table_configuration: &Self, new_version: Version) -> Self {
Self {
version: new_version,
..table_configuration.clone()
}
}
#[allow(unused)]
#[internal_api]
pub(crate) fn build_expected_stats_schemas(
&self,
required_physical_columns: Option<&[ColumnName]>,
requested_physical_columns: Option<&[ColumnName]>,
) -> DeltaResult<ExpectedStatsSchemas> {
let physical_data_schema = self.physical_data_schema_without_partition_columns();
let required_physical_stats_columns = self.required_physical_stats_columns();
let config = StatsConfig {
data_skipping_stats_columns: required_physical_stats_columns.as_deref(),
data_skipping_num_indexed_cols: self.table_properties().data_skipping_num_indexed_cols,
};
let physical_stats_schema = Arc::new(expected_stats_schema(
&physical_data_schema,
&config,
required_physical_columns,
requested_physical_columns,
)?);
let physical_stats_schema = strip_metadata(physical_stats_schema);
Ok(ExpectedStatsSchemas {
physical: physical_stats_schema,
})
}
pub(crate) fn physical_stats_column_names(
&self,
required_columns: Option<&[ColumnName]>,
) -> Vec<ColumnName> {
let physical_stats_columns = self.required_physical_stats_columns();
let config = StatsConfig {
data_skipping_stats_columns: physical_stats_columns.as_deref(),
data_skipping_num_indexed_cols: self.table_properties().data_skipping_num_indexed_cols,
};
stats_column_names(
&self.physical_data_schema_without_partition_columns(),
&config,
required_columns,
)
}
pub(crate) fn build_partition_values_parsed_schema(&self) -> Option<SchemaRef> {
let partition_columns = self.metadata().partition_columns();
if partition_columns.is_empty() {
return None;
}
let logical_schema = self.logical_schema();
let column_mapping_mode = self.column_mapping_mode();
let partition_fields: Vec<StructField> = partition_columns
.iter()
.filter_map(|col_name| {
let field = logical_schema.field(col_name);
if field.is_none() {
warn!("Partition column '{col_name}' not found in table schema");
}
field
})
.map(|field: &StructField| {
StructField::new(
field.physical_name(column_mapping_mode).to_owned(),
field.data_type().clone(),
field.is_nullable(),
)
})
.collect();
Some(Arc::new(StructType::new_unchecked(partition_fields)))
}
fn logical_data_schema(&self) -> SchemaRef {
let partition_columns = self.partition_columns();
Arc::new(StructType::new_unchecked(
self.logical_schema()
.fields()
.filter(|field| !partition_columns.contains(field.name()))
.cloned(),
))
}
    /// Returns the physical schema with partition columns removed. The result
    /// is computed on first call and cached in
    /// `physical_schemas.without_partition`.
    pub(crate) fn physical_data_schema_without_partition_columns(&self) -> SchemaRef {
        self.physical_schemas
            .without_partition
            .get_or_init(|| {
                let partition_columns: HashSet<&str> = self
                    .partition_columns()
                    .iter()
                    .map(|s| s.as_str())
                    .collect();
                // The physical schema was derived from the logical schema in
                // `try_new`, so their top-level fields line up 1:1. Zip them
                // so we can filter by *logical* (partition) name while keeping
                // the *physical* field.
                Arc::new(StructType::new_unchecked(
                    self.logical_schema()
                        .fields()
                        .zip(self.physical_schemas.full.fields())
                        .filter(|(logical_field, _)| {
                            !partition_columns.contains(logical_field.name().as_str())
                        })
                        .map(|(_, physical_field)| physical_field.clone()),
                ))
            })
            .clone()
    }
fn required_physical_stats_columns(&self) -> Option<Vec<ColumnName>> {
self.table_properties()
.data_skipping_stats_columns
.as_ref()
.map(|cols| {
let logical_schema = self.logical_data_schema();
let mode = self.column_mapping_mode();
cols.iter()
.filter_map(|col| {
get_any_level_column_physical_name(&logical_schema, col, mode)
.inspect_err(|e| {
warn!(
"Couldn't translate dataSkippingStatsColumns entry '{col}' \
to physical name: {e}; skipping"
);
})
.ok()
})
.collect()
})
}
    /// The table's `Metadata` action at this version.
    #[internal_api]
    pub(crate) fn metadata(&self) -> &Metadata {
        &self.metadata
    }
    /// The table's `Protocol` action at this version.
    #[allow(unused)]
    #[internal_api]
    pub(crate) fn protocol(&self) -> &Protocol {
        &self.protocol
    }
    /// Schema with logical column names, as declared in metadata.
    #[internal_api]
    pub(crate) fn logical_schema(&self) -> SchemaRef {
        self.logical_schema.clone()
    }
    /// Full physical schema (column-mapped names), including partition columns.
    #[internal_api]
    pub(crate) fn physical_schema(&self) -> SchemaRef {
        self.physical_schemas.full.clone()
    }
    /// Physical schema of the data actually written to files: includes
    /// partition columns only when `MaterializePartitionColumns` is enabled.
    pub(crate) fn physical_write_schema(&self) -> SchemaRef {
        if self.is_feature_enabled(&TableFeature::MaterializePartitionColumns) {
            self.physical_schema()
        } else {
            self.physical_data_schema_without_partition_columns()
        }
    }
    /// Table properties parsed from metadata's configuration.
    #[internal_api]
    pub(crate) fn table_properties(&self) -> &TableProperties {
        &self.table_properties
    }
    /// True when the table is catalog-managed (either the final feature or
    /// its preview variant is supported).
    #[internal_api]
    pub(crate) fn is_catalog_managed(&self) -> bool {
        self.is_feature_supported(&TableFeature::CatalogManaged)
            || self.is_feature_supported(&TableFeature::CatalogOwnedPreview)
    }
    /// Column mapping mode derived from protocol + properties.
    #[internal_api]
    pub(crate) fn column_mapping_mode(&self) -> ColumnMappingMode {
        self.column_mapping_mode
    }
    /// Partition column names (logical), as recorded in metadata.
    #[internal_api]
    pub(crate) fn partition_columns(&self) -> &[String] {
        self.metadata().partition_columns()
    }
    /// Root URL of the table.
    #[internal_api]
    pub(crate) fn table_root(&self) -> &Url {
        &self.table_root
    }
    /// Table version this configuration was built from.
    #[internal_api]
    pub(crate) fn version(&self) -> Version {
        self.version
    }
fn validate_feature_requirements(&self, feature: &TableFeature) -> DeltaResult<()> {
for req in feature.info().feature_requirements {
match req {
FeatureRequirement::Supported(dep) => {
require!(
self.is_feature_supported(dep),
Error::invalid_protocol(format!(
"Feature '{feature}' requires '{dep}' to be supported"
))
);
}
FeatureRequirement::Enabled(dep) => {
require!(
self.is_feature_enabled(dep),
Error::invalid_protocol(format!(
"Feature '{feature}' requires '{dep}' to be enabled"
))
);
}
FeatureRequirement::NotSupported(dep) => {
require!(
!self.is_feature_supported(dep),
Error::invalid_protocol(format!(
"Feature '{feature}' requires '{dep}' to not be supported"
))
);
}
FeatureRequirement::NotEnabled(dep) => {
require!(
!self.is_feature_enabled(dep),
Error::invalid_protocol(format!(
"Feature '{feature}' requires '{dep}' to not be enabled"
))
);
}
FeatureRequirement::Custom(check) => {
check(&self.protocol, &self.table_properties)?;
}
}
}
Ok(())
}
fn check_feature_support(
&self,
feature: &TableFeature,
operation: Operation,
) -> DeltaResult<()> {
let info = feature.info();
match &info.kernel_support {
KernelSupport::Supported => {}
KernelSupport::NotSupported => {
return Err(Error::unsupported(format!(
"Feature '{feature}' is not supported"
)))
}
KernelSupport::Custom(check) => {
check(&self.protocol, &self.table_properties, operation)?;
}
};
self.validate_feature_requirements(feature)
}
fn get_enabled_reader_features(&self) -> Vec<TableFeature> {
match self.protocol.min_reader_version() {
TABLE_FEATURES_MIN_READER_VERSION => {
self.protocol
.reader_features()
.map(|f| f.to_vec())
.unwrap_or_default()
}
v if (1..=2).contains(&v) => {
LEGACY_READER_FEATURES
.iter()
.filter(|f| f.is_valid_for_legacy_reader(v))
.cloned()
.collect()
}
_ => Vec::new(),
}
}
fn get_enabled_writer_features(&self) -> Vec<TableFeature> {
match self.protocol.min_writer_version() {
TABLE_FEATURES_MIN_WRITER_VERSION => {
self.protocol
.writer_features()
.map(|f| f.to_vec())
.unwrap_or_default()
}
v if (1..=6).contains(&v) => {
LEGACY_WRITER_FEATURES
.iter()
.filter(|f| f.is_valid_for_legacy_writer(v))
.cloned()
.collect()
}
_ => Vec::new(),
}
}
    /// Checks whether this table's protocol allows the given operation,
    /// dispatching to the read- or write-side validation.
    #[internal_api]
    pub(crate) fn ensure_operation_supported(&self, operation: Operation) -> DeltaResult<()> {
        match operation {
            // CDF reads go through the same read-side checks as scans.
            Operation::Scan | Operation::Cdf => self.ensure_read_supported(operation),
            Operation::Write => self.ensure_write_supported(),
        }
    }
fn ensure_read_supported(&self, operation: Operation) -> DeltaResult<()> {
require!(
self.protocol.min_reader_version() >= MIN_VALID_RW_VERSION,
Error::InvalidProtocol(format!(
"min_reader_version must be >= {MIN_VALID_RW_VERSION}, got {}",
self.protocol.min_reader_version()
))
);
if self.protocol.min_reader_version() > MAX_VALID_READER_VERSION {
return Err(Error::unsupported(format!(
"Unsupported minimum reader version {}",
self.protocol.min_reader_version()
)));
}
for feature in self.get_enabled_reader_features() {
self.check_feature_support(&feature, operation)?;
}
Ok(())
}
fn ensure_write_supported(&self) -> DeltaResult<()> {
require!(
self.protocol.min_writer_version() >= MIN_VALID_RW_VERSION,
Error::InvalidProtocol(format!(
"min_writer_version must be >= {MIN_VALID_RW_VERSION}, got {}",
self.protocol.min_writer_version()
))
);
if self.protocol.min_writer_version() > MAX_VALID_WRITER_VERSION {
return Err(Error::unsupported(format!(
"Unsupported minimum writer version {}",
self.protocol.min_writer_version()
)));
}
for feature in self.get_enabled_writer_features() {
self.check_feature_support(&feature, Operation::Write)?;
}
if self.is_feature_supported(&TableFeature::Invariants)
&& schema_has_invariants(self.logical_schema.as_ref())
{
return Err(Error::unsupported(
"Column invariants are not yet supported",
));
}
Ok(())
}
    /// Reports whether in-commit timestamps are enabled and, if they were
    /// enabled after table creation, the recorded enablement version and
    /// timestamp.
    ///
    /// # Errors
    /// Returns an error when only one of the two enablement properties is
    /// present — they must be set together or not at all.
    #[allow(unused)]
    pub(crate) fn in_commit_timestamp_enablement(
        &self,
    ) -> DeltaResult<InCommitTimestampEnablement> {
        if !self.is_feature_enabled(&TableFeature::InCommitTimestamp) {
            return Ok(InCommitTimestampEnablement::NotEnabled);
        }
        let enablement_version = self
            .table_properties()
            .in_commit_timestamp_enablement_version;
        let enablement_timestamp = self
            .table_properties()
            .in_commit_timestamp_enablement_timestamp;
        match (enablement_version, enablement_timestamp) {
            (Some(version), Some(timestamp)) => Ok(InCommitTimestampEnablement::Enabled {
                enablement: Some((version, timestamp)),
            }),
            (Some(_), None) => Err(Error::generic(
                "In-commit timestamp enabled, but enablement timestamp is missing",
            )),
            (None, Some(_)) => Err(Error::generic(
                "In-commit timestamp enabled, but enablement version is missing",
            )),
            // Neither property set: enabled since table creation.
            (None, None) => Ok(InCommitTimestampEnablement::Enabled { enablement: None }),
        }
    }
    /// True when the `rowTrackingSuspended` table property is explicitly set.
    pub(crate) fn is_row_tracking_suspended(&self) -> bool {
        self.table_properties()
            .row_tracking_suspended
            .unwrap_or(false)
    }
    /// True when row tracking metadata should be written: the feature is
    /// supported by the protocol and tracking has not been suspended.
    pub(crate) fn should_write_row_tracking(&self) -> bool {
        self.is_feature_supported(&TableFeature::RowTracking) && !self.is_row_tracking_suspended()
    }
    /// True when the reader version predates the table-features protocol.
    #[allow(dead_code)]
    fn is_legacy_reader_version(&self) -> bool {
        self.protocol.min_reader_version() < TABLE_FEATURES_MIN_READER_VERSION
    }
    /// True when the writer version predates the table-features protocol.
    #[allow(dead_code)]
    fn is_legacy_writer_version(&self) -> bool {
        self.protocol.min_writer_version() < TABLE_FEATURES_MIN_WRITER_VERSION
    }
fn has_feature(features: Option<&[TableFeature]>, feature: &TableFeature) -> bool {
features
.map(|features| features.contains(feature))
.unwrap_or(false)
}
    /// Returns true if `feature` is supported by this table's protocol.
    ///
    /// For legacy protocol versions (below the table-features versions),
    /// support is implied by the version reaching the feature's
    /// `min_legacy_version`; for table-features protocols the feature must be
    /// listed explicitly in the relevant feature list(s).
    #[internal_api]
    pub(crate) fn is_feature_supported(&self, feature: &TableFeature) -> bool {
        let info = feature.info();
        let min_legacy_version = info.min_legacy_version.as_ref();
        // Features without a legacy version can never be implied by a legacy
        // protocol: default the thresholds to the table-features versions,
        // which a legacy version can't reach.
        let min_reader_version =
            min_legacy_version.map_or(TABLE_FEATURES_MIN_READER_VERSION, |v| v.reader);
        let min_writer_version =
            min_legacy_version.map_or(TABLE_FEATURES_MIN_WRITER_VERSION, |v| v.writer);
        match info.feature_type {
            // Writer-only features: only the writer side is consulted.
            FeatureType::WriterOnly => {
                if self.is_legacy_writer_version() {
                    self.protocol.min_writer_version() >= min_writer_version
                } else {
                    Self::has_feature(self.protocol.writer_features(), feature)
                }
            }
            // Reader-writer features must be supported on both sides; each
            // side may independently be legacy or table-features based.
            FeatureType::ReaderWriter => {
                let reader_supported = if self.is_legacy_reader_version() {
                    self.protocol.min_reader_version() >= min_reader_version
                } else {
                    Self::has_feature(self.protocol.reader_features(), feature)
                };
                let writer_supported = if self.is_legacy_writer_version() {
                    self.protocol.min_writer_version() >= min_writer_version
                } else {
                    Self::has_feature(self.protocol.writer_features(), feature)
                };
                reader_supported && writer_supported
            }
            // Unknown features have no legacy semantics; check the writer
            // list (reader-writer features must also be listed there).
            FeatureType::Unknown => Self::has_feature(self.protocol.writer_features(), feature),
        }
    }
#[internal_api]
pub(crate) fn is_feature_enabled(&self, feature: &TableFeature) -> bool {
if !self.is_feature_supported(feature) {
return false;
}
match feature.info().enablement_check {
EnablementCheck::AlwaysIfSupported => true,
EnablementCheck::EnabledIf(check_fn) => check_fn(&self.table_properties),
}
}
}
#[cfg(test)]
mod test {
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;
use crate::actions::{Metadata, Protocol};
use crate::schema::ColumnName;
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::table_features::ColumnMappingMode;
use crate::table_features::{
FeatureType, Operation, TableFeature, TABLE_FEATURES_MIN_READER_VERSION,
TABLE_FEATURES_MIN_WRITER_VERSION,
};
use crate::table_properties::{
TableProperties, COLUMN_MAPPING_MODE, ENABLE_IN_COMMIT_TIMESTAMPS,
};
use crate::utils::test_utils::{
assert_result_error_with_message, test_schema_flat, test_schema_flat_with_column_mapping,
test_schema_nested, test_schema_nested_with_column_mapping, test_schema_with_array,
test_schema_with_array_and_column_mapping, test_schema_with_map,
test_schema_with_map_and_column_mapping,
};
use crate::Error;
use rstest::rstest;
use super::{InCommitTimestampEnablement, TableConfiguration};
    // Convenience wrapper: table-features protocol versions with an explicit
    // feature list.
    fn create_mock_table_config(
        props_to_enable: &[(&str, &str)],
        features: &[TableFeature],
    ) -> TableConfiguration {
        create_mock_table_config_with_version(
            props_to_enable,
            Some(features),
            TABLE_FEATURES_MIN_READER_VERSION,
            TABLE_FEATURES_MIN_WRITER_VERSION,
        )
    }
    // Builds a TableConfiguration with a single-column schema, the given
    // table properties, protocol versions, and (optionally) feature lists.
    // Reader-writer features are put in the reader list; all features go in
    // the writer list. Feature lists are only attached when the respective
    // version uses table features.
    fn create_mock_table_config_with_version(
        props_to_enable: &[(&str, &str)],
        features_opt: Option<&[TableFeature]>,
        min_reader_version: i32,
        min_writer_version: i32,
    ) -> TableConfiguration {
        let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
            "value",
            DataType::INTEGER,
        )]));
        let metadata = Metadata::try_new(
            None,
            None,
            schema,
            vec![],
            0,
            HashMap::from_iter(
                props_to_enable
                    .iter()
                    .map(|(key, value)| (key.to_string(), value.to_string())),
            ),
        )
        .unwrap();
        let (reader_features_opt, writer_features_opt) = if let Some(features) = features_opt {
            assert!(
                features
                    .iter()
                    .all(|f| f.feature_type() != FeatureType::Unknown),
                "Test helper does not support unknown features"
            );
            let reader_features = features
                .iter()
                .filter(|f| f.feature_type() == FeatureType::ReaderWriter);
            (
                (min_reader_version >= TABLE_FEATURES_MIN_READER_VERSION)
                    .then_some(reader_features),
                (min_writer_version >= TABLE_FEATURES_MIN_WRITER_VERSION).then_some(features),
            )
        } else {
            (None, None)
        };
        let protocol = Protocol::try_new(
            min_reader_version,
            min_writer_version,
            reader_features_opt,
            writer_features_opt,
        )
        .unwrap();
        let table_root = Url::try_from("file:///").unwrap();
        TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
    }
#[test]
fn dv_supported_not_enabled() {
use crate::table_properties::ENABLE_CHANGE_DATA_FEED;
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0,
HashMap::from_iter([(ENABLE_CHANGE_DATA_FEED.to_string(), "true".to_string())]),
)
.unwrap();
let protocol = Protocol::try_new_modern(
[TableFeature::DeletionVectors],
[TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
assert!(table_config.is_feature_supported(&TableFeature::DeletionVectors));
assert!(!table_config.is_feature_enabled(&TableFeature::DeletionVectors));
}
#[test]
fn dv_enabled() {
use crate::table_properties::{ENABLE_CHANGE_DATA_FEED, ENABLE_DELETION_VECTORS};
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0,
HashMap::from_iter([
(ENABLE_CHANGE_DATA_FEED.to_string(), "true".to_string()),
(ENABLE_DELETION_VECTORS.to_string(), "true".to_string()),
]),
)
.unwrap();
let protocol = Protocol::try_new_modern(
[TableFeature::DeletionVectors],
[TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
assert!(table_config.is_feature_supported(&TableFeature::DeletionVectors));
assert!(table_config.is_feature_enabled(&TableFeature::DeletionVectors));
}
    // Protocol versions below MIN_VALID_RW_VERSION must be rejected with an
    // invalid-protocol error for both reads and writes.
    #[rstest]
    #[case(-1, 2, Operation::Scan)]
    #[case(1, -1, Operation::Write)]
    fn reject_protocol_version_below_minimum(
        #[case] rv: i32,
        #[case] wv: i32,
        #[case] op: Operation,
    ) {
        let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
            "value",
            DataType::INTEGER,
        )]));
        let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
        // new_unchecked: bypasses Protocol validation so invalid versions can
        // be constructed for the test.
        let protocol =
            Protocol::new_unchecked(rv, wv, TableFeature::NO_LIST, TableFeature::NO_LIST);
        let table_root = Url::try_from("file:///").unwrap();
        let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
        let expected = if rv < 1 {
            format!("Invalid protocol action in the delta log: min_reader_version must be >= 1, got {rv}")
        } else {
            format!("Invalid protocol action in the delta log: min_writer_version must be >= 1, got {wv}")
        };
        assert_result_error_with_message(table_config.ensure_operation_supported(op), &expected);
    }
#[test]
fn write_with_cdf() {
use crate::table_properties::{APPEND_ONLY, ENABLE_CHANGE_DATA_FEED};
use TableFeature::*;
let cases = [
(
create_mock_table_config(&[(ENABLE_CHANGE_DATA_FEED, "true")], &[ChangeDataFeed]),
Ok(()),
),
(
create_mock_table_config(
&[(ENABLE_CHANGE_DATA_FEED, "true")],
&[ChangeDataFeed, AppendOnly],
),
Ok(()),
),
(
create_mock_table_config(
&[(ENABLE_CHANGE_DATA_FEED, "true"), (APPEND_ONLY, "true")],
&[ChangeDataFeed, AppendOnly],
),
Ok(()),
),
(
create_mock_table_config_with_version(
&[(ENABLE_CHANGE_DATA_FEED, "true")],
None,
1,
8,
),
Err(Error::unsupported("Unsupported minimum writer version 8")),
),
(
create_mock_table_config(
&[(ENABLE_CHANGE_DATA_FEED, "true"), (APPEND_ONLY, "true")],
&[ChangeDataFeed, ColumnMapping, AppendOnly],
),
Ok(()),
),
(
create_mock_table_config(
&[(APPEND_ONLY, "true")],
&[ChangeDataFeed, ColumnMapping, AppendOnly],
),
Ok(()),
),
(
create_mock_table_config(&[(APPEND_ONLY, "true")], &[AppendOnly]),
Ok(()),
),
];
for (table_configuration, result) in cases {
match (
table_configuration.ensure_operation_supported(Operation::Write),
result,
) {
(Ok(()), Ok(())) => { }
(actual_result, Err(expected)) => {
assert_result_error_with_message(actual_result, &expected.to_string());
}
(Err(actual_result), Ok(())) => {
panic!("Expected Ok but got error: {actual_result}");
}
}
}
}
#[test]
fn ict_enabled_from_table_creation() {
use crate::table_properties::ENABLE_IN_COMMIT_TIMESTAMPS;
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0, HashMap::from_iter([(ENABLE_IN_COMMIT_TIMESTAMPS.to_string(), "true".to_string())]),
)
.unwrap();
let protocol =
Protocol::try_new_modern(TableFeature::EMPTY_LIST, [TableFeature::InCommitTimestamp])
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
assert!(table_config.is_feature_supported(&TableFeature::InCommitTimestamp));
assert!(table_config.is_feature_enabled(&TableFeature::InCommitTimestamp));
let info = table_config.in_commit_timestamp_enablement().unwrap();
assert_eq!(
info,
InCommitTimestampEnablement::Enabled { enablement: None }
);
}
#[test]
fn ict_supported_and_enabled() {
use crate::table_properties::{
ENABLE_IN_COMMIT_TIMESTAMPS, IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP,
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION,
};
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0,
HashMap::from_iter([
(ENABLE_IN_COMMIT_TIMESTAMPS.to_string(), "true".to_string()),
(
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.to_string(),
"5".to_string(),
),
(
IN_COMMIT_TIMESTAMP_ENABLEMENT_TIMESTAMP.to_string(),
"100".to_string(),
),
]),
)
.unwrap();
let protocol =
Protocol::try_new_modern(TableFeature::EMPTY_LIST, [TableFeature::InCommitTimestamp])
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
assert!(table_config.is_feature_supported(&TableFeature::InCommitTimestamp));
assert!(table_config.is_feature_enabled(&TableFeature::InCommitTimestamp));
let info = table_config.in_commit_timestamp_enablement().unwrap();
assert_eq!(
info,
InCommitTimestampEnablement::Enabled {
enablement: Some((5, 100))
}
)
}
#[test]
fn ict_enabled_with_partial_enablement_info() {
use crate::table_properties::{
ENABLE_IN_COMMIT_TIMESTAMPS, IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION,
};
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0,
HashMap::from_iter([
(ENABLE_IN_COMMIT_TIMESTAMPS.to_string(), "true".to_string()),
(
IN_COMMIT_TIMESTAMP_ENABLEMENT_VERSION.to_string(),
"5".to_string(),
),
]),
)
.unwrap();
let protocol =
Protocol::try_new_modern(TableFeature::EMPTY_LIST, [TableFeature::InCommitTimestamp])
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
assert!(table_config.is_feature_supported(&TableFeature::InCommitTimestamp));
assert!(table_config.is_feature_enabled(&TableFeature::InCommitTimestamp));
assert!(matches!(
table_config.in_commit_timestamp_enablement(),
Err(Error::Generic(msg)) if msg.contains("In-commit timestamp enabled, but enablement timestamp is missing")
));
}
#[test]
fn ict_supported_and_not_enabled() {
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
let protocol =
Protocol::try_new_modern(TableFeature::EMPTY_LIST, [TableFeature::InCommitTimestamp])
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
assert!(table_config.is_feature_supported(&TableFeature::InCommitTimestamp));
assert!(!table_config.is_feature_enabled(&TableFeature::InCommitTimestamp));
let info = table_config.in_commit_timestamp_enablement().unwrap();
assert_eq!(info, InCommitTimestampEnablement::NotEnabled);
}
#[test]
fn fails_on_unsupported_feature() {
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
let protocol = Protocol::try_new_modern(["unknown"], ["unknown"]).unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
table_config
.ensure_operation_supported(Operation::Scan)
.expect_err("Unknown feature is not supported in kernel");
}
#[test]
fn dv_not_supported() {
use crate::table_properties::ENABLE_CHANGE_DATA_FEED;
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0,
HashMap::from_iter([(ENABLE_CHANGE_DATA_FEED.to_string(), "true".to_string())]),
)
.unwrap();
let protocol = Protocol::try_new_modern(
[TableFeature::TimestampWithoutTimezone],
[
TableFeature::TimestampWithoutTimezone,
TableFeature::ChangeDataFeed,
],
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
assert!(!table_config.is_feature_supported(&TableFeature::DeletionVectors));
assert!(!table_config.is_feature_enabled(&TableFeature::DeletionVectors));
}
#[test]
fn test_try_new_from() {
use crate::table_properties::{ENABLE_CHANGE_DATA_FEED, ENABLE_DELETION_VECTORS};
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let metadata = Metadata::try_new(
None,
None,
schema,
vec![],
0,
HashMap::from_iter([(ENABLE_CHANGE_DATA_FEED.to_string(), "true".to_string())]),
)
.unwrap();
let protocol = Protocol::try_new_modern(
[TableFeature::DeletionVectors],
[TableFeature::DeletionVectors, TableFeature::ChangeDataFeed],
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let table_config = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
let new_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"value",
DataType::INTEGER,
)]));
let new_metadata = Metadata::try_new(
None,
None,
new_schema,
vec![],
0,
HashMap::from_iter([
(ENABLE_CHANGE_DATA_FEED.to_string(), "false".to_string()),
(ENABLE_DELETION_VECTORS.to_string(), "true".to_string()),
]),
)
.unwrap();
let new_protocol = Protocol::try_new_modern(
[TableFeature::DeletionVectors, TableFeature::V2Checkpoint],
[
TableFeature::DeletionVectors,
TableFeature::V2Checkpoint,
TableFeature::AppendOnly,
TableFeature::ChangeDataFeed,
],
)
.unwrap();
let new_version = 1;
let new_table_config = TableConfiguration::try_new_from(
&table_config,
Some(new_metadata.clone()),
Some(new_protocol.clone()),
new_version,
)
.unwrap();
assert_eq!(new_table_config.version(), new_version);
assert_eq!(new_table_config.metadata(), &new_metadata);
assert_eq!(new_table_config.protocol(), &new_protocol);
assert_eq!(
new_table_config.logical_schema(),
table_config.logical_schema()
);
assert_eq!(
new_table_config.table_properties(),
&TableProperties {
enable_change_data_feed: Some(false),
enable_deletion_vectors: Some(true),
..Default::default()
}
);
assert_eq!(
new_table_config.column_mapping_mode(),
table_config.column_mapping_mode()
);
assert_eq!(new_table_config.table_root(), table_config.table_root());
}
#[test]
fn test_timestamp_ntz_validation_integration() {
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"ts",
DataType::TIMESTAMP_NTZ,
)]));
let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
let protocol_without_timestamp_ntz_features =
Protocol::try_new_modern(TableFeature::EMPTY_LIST, TableFeature::EMPTY_LIST).unwrap();
let protocol_with_timestamp_ntz_features = Protocol::try_new_modern(
[TableFeature::TimestampWithoutTimezone],
[TableFeature::TimestampWithoutTimezone],
)
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let result = TableConfiguration::try_new(
metadata.clone(),
protocol_without_timestamp_ntz_features,
table_root.clone(),
0,
);
assert_result_error_with_message(result, "Unsupported: Table contains TIMESTAMP_NTZ columns but does not have the required 'timestampNtz' feature in reader and writer features");
let result = TableConfiguration::try_new(
metadata,
protocol_with_timestamp_ntz_features,
table_root,
0,
);
assert!(
result.is_ok(),
"Should succeed when TIMESTAMP_NTZ is used with required features"
);
}
#[test]
fn test_variant_validation_integration() {
let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
"v",
DataType::unshredded_variant(),
)]));
let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
let protocol_without_variant_features =
Protocol::try_new_modern(TableFeature::EMPTY_LIST, TableFeature::EMPTY_LIST).unwrap();
let protocol_with_variant_features =
Protocol::try_new_modern([TableFeature::VariantType], [TableFeature::VariantType])
.unwrap();
let table_root = Url::try_from("file:///").unwrap();
let result: Result<TableConfiguration, Error> = TableConfiguration::try_new(
metadata.clone(),
protocol_without_variant_features,
table_root.clone(),
0,
);
assert_result_error_with_message(result, "Unsupported: Table contains VARIANT columns but does not have the required 'variantType' feature in reader and writer features");
let result =
TableConfiguration::try_new(metadata, protocol_with_variant_features, table_root, 0);
assert!(
result.is_ok(),
"Should succeed when VARIANT is used with required features"
);
}
    // Where an unknown ("futureFeature") string appears in the protocol's
    // feature lists for the parameterized tests below.
    #[derive(Debug, Clone, Copy)]
    enum UnknownFeatureShape {
        NotListed,
        WriterOnly,
        ReaderWriter,
    }
    // Builds a config whose protocol carries an unknown feature string in the
    // lists dictated by `shape`, returning the corresponding TableFeature.
    fn create_unknown_feature_config(
        shape: UnknownFeatureShape,
    ) -> (TableFeature, TableConfiguration) {
        const UNKNOWN: &str = "futureFeature";
        let metadata = Metadata::try_new(
            None,
            None,
            Arc::new(StructType::new_unchecked([StructField::nullable(
                "value",
                DataType::INTEGER,
            )])),
            vec![],
            0,
            HashMap::new(),
        )
        .unwrap();
        let table_root = Url::try_from("file:///").unwrap();
        let reader_features = match shape {
            UnknownFeatureShape::ReaderWriter => vec![UNKNOWN],
            _ => vec![],
        };
        let writer_features = match shape {
            UnknownFeatureShape::NotListed => vec![],
            _ => vec![UNKNOWN],
        };
        let protocol = Protocol::try_new_modern(reader_features, writer_features).unwrap();
        let tc = TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap();
        (TableFeature::unknown(UNKNOWN), tc)
    }
/// An unknown feature counts as "supported" exactly when the protocol lists
/// it somewhere in its feature lists.
#[rstest]
#[case::not_listed(UnknownFeatureShape::NotListed, false)]
#[case::writer_only(UnknownFeatureShape::WriterOnly, true)]
#[case::reader_writer(UnknownFeatureShape::ReaderWriter, true)]
fn test_unknown_feature_protocol_support(
    #[case] shape: UnknownFeatureShape,
    #[case] expected_supported: bool,
) {
    let (feature, tc) = create_unknown_feature_config(shape);
    assert_eq!(tc.is_feature_supported(&feature), expected_supported);
}
/// For an unknown feature, enablement tracks protocol listing the same way
/// support does (there is no table property the kernel could consult).
#[rstest]
#[case::not_listed(UnknownFeatureShape::NotListed, false)]
#[case::writer_only(UnknownFeatureShape::WriterOnly, true)]
#[case::reader_writer(UnknownFeatureShape::ReaderWriter, true)]
fn test_unknown_feature_protocol_enablement(
    #[case] shape: UnknownFeatureShape,
    #[case] expected_enabled: bool,
) {
    let (feature, tc) = create_unknown_feature_config(shape);
    assert_eq!(tc.is_feature_enabled(&feature), expected_enabled);
}
/// Cross-product check: an unknown writer-only feature blocks only writes,
/// while an unknown reader-writer feature blocks every operation.
#[rstest]
fn test_unknown_feature_capabilities(
    #[values(
        UnknownFeatureShape::NotListed,
        UnknownFeatureShape::WriterOnly,
        UnknownFeatureShape::ReaderWriter
    )]
    shape: UnknownFeatureShape,
    #[values(Operation::Scan, Operation::Cdf, Operation::Write)] operation: Operation,
) {
    let (_, tc) = create_unknown_feature_config(shape);
    let should_succeed = match shape {
        UnknownFeatureShape::NotListed => true,
        // Reads tolerate an unlisted-for-readers unknown feature; writes don't.
        UnknownFeatureShape::WriterOnly => !matches!(operation, Operation::Write),
        UnknownFeatureShape::ReaderWriter => false,
    };
    assert_eq!(
        tc.ensure_operation_supported(operation).is_ok(),
        should_succeed
    );
}
/// Support for a writer-only feature (appendOnly) via both legacy protocol
/// versions and explicit table-features listings.
#[test]
fn test_is_feature_supported_writer_only() {
    let feature = TableFeature::AppendOnly;
    // Legacy protocol: writer version 2 is expected to support appendOnly...
    let tc = create_mock_table_config_with_version(&[], None, 1, 2);
    assert!(tc.is_feature_supported(&feature));
    // ...while writer version 1 is not.
    let tc = create_mock_table_config_with_version(&[], None, 1, 1);
    assert!(!tc.is_feature_supported(&feature));
    // Table-features protocol: support requires the feature to be listed.
    let tc = create_mock_table_config_with_version(&[], Some(&[TableFeature::AppendOnly]), 2, 7);
    assert!(tc.is_feature_supported(&feature));
    let tc =
        create_mock_table_config_with_version(&[], Some(&[TableFeature::ChangeDataFeed]), 2, 7);
    assert!(!tc.is_feature_supported(&feature));
    let tc = create_mock_table_config(&[], &[TableFeature::AppendOnly]);
    assert!(tc.is_feature_supported(&feature));
    let tc = create_mock_table_config(&[], &[TableFeature::DeletionVectors]);
    assert!(!tc.is_feature_supported(&feature));
}
/// Support for a reader-writer feature (columnMapping): both the reader and
/// the writer side of the protocol must be sufficient.
#[test]
fn test_is_feature_supported_reader_writer() {
    let feature = TableFeature::ColumnMapping;
    // Legacy protocol: reader 2 / writer 5 is expected to support columnMapping.
    let tc = create_mock_table_config_with_version(&[], None, 2, 5);
    assert!(tc.is_feature_supported(&feature));
    // Lowering either version drops support.
    let tc = create_mock_table_config_with_version(&[], None, 1, 5);
    assert!(!tc.is_feature_supported(&feature));
    let tc = create_mock_table_config_with_version(&[], None, 2, 4);
    assert!(!tc.is_feature_supported(&feature));
    // Table-features protocol: an unrelated listing is not enough.
    let tc = create_mock_table_config_with_version(&[], Some(&[TableFeature::AppendOnly]), 2, 7);
    assert!(!tc.is_feature_supported(&feature));
    let tc = create_mock_table_config(&[], &[TableFeature::ColumnMapping]);
    assert!(tc.is_feature_supported(&feature));
    let tc = create_mock_table_config(&[], &[TableFeature::DeletionVectors]);
    assert!(!tc.is_feature_supported(&feature));
}
/// appendOnly is only *enabled* when it is both supported by the protocol and
/// switched on via its table property.
#[test]
fn test_is_feature_enabled_with_property_check() {
    use crate::table_properties::APPEND_ONLY;
    let feature = TableFeature::AppendOnly;
    // Supported by writer version 2, but the property is unset: not enabled.
    let tc = create_mock_table_config_with_version(&[], None, 1, 2);
    assert!(tc.is_feature_supported(&feature));
    assert!(!tc.is_feature_enabled(&feature));
    // Property set and feature supported: enabled.
    let tc = create_mock_table_config_with_version(&[(APPEND_ONLY, "true")], None, 1, 2);
    assert!(tc.is_feature_supported(&feature));
    assert!(tc.is_feature_enabled(&feature));
    // Property set but the protocol is too old to support it: still disabled.
    let tc = create_mock_table_config_with_version(&[(APPEND_ONLY, "true")], None, 1, 1);
    assert!(!tc.is_feature_supported(&feature));
    assert!(!tc.is_feature_enabled(&feature));
}
/// For a feature with no enabling table property (v2Checkpoint), enablement
/// follows support directly.
#[test]
fn test_is_feature_enabled_always_if_supported() {
    let feature = TableFeature::V2Checkpoint;
    let tc = create_mock_table_config(&[], &[TableFeature::V2Checkpoint]);
    assert!(tc.is_feature_supported(&feature));
    assert!(tc.is_feature_enabled(&feature));
    let tc = create_mock_table_config(&[], &[TableFeature::DeletionVectors]);
    assert!(!tc.is_feature_supported(&feature));
    assert!(!tc.is_feature_enabled(&feature));
}
/// Scans must be allowed across a variety of protocol shapes, including a
/// writer-only feature (inCommitTimestamp) that readers can safely ignore.
#[test]
fn test_ensure_operation_supported_reads() {
    let configs = [
        create_mock_table_config(&[], &[]),
        create_mock_table_config(&[], &[TableFeature::V2Checkpoint]),
        create_mock_table_config_with_version(&[], None, 1, 2),
        create_mock_table_config_with_version(&[], Some(&[TableFeature::InCommitTimestamp]), 2, 7),
    ];
    for tc in configs {
        assert!(tc.ensure_operation_supported(Operation::Scan).is_ok());
    }
}
/// Writes succeed for a set of write-supported features and are rejected —
/// with a specific error message — for an unsupported one (typeWidening).
#[test]
fn test_ensure_operation_supported_writes() {
    let tc = create_mock_table_config(
        &[],
        &[
            TableFeature::AppendOnly,
            TableFeature::DeletionVectors,
            TableFeature::DomainMetadata,
            TableFeature::Invariants,
            TableFeature::RowTracking,
        ],
    );
    assert!(tc.ensure_operation_supported(Operation::Write).is_ok());
    let tc = create_mock_table_config(&[], &[TableFeature::TypeWidening]);
    assert_result_error_with_message(
        tc.ensure_operation_supported(Operation::Write),
        r#"Feature 'typeWidening' is not supported for writes"#,
    );
}
/// rowTracking without domainMetadata is an invalid combination: writes must
/// be rejected with an error naming the missing dependency.
#[test]
fn test_illegal_writer_feature_combination() {
    let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
        "value",
        DataType::INTEGER,
    )]));
    let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
    let protocol =
        Protocol::try_new_modern(TableFeature::EMPTY_LIST, vec![TableFeature::RowTracking])
            .unwrap();
    let tc = TableConfiguration::try_new(metadata, protocol, Url::try_from("file:///").unwrap(), 0)
        .unwrap();
    assert_result_error_with_message(
        tc.ensure_operation_supported(Operation::Write),
        "Feature 'rowTracking' requires 'domainMetadata' to be supported",
    );
}
/// Counterpart to `test_illegal_writer_feature_combination`: listing
/// domainMetadata alongside rowTracking satisfies the dependency, so writes
/// are permitted.
#[test]
fn test_row_tracking_with_domain_metadata_requirement() {
    let schema = Arc::new(StructType::new_unchecked([StructField::nullable(
        "value",
        DataType::INTEGER,
    )]));
    let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
    let protocol = Protocol::try_new_modern(
        TableFeature::EMPTY_LIST,
        vec![TableFeature::RowTracking, TableFeature::DomainMetadata],
    )
    .unwrap();
    let tc = TableConfiguration::try_new(metadata, protocol, Url::try_from("file:///").unwrap(), 0)
        .unwrap();
    assert!(
        tc.ensure_operation_supported(Operation::Write).is_ok(),
        "RowTracking with DomainMetadata should be supported for writes"
    );
}
/// Writes succeed with either catalog feature variant (catalogManaged or
/// catalogOwned-preview) when inCommitTimestamp is listed and enabled.
#[test]
fn test_catalog_managed_writes() {
    for catalog_feature in [
        TableFeature::CatalogManaged,
        TableFeature::CatalogOwnedPreview,
    ] {
        let tc = create_mock_table_config(
            &[(ENABLE_IN_COMMIT_TIMESTAMPS, "true")],
            &[catalog_feature, TableFeature::InCommitTimestamp],
        );
        assert!(tc.ensure_operation_supported(Operation::Write).is_ok());
    }
}
/// Builds a two-column schema (`col_a`: long, `col_b`: string) whose fields
/// carry `delta.columnMapping.id` and `delta.columnMapping.physicalName`
/// metadata, as required by the `name` and `id` column-mapping modes.
fn schema_with_column_mapping() -> SchemaRef {
    // Fields are parsed from JSON because that is the only way to attach
    // column-mapping metadata verbatim in these tests.
    let field_a: StructField = serde_json::from_str(
        r#"{
        "name": "col_a",
        "type": "long",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.id": 1,
            "delta.columnMapping.physicalName": "phys_col_a"
        }
    }"#,
    )
    .unwrap();
    let field_b: StructField = serde_json::from_str(
        r#"{
        "name": "col_b",
        "type": "string",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.id": 2,
            "delta.columnMapping.physicalName": "phys_col_b"
        }
    }"#,
    )
    .unwrap();
    Arc::new(StructType::new_unchecked([field_a, field_b]))
}
/// Convenience wrapper around
/// `create_table_config_with_column_mapping_and_props` for the common case of
/// no extra table properties.
fn create_table_config_with_column_mapping(
    schema: SchemaRef,
    column_mapping_mode: &str,
) -> TableConfiguration {
    create_table_config_with_column_mapping_and_props(schema, column_mapping_mode, [])
}
/// Builds an unpartitioned table config with the given column-mapping mode
/// and extra table properties; delegates to the partitioned variant with an
/// empty partition-column list.
fn create_table_config_with_column_mapping_and_props(
    schema: SchemaRef,
    column_mapping_mode: &str,
    extra_props: impl IntoIterator<Item = (&'static str, &'static str)>,
) -> TableConfiguration {
    create_partitioned_table_config_with_column_mapping(
        schema,
        column_mapping_mode,
        vec![],
        extra_props,
    )
}
/// Builds a `TableConfiguration` on a legacy (reader 2, writer 5) protocol
/// with the given schema, partition columns, column-mapping mode, and any
/// extra table properties.
fn create_partitioned_table_config_with_column_mapping(
    schema: SchemaRef,
    column_mapping_mode: &str,
    partition_columns: Vec<String>,
    extra_props: impl IntoIterator<Item = (&'static str, &'static str)>,
) -> TableConfiguration {
    // The column-mapping entry is chained last so it overrides any duplicate
    // key supplied via `extra_props` (HashMap collect keeps the last value).
    let props: HashMap<String, String> = extra_props
        .into_iter()
        .map(|(k, v)| (k.to_owned(), v.to_owned()))
        .chain(std::iter::once((
            COLUMN_MAPPING_MODE.to_string(),
            column_mapping_mode.to_string(),
        )))
        .collect();
    let metadata = Metadata::try_new(None, None, schema, partition_columns, 0, props).unwrap();
    let protocol = Protocol::try_new_legacy(2, 5).unwrap();
    let table_root = Url::try_from("file:///").unwrap();
    TableConfiguration::try_new(metadata, protocol, table_root, 0).unwrap()
}
/// Without column mapping, the expected stats schema keys its minValues
/// fields by the logical column names.
#[test]
fn test_build_expected_stats_schemas_no_column_mapping() {
    let schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("col_a", DataType::LONG),
        StructField::nullable("col_b", DataType::STRING),
    ]));
    let metadata = Metadata::try_new(None, None, schema, vec![], 0, HashMap::new()).unwrap();
    let protocol = Protocol::try_new_legacy(1, 2).unwrap();
    let tc = TableConfiguration::try_new(metadata, protocol, Url::try_from("file:///").unwrap(), 0)
        .unwrap();
    assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);
    let stats = tc.build_expected_stats_schemas(None, None).unwrap();
    let DataType::Struct(inner) = stats.physical.field("minValues").unwrap().data_type() else {
        panic!("Expected minValues to be a struct");
    };
    assert!(inner.field("col_a").is_some());
    assert!(inner.field("col_b").is_some());
}
/// Under `name` column mapping, the physical stats schema must use physical
/// column names, not logical ones.
#[test]
fn test_build_expected_stats_schemas_with_column_mapping() {
    let tc = create_table_config_with_column_mapping(schema_with_column_mapping(), "name");
    assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::Name);
    let stats = tc.build_expected_stats_schemas(None, None).unwrap();
    let DataType::Struct(inner) = stats.physical.field("minValues").unwrap().data_type() else {
        panic!("Expected minValues to be a struct");
    };
    assert!(
        inner.field("phys_col_a").is_some(),
        "Physical schema should have phys_col_a"
    );
    assert!(
        inner.field("phys_col_b").is_some(),
        "Physical schema should have phys_col_b"
    );
    // The logical name must not leak into the physical stats schema.
    assert!(inner.field("col_a").is_none());
}
/// In `id` mapping mode, stats-schema fields use physical names but must NOT
/// carry parquet field-id metadata — unlike physical *data* schemas, where
/// `make_physical` does inject the field id.
#[test]
fn test_build_expected_stats_schemas_id_mode_has_no_parquet_field_ids() {
    use crate::schema::{ColumnMetadataKey, MetadataValue};
    let tc = create_table_config_with_column_mapping(schema_with_column_mapping(), "id");
    assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::Id);
    let stats = tc.build_expected_stats_schemas(None, None).unwrap();
    let DataType::Struct(inner) = stats.physical.field("minValues").unwrap().data_type() else {
        panic!("Expected minValues to be a struct");
    };
    assert!(
        inner.field("phys_col_a").is_some(),
        "Physical schema should have phys_col_a"
    );
    assert!(
        inner.field("phys_col_b").is_some(),
        "Physical schema should have phys_col_b"
    );
    assert!(inner.field("col_a").is_none());
    // No stats field may carry a ParquetFieldId entry.
    for field in inner.fields() {
        assert!(
            field
                .get_config_value(&ColumnMetadataKey::ParquetFieldId)
                .is_none(),
            "Physical stats schema field '{}' should not have ParquetFieldId metadata",
            field.name()
        );
    }
    // Contrast with the data-schema path: make_physical injects the id.
    let physical_data = schema_with_column_mapping()
        .make_physical(ColumnMappingMode::Id)
        .unwrap();
    let data_field = physical_data.field("phys_col_a").unwrap();
    assert!(
        matches!(
            data_field.get_config_value(&ColumnMetadataKey::ParquetFieldId),
            Some(MetadataValue::Number(_))
        ),
        "make_physical should inject ParquetFieldId for data schemas in Id mode"
    );
}
/// Builds a three-column schema (`data_col`: long, `part_a`: string,
/// `part_b`: integer) with column-mapping metadata on every field. The two
/// `part_*` columns are intended to be used as partition columns by callers.
fn partitioned_schema_with_column_mapping() -> SchemaRef {
    // Fields are parsed from JSON so the column-mapping metadata keys appear
    // exactly as they would in a real Delta schema.
    let data_col: StructField = serde_json::from_str(
        r#"{
        "name": "data_col",
        "type": "long",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.id": 1,
            "delta.columnMapping.physicalName": "phys_data"
        }
    }"#,
    )
    .unwrap();
    let part_a: StructField = serde_json::from_str(
        r#"{
        "name": "part_a",
        "type": "string",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.id": 2,
            "delta.columnMapping.physicalName": "phys_part_a"
        }
    }"#,
    )
    .unwrap();
    let part_b: StructField = serde_json::from_str(
        r#"{
        "name": "part_b",
        "type": "integer",
        "nullable": true,
        "metadata": {
            "delta.columnMapping.id": 3,
            "delta.columnMapping.physicalName": "phys_part_b"
        }
    }"#,
    )
    .unwrap();
    Arc::new(StructType::new_unchecked([data_col, part_a, part_b]))
}
/// Partition columns must not appear in the expected stats schema; only the
/// data column (under its physical name) should be present.
#[test]
fn test_build_expected_stats_schemas_excludes_partition_columns() {
    let tc = create_partitioned_table_config_with_column_mapping(
        partitioned_schema_with_column_mapping(),
        "name",
        vec!["part_a".to_string(), "part_b".to_string()],
        [],
    );
    let stats = tc.build_expected_stats_schemas(None, None).unwrap();
    let min_values = stats.physical.field("minValues").unwrap().data_type();
    if let DataType::Struct(inner) = min_values {
        assert!(
            inner.field("phys_data").is_some(),
            "Data column should be present with physical name"
        );
        assert!(
            inner.field("phys_part_a").is_none(),
            "Partition column a should be excluded"
        );
        assert!(
            inner.field("phys_part_b").is_none(),
            "Partition column b should be excluded"
        );
    } else {
        panic!("Expected minValues to be a struct");
    }
}
/// `physical_stats_column_names` omits partition columns — even when they are
/// explicitly passed in as required columns.
#[test]
fn test_physical_stats_column_names_excludes_partition_columns() {
    let tc = create_partitioned_table_config_with_column_mapping(
        partitioned_schema_with_column_mapping(),
        "name",
        vec!["part_a".to_string(), "part_b".to_string()],
        [],
    );
    assert_eq!(
        tc.physical_stats_column_names(None),
        vec![ColumnName::new(["phys_data"])]
    );
    // Requesting the partition columns must not bring them back.
    let required = [
        ColumnName::new(["phys_part_a"]),
        ColumnName::new(["phys_part_b"]),
    ];
    assert_eq!(
        tc.physical_stats_column_names(Some(&required)),
        vec![ColumnName::new(["phys_data"])]
    );
}
/// Partition-column exclusion also applies without column mapping, where the
/// reported names are simply the logical names.
#[test]
fn test_physical_stats_column_names_excludes_partition_columns_no_column_mapping() {
    let schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("data_col", DataType::LONG),
        StructField::nullable("part_a", DataType::STRING),
        StructField::nullable("part_b", DataType::INTEGER),
    ]));
    let metadata = Metadata::try_new(
        None,
        None,
        schema,
        vec!["part_a".to_string(), "part_b".to_string()],
        0,
        HashMap::new(),
    )
    .unwrap();
    let protocol = Protocol::try_new_legacy(1, 2).unwrap();
    let tc = TableConfiguration::try_new(metadata, protocol, Url::try_from("file:///").unwrap(), 0)
        .unwrap();
    assert_eq!(
        tc.physical_stats_column_names(None),
        vec![ColumnName::new(["data_col"])]
    );
}
/// A table whose every column is a partition column yields no stats columns
/// at all.
#[test]
fn test_physical_stats_column_names_all_partition_columns_returns_empty() {
    let schema = Arc::new(StructType::new_unchecked([
        StructField::nullable("part_a", DataType::STRING),
        StructField::nullable("part_b", DataType::INTEGER),
    ]));
    let metadata = Metadata::try_new(
        None,
        None,
        schema,
        vec!["part_a".to_string(), "part_b".to_string()],
        0,
        HashMap::new(),
    )
    .unwrap();
    let protocol = Protocol::try_new_legacy(1, 2).unwrap();
    let tc = TableConfiguration::try_new(metadata, protocol, Url::try_from("file:///").unwrap(), 0)
        .unwrap();
    assert!(tc.physical_stats_column_names(None).is_empty());
}
/// With `name` column mapping, the reported stats column names are the
/// physical names from the field metadata.
#[test]
fn test_physical_stats_column_names_returns_physical_names() {
    let tc = create_table_config_with_column_mapping(schema_with_column_mapping(), "name");
    assert_eq!(
        tc.physical_stats_column_names(None),
        vec![
            ColumnName::new(["phys_col_a"]),
            ColumnName::new(["phys_col_b"]),
        ],
        "Expected physical column names, not logical names"
    );
}
/// `delta.dataSkippingStatsColumns` is specified with logical names; the
/// result must come back as physical names, including for nested columns.
#[test]
fn test_physical_stats_column_names_with_data_skipping_stats_columns() {
    let tc = create_table_config_with_column_mapping_and_props(
        test_schema_nested_with_column_mapping(),
        "name",
        [("delta.dataSkippingStatsColumns", "id,info.name")],
    );
    assert_eq!(
        tc.physical_stats_column_names(None),
        vec![
            ColumnName::new(["phys_id"]),
            ColumnName::new(["phys_info", "phys_name"]),
        ],
    );
}
/// A configured stats column that does not exist in the schema is silently
/// skipped rather than producing an error or a bogus entry.
#[test]
fn test_physical_stats_column_names_skips_nonexistent_data_skipping_stats_column() {
    let tc = create_table_config_with_column_mapping_and_props(
        test_schema_nested_with_column_mapping(),
        "name",
        [("delta.dataSkippingStatsColumns", "id,nonexistent")],
    );
    assert_eq!(
        tc.physical_stats_column_names(None),
        vec![ColumnName::new(["phys_id"])]
    );
}
/// Sweep over flat/nested/map/array schemas in each column-mapping mode and
/// check the full list of reported stats column names.
#[rstest]
#[case::flat_none(
    test_schema_flat(),
    "none",
    vec![ColumnName::new(["id"]), ColumnName::new(["name"])],
)]
#[case::flat_name(
    test_schema_flat_with_column_mapping(),
    "name",
    vec![ColumnName::new(["phys_id"]), ColumnName::new(["phys_name"])],
)]
#[case::flat_id(
    test_schema_flat_with_column_mapping(),
    "id",
    vec![ColumnName::new(["phys_id"]), ColumnName::new(["phys_name"])],
)]
#[case::nested_none(
    test_schema_nested(),
    "none",
    vec![
        ColumnName::new(["id"]),
        ColumnName::new(["info", "name"]),
        ColumnName::new(["info", "age"]),
    ],
)]
#[case::nested_name(
    test_schema_nested_with_column_mapping(),
    "name",
    vec![
        ColumnName::new(["phys_id"]),
        ColumnName::new(["phys_info", "phys_name"]),
        ColumnName::new(["phys_info", "phys_age"]),
    ],
)]
#[case::nested_id(
    test_schema_nested_with_column_mapping(),
    "id",
    vec![
        ColumnName::new(["phys_id"]),
        ColumnName::new(["phys_info", "phys_name"]),
        ColumnName::new(["phys_info", "phys_age"]),
    ],
)]
#[case::map_none(
    test_schema_with_map(),
    "none",
    vec![ColumnName::new(["id"]), ColumnName::new(["name"])],
)]
#[case::map_name(
    test_schema_with_map_and_column_mapping(),
    "name",
    vec![ColumnName::new(["phys_id"]), ColumnName::new(["phys_name"])],
)]
#[case::map_id(
    test_schema_with_map_and_column_mapping(),
    "id",
    vec![ColumnName::new(["phys_id"]), ColumnName::new(["phys_name"])],
)]
#[case::array_none(
    test_schema_with_array(),
    "none",
    vec![ColumnName::new(["id"]), ColumnName::new(["name"])],
)]
#[case::array_name(
    test_schema_with_array_and_column_mapping(),
    "name",
    vec![ColumnName::new(["phys_id"]), ColumnName::new(["phys_name"])],
)]
#[case::array_id(
    test_schema_with_array_and_column_mapping(),
    "id",
    vec![ColumnName::new(["phys_id"]), ColumnName::new(["phys_name"])],
)]
fn test_physical_stats_column_names_all_schemas(
    #[case] schema: SchemaRef,
    #[case] mode: &str,
    #[case] expected_physical: Vec<ColumnName>,
) {
    let tc = create_table_config_with_column_mapping(schema, mode);
    let actual = tc.physical_stats_column_names(None);
    assert_eq!(
        actual, expected_physical,
        "Incorrect physical column names for mode '{mode}'"
    );
}
/// clusteredTable listed together with its domainMetadata dependency must be
/// accepted for writes.
#[test]
fn test_clustered_table_writes() {
    let tc = create_mock_table_config(
        &[],
        &[TableFeature::ClusteredTable, TableFeature::DomainMetadata],
    );
    assert!(
        tc.ensure_operation_supported(Operation::Write).is_ok(),
        "ClusteredTable with DomainMetadata should be supported for writes"
    );
}
}