use std::cmp::Ordering;
use indexmap::IndexMap;
use crate::error::Error;
use crate::expressions::ColumnName;
use crate::schema::validation::validate_schema;
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::table_features::{
find_max_column_id_in_schema, try_assign_flat_column_mapping_info, validate_column_mapping_id,
ColumnMappingMode,
};
use crate::DeltaResult;
/// A single schema change applied by [`apply_schema_operations`], in the order given.
#[derive(Clone, Debug)]
pub(crate) enum SchemaOperation {
    /// Append a new top-level column. The field must be nullable, must not be a metadata
    /// column, and its name must not match an existing top-level column case-insensitively.
    AddColumn { field: StructField },
    /// Mark the (possibly nested) column at `column` as nullable. Path segments are matched
    /// case-insensitively; an already-nullable column is left as it is.
    SetNullable { column: ColumnName },
}
fn modify_field_at_path(
fields: &mut IndexMap<String, StructField>,
path: &[String],
modifier: &dyn Fn(&mut StructField) -> DeltaResult<()>,
) -> DeltaResult<()> {
let (first, rest) = path
.split_first()
.ok_or_else(|| Error::generic("empty column path"))?;
let lowered = first.to_lowercase();
let idx = fields
.iter()
.position(|(_, f)| f.name().to_lowercase() == lowered)
.ok_or_else(|| Error::generic(format!("field '{first}' does not exist")))?;
if !rest.is_empty() {
let (_, field) = fields
.get_index_mut(idx)
.ok_or_else(|| Error::internal_error("idx from position() invalid"))?;
let DataType::Struct(inner) = &mut field.data_type else {
return Err(Error::generic(format!(
"intermediate field '{first}' is not a struct"
)));
};
return modify_field_at_path(inner.field_map_mut(), rest, modifier);
}
let (_, field) = fields
.get_index_mut(idx)
.ok_or_else(|| Error::internal_error("idx from position() invalid"))?;
modifier(field)
}
/// Output of `apply_schema_operations`.
#[derive(Debug)]
pub(crate) struct SchemaEvolutionResult {
/// The evolved schema, after all operations were applied and `validate_schema` passed.
pub schema: SchemaRef,
/// Value to persist for `delta.columnMapping.maxColumnId`; `None` means the running max
/// column id did not move past the caller-supplied value, so the property can stay as is.
pub new_max_column_id: Option<i64>,
}
pub(crate) fn apply_schema_operations(
mut schema: StructType,
operations: Vec<SchemaOperation>,
column_mapping_mode: ColumnMappingMode,
current_max_column_id: Option<i64>,
) -> DeltaResult<SchemaEvolutionResult> {
let cm_enabled = column_mapping_mode != ColumnMappingMode::None;
if let Some(seed) = current_max_column_id {
validate_column_mapping_id(seed).map_err(|e| {
Error::invalid_protocol(format!(
"Table property `delta.columnMapping.maxColumnId`: {e}"
))
})?;
}
let mut max_id = if cm_enabled {
current_max_column_id.map(|cfg| cfg.max(find_max_column_id_in_schema(&schema).unwrap_or(0)))
} else {
current_max_column_id
};
for op in operations {
match op {
SchemaOperation::AddColumn { field } => {
if field.is_metadata_column() {
return Err(Error::schema(format!(
"Cannot add column '{}': metadata columns are not allowed in \
a table schema",
field.name()
)));
}
if !matches!(field.data_type, DataType::Primitive(_)) {
StructType::ensure_no_metadata_columns_in_field(&field)?;
}
let lowered = field.name().to_lowercase();
if schema.fields().any(|f| f.name().to_lowercase() == lowered) {
return Err(Error::schema(format!(
"Cannot add column '{}': a column with that name already exists",
field.name()
)));
}
if !field.is_nullable() {
return Err(Error::schema(format!(
"Cannot add non-nullable column '{}'. Added columns must be nullable \
because existing data files do not contain this column.",
field.name()
)));
}
let field = if cm_enabled {
let id = max_id.as_mut().ok_or_else(|| {
Error::invalid_protocol(
"Column mapping is enabled but delta.columnMapping.maxColumnId \
is not set in table properties",
)
})?;
try_assign_flat_column_mapping_info(&field, id)?
} else {
field
};
schema.field_map_mut().insert(field.name().clone(), field);
}
SchemaOperation::SetNullable { column } => {
modify_field_at_path(schema.field_map_mut(), column.path(), &|f| {
f.nullable = true;
Ok(())
})
.map_err(|e| {
Error::generic(format!("Cannot set nullable on column '{column}': {e}"))
})?;
}
}
}
validate_schema(&schema, column_mapping_mode)?;
let new_max_column_id = match max_id.cmp(¤t_max_column_id) {
Ordering::Greater => max_id,
Ordering::Equal => None,
Ordering::Less => {
return Err(Error::internal_error(
"max column ID went backwards during schema evolution",
))
}
};
Ok(SchemaEvolutionResult {
schema: schema.into(),
new_max_column_id,
})
}
// Unit tests for `modify_field_at_path` and `apply_schema_operations`.
#[cfg(test)]
mod tests {
use std::collections::HashSet;
use rstest::rstest;
use super::*;
use crate::expressions::{column_name, ColumnName};
use crate::schema::{
ArrayType, ColumnMetadataKey, DataType, MapType, MetadataColumnSpec, MetadataValue,
StructField, StructType,
};
// Two-column schema: required `id: INTEGER`, nullable `name: STRING`.
fn simple_schema() -> StructType {
StructType::try_new(vec![
StructField::not_null("id", DataType::INTEGER),
StructField::nullable("name", DataType::STRING),
])
.unwrap()
}
// Builds an `AddColumn` op for a STRING column with the requested nullability.
fn add_col(name: &str, nullable: bool) -> SchemaOperation {
let field = if nullable {
StructField::nullable(name, DataType::STRING)
} else {
StructField::not_null(name, DataType::STRING)
};
SchemaOperation::AddColumn { field }
}
// Builds an `AddColumn` op for a nullable struct column holding one nullable STRING leaf.
fn add_struct_with_nested_leaf(name: &str, leaf_name: &str) -> SchemaOperation {
let inner =
StructType::try_new(vec![StructField::nullable(leaf_name, DataType::STRING)]).unwrap();
SchemaOperation::AddColumn {
field: StructField::nullable(name, inner),
}
}
// Schema with required `id` and nullable struct `address { city NOT NULL, zip }`.
fn nested_schema() -> StructType {
StructType::try_new(vec![
StructField::not_null("id", DataType::INTEGER),
StructField::nullable(
"address",
StructType::try_new(vec![
StructField::not_null("city", DataType::STRING),
StructField::nullable("zip", DataType::STRING),
])
.unwrap(),
),
])
.unwrap()
}
// Converts a schema into the name-keyed field map that `modify_field_at_path` operates on.
fn into_field_map(schema: StructType) -> IndexMap<String, StructField> {
schema
.into_fields()
.map(|f| (f.name().clone(), f))
.collect()
}
// Modifier used by the direct `modify_field_at_path` tests: marks the field nullable.
fn set_nullable_modifier(f: &mut StructField) -> DeltaResult<()> {
f.nullable = true;
Ok(())
}
// Runs `modify_field_at_path` with `set_nullable_modifier` over `schema`'s top-level fields.
fn modify_field_at_path_test_helper(
schema: StructType,
path: &[String],
) -> DeltaResult<IndexMap<String, StructField>> {
let mut fields = into_field_map(schema);
modify_field_at_path(&mut fields, path, &set_nullable_modifier)?;
Ok(fields)
}
// --- Direct `modify_field_at_path` tests ---
#[test]
fn modify_top_level_field_sets_nullable() {
let path = vec!["id".to_string()];
let result = modify_field_at_path_test_helper(simple_schema(), &path).unwrap();
let id = result.values().find(|f| f.name() == "id").unwrap();
assert!(id.is_nullable());
}
#[test]
fn modify_nested_field_modifies_only_leaf() {
let path = vec!["address".to_string(), "city".to_string()];
let result = modify_field_at_path_test_helper(nested_schema(), &path).unwrap();
let addr = result.values().find(|f| f.name() == "address").unwrap();
match addr.data_type() {
DataType::Struct(s) => assert!(s.field("city").unwrap().is_nullable()),
other => panic!("Expected Struct, got: {other:?}"),
}
}
// Modifying `address.city` must leave the sibling `zip` and the top-level `id` untouched.
#[test]
fn modify_nested_leaf_preserves_other_fields() {
let path = vec!["address".to_string(), "city".to_string()];
let result = modify_field_at_path_test_helper(nested_schema(), &path).unwrap();
let id = result.values().find(|f| f.name() == "id").unwrap();
assert!(!id.is_nullable());
let addr = result.values().find(|f| f.name() == "address").unwrap();
match addr.data_type() {
DataType::Struct(s) => assert!(s.field("zip").unwrap().is_nullable()),
other => panic!("Expected Struct, got: {other:?}"),
}
}
#[test]
fn modify_nonexistent_field_fails() {
let path = vec!["nope".to_string()];
let err = modify_field_at_path_test_helper(simple_schema(), &path).unwrap_err();
assert!(err.to_string().contains("does not exist"));
}
// `name` is a STRING, so descending into `name.inner` must fail.
#[test]
fn modify_through_non_struct_fails() {
let path = vec!["name".to_string(), "inner".to_string()];
let err = modify_field_at_path_test_helper(simple_schema(), &path).unwrap_err();
assert!(err.to_string().contains("not a struct"));
}
#[test]
fn modify_case_insensitive_lookup_finds_field() {
let path = vec!["ID".to_string()];
let result = modify_field_at_path_test_helper(simple_schema(), &path).unwrap();
let id = result.values().find(|f| f.name() == "id").unwrap();
assert!(id.is_nullable());
}
// --- `apply_schema_operations` without column mapping ---
// Each op list must be rejected, with an error containing the given fragment.
#[rstest]
#[case::dup_exact(vec![add_col("name", true)], "already exists")]
#[case::dup_case_insensitive(vec![add_col("Name", true)], "already exists")]
#[case::dup_within_batch(
vec![add_col("email", true), add_col("email", true)],
"already exists"
)]
#[case::non_nullable(vec![add_col("age", false)], "non-nullable")]
#[case::invalid_parquet_char(vec![add_col("foo,bar", true)], "invalid character")]
#[case::nested_invalid_parquet_char(
vec![add_struct_with_nested_leaf("addr", "bad,leaf")],
"invalid character"
)]
#[case::metadata_column(
vec![SchemaOperation::AddColumn {
field: StructField::create_metadata_column("row_idx", MetadataColumnSpec::RowIndex),
}],
"metadata columns are not allowed"
)]
fn apply_schema_operations_rejects(
#[case] ops: Vec<SchemaOperation>,
#[case] error_contains: &str,
) {
let err = apply_schema_operations(simple_schema(), ops, ColumnMappingMode::None, None)
.unwrap_err();
assert!(err.to_string().contains(error_contains));
}
// Added columns are appended after existing ones, in op order.
#[rstest]
#[case::single(vec![add_col("email", true)], &["id", "name", "email"])]
#[case::multiple(
vec![add_col("email", true), add_col("age", true)],
&["id", "name", "email", "age"]
)]
fn apply_schema_operations_succeeds(
#[case] ops: Vec<SchemaOperation>,
#[case] expected_names: &[&str],
) {
let result =
apply_schema_operations(simple_schema(), ops, ColumnMappingMode::None, None).unwrap();
let actual: Vec<&str> = result.schema.fields().map(|f| f.name().as_str()).collect();
assert_eq!(&actual, expected_names);
}
// Schema with a required leaf two struct levels down: `address.location.zipcode`.
fn deeply_nested_required_schema() -> StructType {
StructType::try_new(vec![
StructField::not_null("id", DataType::INTEGER),
StructField::nullable(
"address",
StructType::try_new(vec![StructField::nullable(
"location",
StructType::try_new(vec![StructField::not_null("zipcode", DataType::STRING)])
.unwrap(),
)])
.unwrap(),
),
])
.unwrap()
}
#[rstest]
#[case::on_required_field(simple_schema(), column_name!("id"))]
#[case::already_nullable_is_noop(simple_schema(), column_name!("name"))]
#[case::case_insensitive(simple_schema(), column_name!("ID"))]
#[case::nested_field(nested_schema(), column_name!("address.city"))]
#[case::deeply_nested_field(deeply_nested_required_schema(), column_name!("address.location.zipcode"))]
fn set_nullable_succeeds(#[case] schema: StructType, #[case] column: ColumnName) {
let ops = vec![SchemaOperation::SetNullable {
column: column.clone(),
}];
let result = apply_schema_operations(schema, ops, ColumnMappingMode::None, None).unwrap();
assert!(result.schema.field_at_path(column.path()).is_nullable());
}
#[rstest]
#[case::nonexistent_column(column_name!("nonexistent"), "does not exist")]
#[case::through_non_struct(column_name!("name.inner"), "not a struct")]
#[case::empty_path(ColumnName::new(Vec::<String>::new()), "empty column path")]
fn set_nullable_fails(#[case] column: ColumnName, #[case] error_contains: &str) {
let ops = vec![SchemaOperation::SetNullable { column }];
let err = apply_schema_operations(simple_schema(), ops, ColumnMappingMode::None, None)
.unwrap_err();
assert!(
err.to_string().contains(error_contains),
"expected error to contain '{error_contains}', got: {err}"
);
}
// Relaxing a struct column must not cascade into its children.
#[test]
fn set_nullable_on_struct_itself_preserves_inner_fields() {
let schema = StructType::try_new(vec![StructField::not_null(
"address",
StructType::try_new(vec![StructField::not_null("city", DataType::STRING)]).unwrap(),
)])
.unwrap();
let ops = vec![SchemaOperation::SetNullable {
column: column_name!("address"),
}];
let result = apply_schema_operations(schema, ops, ColumnMappingMode::None, None).unwrap();
let addr = result.schema.field("address").unwrap();
assert!(addr.is_nullable(), "struct itself must be nullable");
match addr.data_type() {
DataType::Struct(s) => assert!(
!s.field("city").unwrap().is_nullable(),
"inner field must remain NOT NULL"
),
other => panic!("Expected Struct, got: {other:?}"),
}
}
#[test]
fn chain_add_and_set_nullable_applies_both() {
let ops = vec![
SchemaOperation::AddColumn {
field: StructField::nullable("email", DataType::STRING),
},
SchemaOperation::SetNullable {
column: column_name!("id"),
},
];
let result =
apply_schema_operations(simple_schema(), ops, ColumnMappingMode::None, None).unwrap();
assert_eq!(result.schema.fields().count(), 3);
assert!(result.schema.field("email").is_some());
assert!(result.schema.field("id").unwrap().is_nullable());
}
// Modifying a nested field in place must not reorder the top-level columns.
#[test]
fn set_nullable_nested_preserves_top_level_order() {
let schema = StructType::try_new(vec![
StructField::not_null("alpha", DataType::INTEGER),
StructField::nullable(
"beta",
StructType::try_new(vec![StructField::not_null("nested", DataType::STRING)])
.unwrap(),
),
StructField::not_null("gamma", DataType::STRING),
])
.unwrap();
let ops = vec![SchemaOperation::SetNullable {
column: column_name!("beta.nested"),
}];
let result = apply_schema_operations(schema, ops, ColumnMappingMode::None, None).unwrap();
let names: Vec<&String> = result.schema.fields().map(|f| f.name()).collect();
assert_eq!(names, vec!["alpha", "beta", "gamma"]);
}
// --- `apply_schema_operations` with column mapping ---
// Returns the field's column-mapping id, panicking if it has none.
fn get_cm_id(field: &StructField) -> i64 {
field
.column_mapping_id()
.expect("field should have column mapping ID")
}
// Returns the field's column-mapping physical name, panicking if absent or not a string.
fn get_physical_name(field: &StructField) -> String {
match field
.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
.expect("field should have physical name")
{
MetadataValue::String(s) => s.clone(),
other => panic!("expected String, got {other:?}"),
}
}
// The added column gets id `current_max + 1` and a `col-` physical name in both modes.
#[rstest]
#[case::name_mode(ColumnMappingMode::Name, 2, 3)]
#[case::id_mode(ColumnMappingMode::Id, 5, 6)]
fn add_column_with_column_mapping_assigns_id_and_physical_name(
#[case] mode: ColumnMappingMode,
#[case] current_max: i64,
#[case] expected_id: i64,
) {
let ops = vec![SchemaOperation::AddColumn {
field: StructField::nullable("email", DataType::STRING),
}];
let result =
apply_schema_operations(simple_schema(), ops, mode, Some(current_max)).unwrap();
let email_field = result.schema.field("email").unwrap();
assert_eq!(get_cm_id(email_field), expected_id);
assert!(get_physical_name(email_field).starts_with("col-"));
assert_eq!(result.new_max_column_id, Some(expected_id));
}
// With column mapping on, adding a column without a persisted maxColumnId is a protocol error.
#[test]
fn add_column_without_max_column_id_fails_when_mapping_enabled() {
let ops = vec![SchemaOperation::AddColumn {
field: StructField::nullable("email", DataType::STRING),
}];
let err = apply_schema_operations(simple_schema(), ops, ColumnMappingMode::Name, None)
.unwrap_err();
assert!(matches!(err, Error::InvalidProtocol(_)));
assert!(err.to_string().contains("maxColumnId"));
}
// Several adds in one batch get consecutive ids and distinct physical names.
#[test]
fn add_multiple_columns_with_column_mapping_assigns_unique_ids() {
let ops = vec![
SchemaOperation::AddColumn {
field: StructField::nullable("a", DataType::STRING),
},
SchemaOperation::AddColumn {
field: StructField::nullable("b", DataType::STRING),
},
SchemaOperation::AddColumn {
field: StructField::nullable("c", DataType::STRING),
},
];
let result =
apply_schema_operations(simple_schema(), ops, ColumnMappingMode::Name, Some(10))
.unwrap();
let id_a = get_cm_id(result.schema.field("a").unwrap());
let id_b = get_cm_id(result.schema.field("b").unwrap());
let id_c = get_cm_id(result.schema.field("c").unwrap());
assert_eq!(id_a, 11);
assert_eq!(id_b, 12);
assert_eq!(id_c, 13);
let name_a = get_physical_name(result.schema.field("a").unwrap());
let name_b = get_physical_name(result.schema.field("b").unwrap());
let name_c = get_physical_name(result.schema.field("c").unwrap());
assert_ne!(name_a, name_b);
assert_ne!(name_b, name_c);
assert_ne!(name_a, name_c);
assert_eq!(result.new_max_column_id, Some(13));
}
// A struct type with two nullable STRING children, `a` and `b`.
fn struct_of_two_primitives() -> DataType {
DataType::Struct(Box::new(
StructType::try_new(vec![
StructField::nullable("a", DataType::STRING),
StructField::nullable("b", DataType::STRING),
])
.unwrap(),
))
}
// `expected_id_count` is the number of ids collected from the added column. The cases are
// consistent with "the column itself plus each struct child reachable through
// arrays/maps".
#[rstest]
#[case::nested_struct(struct_of_two_primitives(), 3)]
#[case::array_of_primitive(
DataType::Array(Box::new(ArrayType::new(DataType::STRING, true))),
1
)]
#[case::map_of_primitives(
DataType::Map(Box::new(MapType::new(DataType::STRING, DataType::INTEGER, true))),
1
)]
#[case::array_of_struct(
DataType::Array(Box::new(ArrayType::new(struct_of_two_primitives(), true))),
3
)]
#[case::map_value_is_struct(
DataType::Map(Box::new(MapType::new(
DataType::STRING,
struct_of_two_primitives(),
true,
))),
3
)]
#[case::map_key_is_struct(
DataType::Map(Box::new(MapType::new(
struct_of_two_primitives(),
DataType::INTEGER,
true,
))),
3
)]
fn add_complex_column_with_column_mapping_assigns_ids_to_all_inner_fields(
#[case] data_type: DataType,
#[case] expected_id_count: usize,
) {
let ops = vec![SchemaOperation::AddColumn {
field: StructField::nullable("col", data_type),
}];
let result =
apply_schema_operations(simple_schema(), ops, ColumnMappingMode::Name, Some(10))
.unwrap();
let added = result.schema.field("col").unwrap();
let ids = added.collect_column_mapping_ids();
let unique: HashSet<_> = ids.iter().copied().collect();
assert_eq!(ids.len(), expected_id_count, "expected ID count mismatch");
assert_eq!(unique.len(), ids.len(), "all assigned IDs must be distinct");
assert!(
ids.iter().all(|&id| id > 10),
"all assigned IDs must exceed previous max"
);
assert_eq!(
result.new_max_column_id,
ids.iter().max().copied(),
"new_max_column_id must equal the largest assigned ID",
);
}
// Builds a nullable field annotated with a column-mapping id but no physical name.
fn field_with_id_only(name: &str, ty: DataType, id: i64) -> StructField {
let mut f = StructField::nullable(name, ty);
f.metadata.insert(
ColumnMetadataKey::ColumnMappingId.as_ref().to_string(),
MetadataValue::Number(id),
);
f
}
// A caller-supplied id (99) above the persisted max (2) is kept, and it raises the new max.
#[rstest]
#[case::top_level(field_with_id_only("tainted", DataType::STRING, 99))]
#[case::nested_in_struct(StructField::nullable(
"outer",
DataType::Struct(Box::new(
StructType::try_new(vec![field_with_id_only("inner", DataType::STRING, 99)]).unwrap(),
)),
))]
fn add_column_with_preexisting_cm_metadata_is_preserved_under_cm(#[case] field: StructField) {
let ops = vec![SchemaOperation::AddColumn { field }];
let result = apply_schema_operations(
simple_schema(),
ops,
ColumnMappingMode::Name,
// Persisted maxColumnId, deliberately below the id the added field already carries.
Some(2), )
.unwrap();
assert_eq!(find_max_column_id_in_schema(&result.schema), Some(99));
assert_eq!(result.new_max_column_id, Some(99));
}
// A caller-supplied physical name is kept, while the missing id is allocated from the counter.
#[test]
fn add_column_with_only_physical_name_allocates_id() {
let mut field = StructField::nullable("named", DataType::STRING);
field.metadata.insert(
ColumnMetadataKey::ColumnMappingPhysicalName
.as_ref()
.to_string(),
MetadataValue::String("user-supplied-name".to_string()),
);
let ops = vec![SchemaOperation::AddColumn { field }];
let result =
apply_schema_operations(simple_schema(), ops, ColumnMappingMode::Name, Some(7))
.unwrap();
let added = result.schema.field("named").unwrap();
assert_eq!(get_cm_id(added), 8);
assert_eq!(
added.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName),
Some(&MetadataValue::String("user-supplied-name".to_string()))
);
assert_eq!(result.new_max_column_id, Some(8));
}
// An out-of-range persisted maxColumnId must be rejected. The error must name the property and
// include the offending value.
#[rstest]
#[case::above_protocol_max(i32::MAX as i64 + 1)]
#[case::i64_max(i64::MAX)]
#[case::negative(-1)]
fn alter_with_out_of_range_persisted_max_column_id_is_rejected(#[case] seed: i64) {
let ops = vec![SchemaOperation::AddColumn {
field: StructField::nullable("anything", DataType::INTEGER),
}];
let err =
apply_schema_operations(simple_schema(), ops, ColumnMappingMode::Name, Some(seed))
.unwrap_err()
.to_string();
assert!(
err.contains("Invalid column mapping id")
&& err.contains("Table property `delta.columnMapping.maxColumnId`")
&& err.contains(&seed.to_string()),
"expected canonical out-of-range rejection naming the seed location and value, \
got: {err}",
);
}
// A non-numeric column-mapping id annotation on an added field must be rejected.
#[test]
fn add_column_with_wrong_typed_id_is_rejected() {
let mut field = StructField::nullable("bad", DataType::STRING);
field.metadata.insert(
ColumnMetadataKey::ColumnMappingId.as_ref().to_string(),
MetadataValue::String("not-a-number".to_string()),
);
let ops = vec![SchemaOperation::AddColumn { field }];
let err = apply_schema_operations(
simple_schema(),
ops,
ColumnMappingMode::Name,
// Valid persisted maxColumnId; the failure must come from the field's bad annotation.
Some(2), )
.unwrap_err()
.to_string();
assert!(
err.contains("non-numeric") && err.contains("delta.columnMapping.id"),
"error should name the wrong-typed id annotation, got: {err}"
);
}
// The schema already uses id 42 while the persisted maxColumnId says 5. New ids must continue
// from the schema's maximum, not from the stale property.
#[test]
fn stale_max_column_id_is_self_healed_by_schema_walk() {
let mut existing = StructField::nullable("existing", DataType::STRING);
existing.metadata.insert(
ColumnMetadataKey::ColumnMappingId.as_ref().to_string(),
MetadataValue::Number(42),
);
let schema = StructType::try_new(vec![existing]).unwrap();
let ops = vec![SchemaOperation::AddColumn {
field: StructField::nullable("new", DataType::STRING),
}];
let result =
apply_schema_operations(schema, ops, ColumnMappingMode::Name, Some(5)).unwrap();
let new_id = get_cm_id(result.schema.field("new").unwrap());
assert_eq!(
new_id, 43,
"new id must follow schema max (42), not stale property (5)"
);
assert_eq!(result.new_max_column_id, Some(43));
}
}