use std::collections::HashSet;
use crate::schema::{ColumnMetadataKey, StructField, StructType};
use crate::table_configuration::TableConfiguration;
use crate::table_features::{ColumnMappingMode, TableFeature};
use crate::transforms::SchemaTransform;
use crate::{transform_output_type, DeltaResult, Error};
/// Characters that `validate_field_name` rejects in a column name when column mapping is
/// disabled; its error message describes them as not allowed in Parquet column names.
const INVALID_PARQUET_CHARS: &[char] = &[' ', ',', ';', '{', '}', '(', ')', '\n', '\t', '='];
pub(crate) fn validate_schema(
schema: &StructType,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<()> {
if schema.num_fields() == 0 {
return Err(Error::generic("Schema cannot be empty"));
}
let mut validator = SchemaValidator::new(column_mapping_mode);
validator.transform_struct(schema);
validator.into_result()
}
/// Schema visitor that accumulates validation problems instead of stopping at the first one.
struct SchemaValidator {
/// `true` when the column mapping mode is anything other than `ColumnMappingMode::None`.
cm_enabled: bool,
/// Lowercased field paths (segments joined with `"\0"`) that have already been visited; used
/// to detect duplicate columns.
seen_paths: HashSet<String>,
/// Lowercased names of the struct fields from the root down to the field being visited.
current_path: Vec<String>,
/// Human-readable problems found so far; combined into one error by `into_result`.
errors: Vec<String>,
}
impl SchemaValidator {
    /// Creates a validator with no recorded paths or errors.
    ///
    /// Column mapping counts as enabled for every mode except `ColumnMappingMode::None`.
    fn new(column_mapping_mode: ColumnMappingMode) -> Self {
        let cm_enabled = !matches!(column_mapping_mode, ColumnMappingMode::None);
        Self {
            cm_enabled,
            seen_paths: HashSet::new(),
            current_path: Vec::new(),
            errors: Vec::new(),
        }
    }

    /// Consumes the validator and turns the collected problems into a single result.
    ///
    /// Returns `Ok(())` when nothing was recorded; otherwise a generic error whose message
    /// lists every problem as a `- ` bullet on its own line.
    fn into_result(self) -> DeltaResult<()> {
        if self.errors.is_empty() {
            return Ok(());
        }
        let bullet_list = self.errors.join("\n- ");
        Err(Error::generic(format!(
            "Schema validation failed:\n- {bullet_list}"
        )))
    }
}
impl<'a> SchemaTransform<'a> for SchemaValidator {
    transform_output_type!(|'a, T| ());

    /// Validates one struct field, then descends into its type.
    ///
    /// Records an error when the name is invalid, when the field has invariants metadata, or
    /// when the same (case-insensitive) path has been visited before.
    fn transform_struct_field(&mut self, field: &'a StructField) {
        let name = field.name();
        if let Err(name_err) = validate_field_name(name, self.cm_enabled) {
            self.errors.push(name_err.to_string());
        }

        // Only ASCII letters are case-folded here.
        // NOTE(review): non-ASCII names differing only by case are treated as distinct —
        // confirm this is the intended notion of "case-insensitive".
        self.current_path.push(name.to_ascii_lowercase());

        if field.has_invariants() {
            let dotted_path = self.current_path.join(".");
            self.errors.push(format!(
                "Column '{dotted_path}' has `delta.invariants` metadata; SQL expression invariants \
                 are not supported by kernel"
            ));
        }

        // The path is built from struct field names only; this impl adds no segment for array
        // elements or map keys/values.
        // NOTE(review): a map whose key struct and value struct both contain a same-named field
        // would presumably collide here — verify against the trait's default recursion.
        let first_visit = self.seen_paths.insert(self.current_path.join("\0"));
        if !first_visit {
            self.errors.push(format!(
                "Schema contains duplicate column (case-insensitive): '{name}'"
            ));
        }

        self.recurse_into_struct_field(field);
        self.current_path.pop();
    }
}
/// Rejects tables with the `IcebergCompatV3` feature enabled whose logical schema still contains
/// a field carrying the `ColumnMetadataKey::ParquetFieldNestedIds` metadata key.
///
/// Tables without the feature enabled always pass.
///
/// # Errors
/// Returns a generic [`Error`] naming the dotted path of the first offending field found by
/// [`LegacyNestedIdsVisitor`].
pub(crate) fn validate_iceberg_compat_v3_no_legacy_nested_id(
    tc: &TableConfiguration,
) -> DeltaResult<()> {
    if !tc.is_feature_enabled(&TableFeature::IcebergCompatV3) {
        return Ok(());
    }

    let mut visitor = LegacyNestedIdsVisitor {
        path: Vec::new(),
        offender: None,
    };
    visitor.transform_struct(&tc.logical_schema());

    match visitor.offender {
        None => Ok(()),
        Some(offender) => {
            let legacy_key: &str = ColumnMetadataKey::ParquetFieldNestedIds.as_ref();
            let replacement_key: &str = ColumnMetadataKey::ColumnMappingNestedIds.as_ref();
            Err(Error::generic(format!(
                "field `{offender}` carries deprecated `{legacy_key}` metadata; use `{replacement_key}` instead. \
                 See https://github.com/delta-io/delta/issues/6688"
            )))
        }
    }
}
/// Schema visitor that looks for the first struct field carrying the
/// `ColumnMetadataKey::ParquetFieldNestedIds` metadata key.
struct LegacyNestedIdsVisitor {
/// Names of the struct fields from the root down to the field being visited.
path: Vec<String>,
/// Dot-joined path of the first offending field found, or `None` if none has been found.
offender: Option<String>,
}
impl<'a> SchemaTransform<'a> for LegacyNestedIdsVisitor {
    transform_output_type!(|'a, T| ());

    /// Records the dotted path of the first field that has the
    /// `ColumnMetadataKey::ParquetFieldNestedIds` metadata key.
    ///
    /// Once an offender is recorded, every later call returns immediately.
    fn transform_struct_field(&mut self, f: &'a StructField) {
        if self.offender.is_some() {
            return;
        }

        self.path.push(f.name().to_string());
        let legacy_key = ColumnMetadataKey::ParquetFieldNestedIds.as_ref();
        if f.metadata().contains_key(legacy_key) {
            // The path is deliberately left as-is (no pop): nothing reads it after an offender
            // has been captured.
            self.offender = Some(self.path.join("."));
        } else {
            self.recurse_into_struct_field(f);
            self.path.pop();
        }
    }
}
fn validate_field_name(name: &str, cm_enabled: bool) -> DeltaResult<()> {
if name.is_empty() {
return Err(Error::generic("Column name cannot be empty"));
}
if cm_enabled {
if name.contains('\n') {
return Err(Error::generic(format!(
"Column name '{name}' contains a newline character, which is not allowed"
)));
}
} else if name.contains(INVALID_PARQUET_CHARS) {
let invalid: Vec<char> = name
.chars()
.filter(|c| INVALID_PARQUET_CHARS.contains(c))
.collect();
return Err(Error::generic(format!(
"Column name '{name}' contains invalid character(s) {invalid:?} that are not \
allowed in Parquet column names. \
Enable column mapping to use special characters in column names."
)));
}
Ok(())
}
#[cfg(test)]
mod tests {
    use rstest::rstest;

    use super::*;
    use crate::schema::{
        ArrayType, ColumnMetadataKey, DataType, MapType, MetadataValue, StructField, StructType,
    };

    // ---- Type-building shorthands shared by the fixtures below ----

    /// Builds a `StructType` from `fields` via `StructType::new_unchecked`.
    fn schema_of(fields: Vec<StructField>) -> StructType {
        StructType::new_unchecked(fields)
    }

    /// A struct data type holding `fields`.
    fn struct_of(fields: Vec<StructField>) -> DataType {
        DataType::Struct(Box::new(schema_of(fields)))
    }

    /// An array data type of `element`, constructed with `true` as the second argument.
    fn array_of(element: DataType) -> DataType {
        DataType::Array(Box::new(ArrayType::new(element, true)))
    }

    /// A map data type from STRING keys to `value`, constructed with `true` as the last argument.
    fn string_map_of(value: DataType) -> DataType {
        DataType::Map(Box::new(MapType::new(DataType::STRING, value, true)))
    }

    // ---- Fixtures for `validate_schema` ----

    fn simple_schema() -> StructType {
        schema_of(vec![
            StructField::new("id", DataType::INTEGER, false),
            StructField::new("name", DataType::STRING, true),
        ])
    }

    fn schema_with_underscores() -> StructType {
        schema_of(vec![
            StructField::new("col_1", DataType::INTEGER, false),
            StructField::new("_private", DataType::STRING, true),
            StructField::new("CamelCase123", DataType::LONG, false),
        ])
    }

    fn schema_with_special_chars() -> StructType {
        schema_of(vec![
            StructField::new("my column", DataType::INTEGER, false),
            StructField::new("col;name", DataType::STRING, true),
        ])
    }

    fn schema_with_dot() -> StructType {
        schema_of(vec![
            StructField::new("a.b", DataType::INTEGER, false),
            StructField::new("c", DataType::STRING, true),
        ])
    }

    /// Two sibling structs whose children differ only by case; the paths differ, so this is
    /// expected to be accepted.
    fn schema_different_struct_children() -> StructType {
        let children_of_a = struct_of(vec![StructField::new("child", DataType::INTEGER, false)]);
        let children_of_b = struct_of(vec![StructField::new("CHILD", DataType::STRING, true)]);
        schema_of(vec![
            StructField::new("a", children_of_a, false),
            StructField::new("b", children_of_b, false),
        ])
    }

    fn schema_with_space() -> StructType {
        schema_of(vec![StructField::new("my column", DataType::INTEGER, false)])
    }

    fn schema_with_semicolon() -> StructType {
        schema_of(vec![StructField::new("col;name", DataType::INTEGER, false)])
    }

    fn schema_with_newline() -> StructType {
        schema_of(vec![StructField::new("col\nname", DataType::INTEGER, false)])
    }

    fn schema_with_empty_name() -> StructType {
        schema_of(vec![StructField::new("", DataType::INTEGER, false)])
    }

    fn schema_nested_bad_char() -> StructType {
        let nested = struct_of(vec![StructField::new(
            "bad column",
            DataType::INTEGER,
            false,
        )]);
        schema_of(vec![StructField::new("parent", nested, false)])
    }

    fn schema_array_bad_char() -> StructType {
        let element = struct_of(vec![StructField::new("bad col", DataType::INTEGER, false)]);
        schema_of(vec![StructField::new("arr", array_of(element), false)])
    }

    fn schema_map_bad_char() -> StructType {
        let value = struct_of(vec![StructField::new("bad;val", DataType::INTEGER, false)]);
        schema_of(vec![StructField::new("m", string_map_of(value), false)])
    }

    fn schema_top_level_dup() -> StructType {
        let nested = struct_of(vec![StructField::new("x", DataType::INTEGER, false)]);
        schema_of(vec![
            StructField::new("a", nested, false),
            StructField::new("A", DataType::STRING, true),
        ])
    }

    fn schema_array_dup() -> StructType {
        let element = struct_of(vec![
            StructField::new("x", DataType::INTEGER, false),
            StructField::new("X", DataType::STRING, true),
        ]);
        schema_of(vec![StructField::new("arr", array_of(element), false)])
    }

    fn schema_multi_bad() -> StructType {
        schema_of(vec![
            StructField::new("good", DataType::INTEGER, false),
            StructField::new("bad column", DataType::STRING, true),
            StructField::new("col;name", DataType::LONG, false),
        ])
    }

    // ---- Fixtures carrying invariants metadata ----

    /// A field whose metadata contains the `ColumnMetadataKey::Invariants` key.
    fn field_with_invariant(name: &str, data_type: DataType, nullable: bool) -> StructField {
        let key = ColumnMetadataKey::Invariants.as_ref().to_string();
        let value =
            MetadataValue::String(r#"{"expression": {"expression": "x > 0"}}"#.to_string());
        let mut annotated = StructField::new(name, data_type, nullable);
        annotated.metadata.insert(key, value);
        annotated
    }

    /// A struct type with a single `child` field that carries invariants metadata.
    fn invariant_child_struct() -> DataType {
        struct_of(vec![field_with_invariant("child", DataType::INTEGER, true)])
    }

    fn schema_top_level_invariant() -> StructType {
        schema_of(vec![
            field_with_invariant("x", DataType::INTEGER, true),
            StructField::new("y", DataType::INTEGER, true),
        ])
    }

    fn schema_nested_invariant() -> StructType {
        schema_of(vec![StructField::new(
            "parent",
            invariant_child_struct(),
            true,
        )])
    }

    fn schema_array_nested_invariant() -> StructType {
        schema_of(vec![StructField::new(
            "arr",
            array_of(invariant_child_struct()),
            true,
        )])
    }

    fn schema_map_nested_invariant() -> StructType {
        schema_of(vec![StructField::new(
            "map",
            string_map_of(invariant_child_struct()),
            true,
        )])
    }

    // ---- Tests for `validate_schema` ----

    #[rstest]
    #[case::simple(simple_schema(), ColumnMappingMode::None)]
    #[case::underscores_digits(schema_with_underscores(), ColumnMappingMode::None)]
    #[case::special_chars_with_cm(schema_with_special_chars(), ColumnMappingMode::Name)]
    #[case::dot_in_name_with_cm(schema_with_dot(), ColumnMappingMode::Name)]
    #[case::different_struct_children(schema_different_struct_children(), ColumnMappingMode::None)]
    fn valid_schema_accepted(#[case] schema: StructType, #[case] cm: ColumnMappingMode) {
        let outcome = validate_schema(&schema, cm);
        assert!(outcome.is_ok());
    }

    #[rstest]
    #[case::empty_schema(StructType::new_unchecked(vec![]), ColumnMappingMode::None, &["cannot be empty"])]
    #[case::space_without_cm(schema_with_space(), ColumnMappingMode::None, &["invalid character"])]
    #[case::semicolon_without_cm(schema_with_semicolon(), ColumnMappingMode::None, &["invalid character"])]
    #[case::newline_with_cm(schema_with_newline(), ColumnMappingMode::Name, &["newline"])]
    #[case::empty_name(schema_with_empty_name(), ColumnMappingMode::None, &["cannot be empty"])]
    #[case::nested_struct_bad_char(schema_nested_bad_char(), ColumnMappingMode::None, &["invalid character"])]
    #[case::array_nested_bad_char(schema_array_bad_char(), ColumnMappingMode::None, &["invalid character"])]
    #[case::map_nested_bad_char(schema_map_bad_char(), ColumnMappingMode::None, &["invalid character"])]
    #[case::top_level_dup(schema_top_level_dup(), ColumnMappingMode::None, &["duplicate"])]
    #[case::array_element_dup(schema_array_dup(), ColumnMappingMode::None, &["duplicate"])]
    #[case::multi_error(schema_multi_bad(), ColumnMappingMode::None, &["bad column", "col;name"])]
    fn invalid_schema_rejected(
        #[case] schema: StructType,
        #[case] cm: ColumnMappingMode,
        #[case] expected_errs: &[&str],
    ) {
        let outcome = validate_schema(&schema, cm);
        assert!(outcome.is_err());
        let message = outcome.unwrap_err().to_string();
        // Every expected fragment must appear somewhere in the combined error message.
        for fragment in expected_errs {
            assert!(
                message.contains(fragment),
                "Expected '{fragment}' in error, got: {message}"
            );
        }
    }

    #[rstest]
    #[case::top_level(schema_top_level_invariant(), "x")]
    #[case::nested_struct(schema_nested_invariant(), "parent.child")]
    #[case::array_nested(schema_array_nested_invariant(), "arr.child")]
    #[case::map_nested(schema_map_nested_invariant(), "map.child")]
    fn invariants_metadata_rejected(#[case] schema: StructType, #[case] expected_path: &str) {
        let message = validate_schema(&schema, ColumnMappingMode::None)
            .expect_err("expected delta.invariants metadata rejection")
            .to_string();
        assert!(
            message.contains("delta.invariants"),
            "Expected delta.invariants mention in error, got: {message}"
        );
        assert!(
            message.contains(expected_path),
            "Expected path '{expected_path}' in error, got: {message}"
        );
    }

    // ---- Tests and fixtures for `LegacyNestedIdsVisitor` ----

    #[rstest]
    #[case::clean_schema(simple_schema(), None)]
    #[case::column_mapping_nested_id_key_only(schema_with_good_nested_ids(), None)]
    #[case::top_level_legacy(schema_with_legacy_at("top"), Some(String::from("top")))]
    #[case::nested_struct_legacy(schema_struct_with_legacy_at_inner(), Some(String::from("parent.inner")))]
    #[case::array_struct_legacy(schema_array_struct_with_legacy_at_inner(), Some(String::from("arr.inner")))]
    #[case::map_value_struct_legacy(schema_map_value_struct_with_legacy_at_inner(), Some(String::from("m.inner")))]
    #[case::first_offender_wins(schema_two_legacy_fields(), Some(String::from("a")))]
    fn legacy_nested_ids_visitor_finds_first_offender(
        #[case] schema: StructType,
        #[case] expected: Option<String>,
    ) {
        let mut visitor = LegacyNestedIdsVisitor {
            path: Vec::new(),
            offender: None,
        };
        visitor.transform_struct(&schema);
        assert_eq!(visitor.offender, expected);
    }

    /// A nullable field whose metadata maps `key` to a small JSON object.
    fn field_with_metadata(name: &str, dtype: DataType, key: &str) -> StructField {
        let payload = MetadataValue::Other(serde_json::json!({ "x.element": 1 }));
        let mut tagged = StructField::nullable(name, dtype);
        tagged.metadata.insert(key.to_string(), payload);
        tagged
    }

    /// An array-of-INTEGER field named `name` tagged with the legacy nested-ids key.
    fn legacy_int_array_field(name: &str) -> StructField {
        field_with_metadata(
            name,
            array_of(DataType::INTEGER),
            ColumnMetadataKey::ParquetFieldNestedIds.as_ref(),
        )
    }

    /// A struct type whose only field, `inner`, is tagged with the legacy nested-ids key.
    fn legacy_inner_struct() -> DataType {
        struct_of(vec![legacy_int_array_field("inner")])
    }

    fn schema_with_good_nested_ids() -> StructType {
        schema_of(vec![field_with_metadata(
            "x",
            array_of(DataType::INTEGER),
            ColumnMetadataKey::ColumnMappingNestedIds.as_ref(),
        )])
    }

    fn schema_with_legacy_at(name: &str) -> StructType {
        schema_of(vec![legacy_int_array_field(name)])
    }

    fn schema_struct_with_legacy_at_inner() -> StructType {
        schema_of(vec![StructField::nullable("parent", legacy_inner_struct())])
    }

    fn schema_array_struct_with_legacy_at_inner() -> StructType {
        schema_of(vec![StructField::nullable(
            "arr",
            array_of(legacy_inner_struct()),
        )])
    }

    fn schema_map_value_struct_with_legacy_at_inner() -> StructType {
        schema_of(vec![StructField::nullable(
            "m",
            string_map_of(legacy_inner_struct()),
        )])
    }

    fn schema_two_legacy_fields() -> StructType {
        schema_of(vec![
            legacy_int_array_field("a"),
            legacy_int_array_field("b"),
        ])
    }
}