#[internal_api]
pub(crate) use column_mapping::get_any_level_column_physical_name;
#[deprecated = "Enable internal-api and use TableConfiguration instead"]
pub use column_mapping::validate_schema_column_mapping;
pub use column_mapping::ColumnMappingMode;
pub(crate) use column_mapping::{
assign_column_mapping_metadata, column_mapping_mode, get_column_mapping_mode_from_properties,
get_field_column_mapping_info, physical_to_logical_column_name,
};
use delta_kernel_derive::internal_api;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use strum::{AsRefStr, Display as StrumDisplay, EnumCount, EnumIter, EnumString};
pub(crate) use timestamp_ntz::{
schema_contains_timestamp_ntz, validate_timestamp_ntz_feature_support,
};
use crate::actions::Protocol;
use crate::expressions::Scalar;
use crate::schema::derive_macro_utils::ToDataType;
use crate::schema::DataType;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error};
mod column_mapping;
mod timestamp_ntz;
pub const MIN_VALID_RW_VERSION: i32 = 1;
pub const MAX_VALID_READER_VERSION: i32 = 3;
pub const MAX_VALID_WRITER_VERSION: i32 = 7;
pub const TABLE_FEATURES_MIN_READER_VERSION: i32 = 3;
pub const TABLE_FEATURES_MIN_WRITER_VERSION: i32 = 7;
pub const SET_TABLE_FEATURE_SUPPORTED_PREFIX: &str = "delta.feature.";
pub const SET_TABLE_FEATURE_SUPPORTED_VALUE: &str = "supported";
#[derive(
Serialize,
Deserialize,
Debug,
Clone,
Eq,
PartialEq,
EnumString,
StrumDisplay,
AsRefStr,
EnumCount,
Hash,
)]
#[strum(
serialize_all = "camelCase",
parse_err_fn = xxx__not_needed__default_variant_means_parsing_is_infallible__xxx,
parse_err_ty = Infallible // ignored, sadly: https://github.com/Peternator7/strum/issues/430
)]
#[serde(rename_all = "camelCase")]
#[internal_api]
#[derive(EnumIter)]
pub(crate) enum TableFeature {
AppendOnly,
Invariants,
CheckConstraints,
ChangeDataFeed,
GeneratedColumns,
IdentityColumns,
InCommitTimestamp,
RowTracking,
DomainMetadata,
IcebergCompatV1,
IcebergCompatV2,
#[strum(serialize = "clustering")]
#[serde(rename = "clustering")]
ClusteredTable,
MaterializePartitionColumns,
CatalogManaged,
#[strum(serialize = "catalogOwned-preview")]
#[serde(rename = "catalogOwned-preview")]
CatalogOwnedPreview,
ColumnMapping,
DeletionVectors,
#[strum(serialize = "timestampNtz")]
#[serde(rename = "timestampNtz")]
TimestampWithoutTimezone,
TypeWidening,
#[strum(serialize = "typeWidening-preview")]
#[serde(rename = "typeWidening-preview")]
TypeWideningPreview,
V2Checkpoint,
VacuumProtocolCheck,
VariantType,
#[strum(serialize = "variantType-preview")]
#[serde(rename = "variantType-preview")]
VariantTypePreview,
#[strum(serialize = "variantShredding-preview")]
#[serde(rename = "variantShredding-preview")]
VariantShreddingPreview,
#[serde(untagged)]
#[strum(default)]
Unknown(String),
}
pub(crate) static LEGACY_READER_FEATURES: [TableFeature; 1] = [TableFeature::ColumnMapping];
pub(crate) static LEGACY_WRITER_FEATURES: [TableFeature; 7] = [
TableFeature::AppendOnly, TableFeature::Invariants, TableFeature::CheckConstraints, TableFeature::ChangeDataFeed, TableFeature::GeneratedColumns, TableFeature::IdentityColumns, TableFeature::ColumnMapping, ];
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum FeatureType {
WriterOnly,
ReaderWriter,
Unknown,
}
#[derive(Debug, Clone, Copy)]
pub(crate) enum EnablementCheck {
AlwaysIfSupported,
EnabledIf(fn(&TableProperties) -> bool),
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[internal_api]
pub(crate) enum Operation {
Scan,
Cdf,
Write,
}
pub(crate) enum KernelSupport {
Supported,
NotSupported,
Custom(fn(&Protocol, &TableProperties, Operation) -> DeltaResult<()>),
}
#[derive(Debug)]
pub(crate) enum FeatureRequirement {
Supported(TableFeature),
Enabled(TableFeature),
NotSupported(TableFeature),
NotEnabled(TableFeature),
Custom(fn(&Protocol, &TableProperties) -> DeltaResult<()>),
}
pub(crate) struct MinReaderWriterVersion {
pub reader: i32,
pub writer: i32,
}
impl MinReaderWriterVersion {
pub(crate) const fn new(reader: i32, writer: i32) -> Self {
Self { reader, writer }
}
}
pub(crate) struct FeatureInfo {
pub feature_type: FeatureType,
pub min_legacy_version: Option<MinReaderWriterVersion>,
pub feature_requirements: &'static [FeatureRequirement],
pub kernel_support: KernelSupport,
pub enablement_check: EnablementCheck,
}
static APPEND_ONLY_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: Some(MinReaderWriterVersion::new(1, 2)),
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::EnabledIf(|props| props.append_only == Some(true)),
};
static INVARIANTS_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: Some(MinReaderWriterVersion::new(1, 2)),
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static CHECK_CONSTRAINTS_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: Some(MinReaderWriterVersion::new(1, 3)),
feature_requirements: &[],
kernel_support: KernelSupport::NotSupported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static CHANGE_DATA_FEED_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: Some(MinReaderWriterVersion::new(1, 4)),
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::EnabledIf(|props| {
props.enable_change_data_feed == Some(true)
}),
};
static GENERATED_COLUMNS_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: Some(MinReaderWriterVersion::new(1, 4)),
feature_requirements: &[],
kernel_support: KernelSupport::NotSupported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static IDENTITY_COLUMNS_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: Some(MinReaderWriterVersion::new(1, 6)),
feature_requirements: &[],
kernel_support: KernelSupport::NotSupported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static IN_COMMIT_TIMESTAMP_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Custom(|_protocol, _properties, operation| match operation {
Operation::Scan | Operation::Write | Operation::Cdf => Ok(()),
}),
enablement_check: EnablementCheck::EnabledIf(|props| {
props.enable_in_commit_timestamps == Some(true)
}),
};
static ROW_TRACKING_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: None,
feature_requirements: &[FeatureRequirement::Supported(TableFeature::DomainMetadata)],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::EnabledIf(|props| {
props.enable_row_tracking == Some(true) && props.row_tracking_suspended != Some(true)
}),
};
static DOMAIN_METADATA_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static ICEBERG_COMPAT_V1_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: None,
feature_requirements: &[
FeatureRequirement::Enabled(TableFeature::ColumnMapping),
FeatureRequirement::Custom(|_protocol, properties| {
let mode = properties.column_mapping_mode;
if !matches!(
mode,
Some(ColumnMappingMode::Name) | Some(ColumnMappingMode::Id)
) {
return Err(Error::generic(
"IcebergCompatV1 requires Column Mapping in 'name' or 'id' mode",
));
}
Ok(())
}),
FeatureRequirement::NotSupported(TableFeature::DeletionVectors),
],
kernel_support: KernelSupport::NotSupported,
enablement_check: EnablementCheck::EnabledIf(|props| {
props.enable_iceberg_compat_v1 == Some(true)
}),
};
static ICEBERG_COMPAT_V2_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: None,
feature_requirements: &[
FeatureRequirement::Enabled(TableFeature::ColumnMapping),
FeatureRequirement::Custom(|_protocol, properties| {
let mode = properties.column_mapping_mode;
if !matches!(
mode,
Some(ColumnMappingMode::Name) | Some(ColumnMappingMode::Id)
) {
return Err(Error::generic(
"IcebergCompatV2 requires Column Mapping in 'name' or 'id' mode",
));
}
Ok(())
}),
FeatureRequirement::NotEnabled(TableFeature::IcebergCompatV1),
FeatureRequirement::NotEnabled(TableFeature::DeletionVectors),
],
kernel_support: KernelSupport::NotSupported,
enablement_check: EnablementCheck::EnabledIf(|props| {
props.enable_iceberg_compat_v2 == Some(true)
}),
};
static CLUSTERED_TABLE_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: None,
feature_requirements: &[FeatureRequirement::Supported(TableFeature::DomainMetadata)],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static MATERIALIZE_PARTITION_COLUMNS_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::WriterOnly,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static CATALOG_MANAGED_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Custom(|_, _, op| match op {
Operation::Scan | Operation::Write => Ok(()),
Operation::Cdf => Err(Error::unsupported(
"Feature 'catalogManaged' is not supported for CDF",
)),
}),
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static CATALOG_OWNED_PREVIEW_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Custom(|_, _, op| match op {
Operation::Scan | Operation::Write => Ok(()),
Operation::Cdf => Err(Error::unsupported(
"Feature 'catalogOwned-preview' is not supported for CDF",
)),
}),
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static COLUMN_MAPPING_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: Some(MinReaderWriterVersion::new(2, 5)),
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::EnabledIf(|props| {
props.column_mapping_mode.is_some()
&& props.column_mapping_mode != Some(ColumnMappingMode::None)
}),
};
static DELETION_VECTORS_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::EnabledIf(|props| {
props.enable_deletion_vectors == Some(true)
}),
};
static TIMESTAMP_WITHOUT_TIMEZONE_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static TYPE_WIDENING_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Custom(|_, _, op| match op {
Operation::Scan | Operation::Cdf => Ok(()),
Operation::Write => Err(Error::unsupported(
"Feature 'typeWidening' is not supported for writes",
)),
}),
enablement_check: EnablementCheck::EnabledIf(|props| props.enable_type_widening == Some(true)),
};
static TYPE_WIDENING_PREVIEW_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Custom(|_, _, op| match op {
Operation::Scan | Operation::Cdf => Ok(()),
Operation::Write => Err(Error::unsupported(
"Feature 'typeWidening-preview' is not supported for writes",
)),
}),
enablement_check: EnablementCheck::EnabledIf(|props| props.enable_type_widening == Some(true)),
};
static V2_CHECKPOINT_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static VACUUM_PROTOCOL_CHECK_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static VARIANT_TYPE_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static VARIANT_TYPE_PREVIEW_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static VARIANT_SHREDDING_PREVIEW_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::ReaderWriter,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::Supported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
static UNKNOWN_FEATURE_INFO: FeatureInfo = FeatureInfo {
feature_type: FeatureType::Unknown,
min_legacy_version: None,
feature_requirements: &[],
kernel_support: KernelSupport::NotSupported,
enablement_check: EnablementCheck::AlwaysIfSupported,
};
impl TableFeature {
#[cfg(test)]
pub(crate) const NO_LIST: Option<Vec<TableFeature>> = None;
#[cfg(test)]
pub(crate) const EMPTY_LIST: Vec<TableFeature> = vec![];
pub(crate) fn feature_type(&self) -> FeatureType {
match self {
TableFeature::CatalogManaged
| TableFeature::CatalogOwnedPreview
| TableFeature::ColumnMapping
| TableFeature::DeletionVectors
| TableFeature::TimestampWithoutTimezone
| TableFeature::TypeWidening
| TableFeature::TypeWideningPreview
| TableFeature::V2Checkpoint
| TableFeature::VacuumProtocolCheck
| TableFeature::VariantType
| TableFeature::VariantTypePreview
| TableFeature::VariantShreddingPreview => FeatureType::ReaderWriter,
TableFeature::AppendOnly
| TableFeature::DomainMetadata
| TableFeature::Invariants
| TableFeature::RowTracking
| TableFeature::CheckConstraints
| TableFeature::ChangeDataFeed
| TableFeature::GeneratedColumns
| TableFeature::IdentityColumns
| TableFeature::InCommitTimestamp
| TableFeature::IcebergCompatV1
| TableFeature::IcebergCompatV2
| TableFeature::ClusteredTable
| TableFeature::MaterializePartitionColumns => FeatureType::WriterOnly,
TableFeature::Unknown(_) => FeatureType::Unknown,
}
}
pub(crate) fn is_valid_for_legacy_reader(&self, reader_version: i32) -> bool {
matches!(&self.info().min_legacy_version, Some(v) if reader_version >= v.reader)
}
pub(crate) fn is_valid_for_legacy_writer(&self, writer_version: i32) -> bool {
matches!(&self.info().min_legacy_version, Some(v) if writer_version >= v.writer)
}
pub(crate) fn info(&self) -> &FeatureInfo {
match self {
TableFeature::AppendOnly => &APPEND_ONLY_INFO,
TableFeature::Invariants => &INVARIANTS_INFO,
TableFeature::CheckConstraints => &CHECK_CONSTRAINTS_INFO,
TableFeature::ChangeDataFeed => &CHANGE_DATA_FEED_INFO,
TableFeature::GeneratedColumns => &GENERATED_COLUMNS_INFO,
TableFeature::IdentityColumns => &IDENTITY_COLUMNS_INFO,
TableFeature::InCommitTimestamp => &IN_COMMIT_TIMESTAMP_INFO,
TableFeature::RowTracking => &ROW_TRACKING_INFO,
TableFeature::DomainMetadata => &DOMAIN_METADATA_INFO,
TableFeature::IcebergCompatV1 => &ICEBERG_COMPAT_V1_INFO,
TableFeature::IcebergCompatV2 => &ICEBERG_COMPAT_V2_INFO,
TableFeature::ClusteredTable => &CLUSTERED_TABLE_INFO,
TableFeature::MaterializePartitionColumns => &MATERIALIZE_PARTITION_COLUMNS_INFO,
TableFeature::CatalogManaged => &CATALOG_MANAGED_INFO,
TableFeature::CatalogOwnedPreview => &CATALOG_OWNED_PREVIEW_INFO,
TableFeature::ColumnMapping => &COLUMN_MAPPING_INFO,
TableFeature::DeletionVectors => &DELETION_VECTORS_INFO,
TableFeature::TimestampWithoutTimezone => &TIMESTAMP_WITHOUT_TIMEZONE_INFO,
TableFeature::TypeWidening => &TYPE_WIDENING_INFO,
TableFeature::TypeWideningPreview => &TYPE_WIDENING_PREVIEW_INFO,
TableFeature::V2Checkpoint => &V2_CHECKPOINT_INFO,
TableFeature::VacuumProtocolCheck => &VACUUM_PROTOCOL_CHECK_INFO,
TableFeature::VariantType => &VARIANT_TYPE_INFO,
TableFeature::VariantTypePreview => &VARIANT_TYPE_PREVIEW_INFO,
TableFeature::VariantShreddingPreview => &VARIANT_SHREDDING_PREVIEW_INFO,
TableFeature::Unknown(_) => &UNKNOWN_FEATURE_INFO,
}
}
}
impl ToDataType for TableFeature {
fn to_data_type() -> DataType {
DataType::STRING
}
}
impl From<TableFeature> for Scalar {
fn from(feature: TableFeature) -> Self {
Scalar::String(feature.to_string())
}
}
#[cfg(test)] impl TableFeature {
pub(crate) fn unknown(s: impl ToString) -> Self {
TableFeature::Unknown(s.to_string())
}
}
pub(crate) trait IntoTableFeature {
fn into_table_feature(self) -> TableFeature;
}
impl IntoTableFeature for TableFeature {
fn into_table_feature(self) -> TableFeature {
self
}
}
impl IntoTableFeature for &TableFeature {
fn into_table_feature(self) -> TableFeature {
self.clone()
}
}
impl IntoTableFeature for &str {
fn into_table_feature(self) -> TableFeature {
#[allow(clippy::unwrap_used)] self.parse().unwrap()
}
}
impl IntoTableFeature for String {
fn into_table_feature(self) -> TableFeature {
self.as_str().into_table_feature()
}
}
pub(crate) fn format_features(features: &[TableFeature]) -> String {
let feature_strings: Vec<&str> = features.iter().map(|f| f.as_ref()).collect_vec();
format!("[{}]", feature_strings.join(", "))
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_unknown_features() {
let mixed_reader = &[
TableFeature::DeletionVectors,
TableFeature::unknown("cool_feature"),
TableFeature::ColumnMapping,
];
let mixed_writer = &[
TableFeature::DeletionVectors,
TableFeature::unknown("cool_feature"),
TableFeature::AppendOnly,
];
let reader_string = serde_json::to_string(mixed_reader).unwrap();
let writer_string = serde_json::to_string(mixed_writer).unwrap();
assert_eq!(
&reader_string,
"[\"deletionVectors\",\"cool_feature\",\"columnMapping\"]"
);
assert_eq!(
&writer_string,
"[\"deletionVectors\",\"cool_feature\",\"appendOnly\"]"
);
let typed_reader: Vec<TableFeature> = serde_json::from_str(&reader_string).unwrap();
let typed_writer: Vec<TableFeature> = serde_json::from_str(&writer_string).unwrap();
assert_eq!(typed_reader.len(), 3);
assert_eq!(&typed_reader, mixed_reader);
assert_eq!(typed_writer.len(), 3);
assert_eq!(&typed_writer, mixed_writer);
}
#[test]
fn test_roundtrip_table_features() {
use strum::IntoEnumIterator as _;
for feature in TableFeature::iter() {
let expected = match feature {
TableFeature::AppendOnly => "appendOnly",
TableFeature::Invariants => "invariants",
TableFeature::CheckConstraints => "checkConstraints",
TableFeature::ChangeDataFeed => "changeDataFeed",
TableFeature::GeneratedColumns => "generatedColumns",
TableFeature::IdentityColumns => "identityColumns",
TableFeature::InCommitTimestamp => "inCommitTimestamp",
TableFeature::RowTracking => "rowTracking",
TableFeature::DomainMetadata => "domainMetadata",
TableFeature::IcebergCompatV1 => "icebergCompatV1",
TableFeature::IcebergCompatV2 => "icebergCompatV2",
TableFeature::ClusteredTable => "clustering",
TableFeature::MaterializePartitionColumns => "materializePartitionColumns",
TableFeature::CatalogManaged => "catalogManaged",
TableFeature::CatalogOwnedPreview => "catalogOwned-preview",
TableFeature::ColumnMapping => "columnMapping",
TableFeature::DeletionVectors => "deletionVectors",
TableFeature::TimestampWithoutTimezone => "timestampNtz",
TableFeature::TypeWidening => "typeWidening",
TableFeature::TypeWideningPreview => "typeWidening-preview",
TableFeature::V2Checkpoint => "v2Checkpoint",
TableFeature::VacuumProtocolCheck => "vacuumProtocolCheck",
TableFeature::VariantType => "variantType",
TableFeature::VariantTypePreview => "variantType-preview",
TableFeature::VariantShreddingPreview => "variantShredding-preview",
TableFeature::Unknown(_) => continue, };
assert_eq!(feature.to_string(), expected);
assert_eq!(feature, expected.into_table_feature());
let serialized = serde_json::to_string(&feature).unwrap();
assert_eq!(serialized, format!("\"{expected}\""));
let deserialized: TableFeature = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, feature);
}
}
}