use std::borrow::Cow;
use std::collections::HashMap;
use serde::{Deserialize, Serialize};
use strum::EnumString;
use uuid::Uuid;
use super::TableFeature;
use crate::actions::Protocol;
use crate::schema::{
ArrayType, ColumnMetadataKey, ColumnName, DataType, MapType, MetadataValue, Schema,
StructField, StructType,
};
use crate::table_properties::{TableProperties, COLUMN_MAPPING_MODE};
use crate::transforms::SchemaTransform;
use crate::{DeltaResult, Error};
/// Column mapping mode of a Delta table.
///
/// The `strum`/`serde` attributes give the variants the string forms `"none"`, `"id"` and
/// `"name"`. Those are the values accepted for the column mapping mode table property.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, EnumString)]
#[serde(rename_all = "camelCase")]
#[strum(serialize_all = "camelCase")]
pub enum ColumnMappingMode {
    /// Column mapping is disabled; fields must not carry column mapping annotations.
    None,
    /// Column mapping in `id` mode; every field needs a physical name and an ID annotation.
    Id,
    /// Column mapping in `name` mode; every field needs a physical name and an ID annotation.
    Name,
}
/// Determine the column mapping mode actually in effect for a table.
///
/// The configured mode only takes effect when the protocol allows it. That means reader
/// version 2, or reader version 3 together with `TableFeature::ColumnMapping`. If the property
/// is unset, or the protocol does not allow it, the result is [`ColumnMappingMode::None`].
pub(crate) fn column_mapping_mode(
    protocol: &Protocol,
    table_properties: &TableProperties,
) -> ColumnMappingMode {
    let Some(requested) = table_properties.column_mapping_mode else {
        return ColumnMappingMode::None;
    };
    let protocol_allows_mapping = match protocol.min_reader_version() {
        2 => true,
        3 => protocol.has_table_feature(&TableFeature::ColumnMapping),
        _ => false,
    };
    if protocol_allows_mapping {
        requested
    } else {
        ColumnMappingMode::None
    }
}
/// Check that every field of `schema` carries column mapping annotations consistent with `mode`.
///
/// When mapping is enabled (`Name` or `Id`), every non-metadata field needs a string physical
/// name and a numeric ID. This includes fields nested in structs, array elements, and map keys
/// or values. IDs must also be unique across the whole schema. When mapping is disabled, no
/// field may carry either annotation. Variant types are not descended into.
///
/// # Errors
///
/// Returns the first violation encountered while walking the schema.
pub fn validate_schema_column_mapping(schema: &Schema, mode: ColumnMappingMode) -> DeltaResult<()> {
    let mut validator = ValidateColumnMappings {
        mode,
        path: Vec::new(),
        seen: HashMap::new(),
        err: None,
    };
    // The walk only populates `validator.err`; the transformed schema it yields is not needed.
    let _ = validator.transform_struct(schema);
    validator.err.map_or(Ok(()), Err)
}
pub(crate) fn get_field_column_mapping_info<'a>(
field: &'a StructField,
mode: ColumnMappingMode,
path: &[&str],
seen: Option<&mut HashMap<i64, &'a str>>,
) -> DeltaResult<(&'a str, Option<i64>)> {
let field_path = || ColumnName::new(path.iter().copied());
let physical_name_meta = field
.metadata
.get(ColumnMetadataKey::ColumnMappingPhysicalName.as_ref());
let id_meta = field
.metadata
.get(ColumnMetadataKey::ColumnMappingId.as_ref());
if field.is_metadata_column() {
if physical_name_meta.is_some() || id_meta.is_some() {
return Err(Error::internal_error(format!(
"Metadata column '{}' must not have column mapping annotations",
field.name()
)));
}
return Ok((field.name(), None));
}
let annotation = ColumnMetadataKey::ColumnMappingPhysicalName.as_ref();
let physical_name = match (mode, physical_name_meta) {
(ColumnMappingMode::None, None) => field.name(),
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(MetadataValue::String(s))) => s,
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(_)) => {
return Err(Error::schema(format!(
"The {annotation} annotation on field '{}' must be a string",
field_path(),
)));
}
(ColumnMappingMode::Name | ColumnMappingMode::Id, None) => {
return Err(Error::schema(format!(
"Column mapping is enabled but field '{}' lacks the {annotation} annotation",
field_path(),
)));
}
(ColumnMappingMode::None, Some(_)) => {
return Err(Error::schema(format!(
"Column mapping is not enabled but field '{}' is annotated with {annotation}",
field_path(),
)));
}
};
let annotation = ColumnMetadataKey::ColumnMappingId.as_ref();
let id = match (mode, id_meta) {
(ColumnMappingMode::None, None) => None,
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(MetadataValue::Number(n))) => {
Some(*n)
}
(ColumnMappingMode::Name | ColumnMappingMode::Id, Some(_)) => {
return Err(Error::schema(format!(
"The {annotation} annotation on field '{}' must be a number",
field_path(),
)));
}
(ColumnMappingMode::Name | ColumnMappingMode::Id, None) => {
return Err(Error::schema(format!(
"Column mapping is enabled but field '{}' lacks the {annotation} annotation",
field_path(),
)));
}
(ColumnMappingMode::None, Some(_)) => {
return Err(Error::schema(format!(
"Column mapping is not enabled but field '{}' is annotated with {annotation}",
field_path(),
)));
}
};
if let (Some(id), Some(seen)) = (id, seen) {
seen.insert(id, field.name()).map_or(Ok(()), |prev| {
Err(Error::schema(format!(
"Duplicate column mapping ID {id} assigned to both '{prev}' and '{}'",
field.name()
)))
})?;
}
Ok((physical_name, id))
}
struct ValidateColumnMappings<'a> {
mode: ColumnMappingMode,
path: Vec<&'a str>,
seen: HashMap<i64, &'a str>, err: Option<Error>,
}
impl<'a> ValidateColumnMappings<'a> {
    /// Run `validate` with `field_name` pushed onto `self.path`, then pop it again.
    ///
    /// This is a no-op once an error has been recorded, so only the first failure is kept.
    fn transform_inner<R>(&mut self, field_name: &'a str, validate: impl FnOnce(&mut Self) -> R) {
        if self.err.is_some() {
            return;
        }
        self.path.push(field_name);
        // Failures are reported through `self.err`; the visitor's own result is irrelevant.
        let _ = validate(self);
        self.path.pop();
    }
}
impl<'a> SchemaTransform<'a> for ValidateColumnMappings<'a> {
    // Array and map children have no field name of their own. A synthetic path component is
    // pushed so that error messages still point at the right nesting level. The input type is
    // handed back unchanged because this visitor never rewrites anything.
    fn transform_array_element(&mut self, etype: &'a DataType) -> Option<Cow<'a, DataType>> {
        self.transform_inner("<array element>", |visitor| visitor.transform(etype));
        Some(Cow::Borrowed(etype))
    }

    fn transform_map_key(&mut self, ktype: &'a DataType) -> Option<Cow<'a, DataType>> {
        self.transform_inner("<map key>", |visitor| visitor.transform(ktype));
        Some(Cow::Borrowed(ktype))
    }

    fn transform_map_value(&mut self, vtype: &'a DataType) -> Option<Cow<'a, DataType>> {
        self.transform_inner("<map value>", |visitor| visitor.transform(vtype));
        Some(Cow::Borrowed(vtype))
    }

    /// Validate `field`'s own annotations, then descend into its type only if they are valid.
    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
        self.transform_inner(field.name(), |visitor| {
            let checked = get_field_column_mapping_info(
                field,
                visitor.mode,
                &visitor.path,
                Some(&mut visitor.seen),
            );
            match checked {
                Ok(_) => visitor.recurse_into_struct_field(field),
                Err(e) => {
                    visitor.err = Some(e);
                    None
                }
            }
        });
        // The transformed schema is discarded by the caller, so nothing useful is returned.
        None
    }

    /// Variant types are skipped; their internal struct is not validated.
    fn transform_variant(&mut self, _: &'a StructType) -> Option<Cow<'a, StructType>> {
        None
    }
}
/// Read the column mapping mode from a raw table-property map.
///
/// A missing `COLUMN_MAPPING_MODE` entry means [`ColumnMappingMode::None`].
///
/// # Errors
///
/// Returns a generic error if the property is present but does not parse as a
/// [`ColumnMappingMode`] (`none`, `name` or `id`).
pub(crate) fn get_column_mapping_mode_from_properties(
    properties: &HashMap<String, String>,
) -> DeltaResult<ColumnMappingMode> {
    let Some(mode_str) = properties.get(COLUMN_MAPPING_MODE) else {
        return Ok(ColumnMappingMode::None);
    };
    mode_str.parse::<ColumnMappingMode>().map_err(|_| {
        Error::generic(format!(
            "Invalid column mapping mode '{mode_str}'. Must be one of: none, name, id"
        ))
    })
}
/// Return a copy of `schema` in which every field, at every nesting level, has a fresh column
/// mapping ID and physical name.
///
/// IDs are handed out in pre-order (a parent before its children), starting at `*max_id + 1`.
/// On success, `*max_id` holds the largest ID assigned.
///
/// # Errors
///
/// Fails if any field already carries column mapping metadata, or if `StructType::try_new`
/// rejects the rebuilt field list.
pub(crate) fn assign_column_mapping_metadata(
    schema: &StructType,
    max_id: &mut i64,
) -> DeltaResult<StructType> {
    let mut mapped_fields = Vec::new();
    for field in schema.fields() {
        mapped_fields.push(assign_field_column_mapping(field, max_id)?);
    }
    StructType::try_new(mapped_fields)
}
/// Give `field` a fresh column mapping ID and a `col-<uuid>` physical name.
///
/// Every field nested inside the field's type receives the same treatment. `*max_id` is
/// incremented for `field` before its nested fields are processed, so a parent always gets a
/// smaller ID than its children.
///
/// # Errors
///
/// Fails if `field` already has either column mapping annotation, or if processing a nested
/// field fails.
fn assign_field_column_mapping(field: &StructField, max_id: &mut i64) -> DeltaResult<StructField> {
    let already_annotated = [
        ColumnMetadataKey::ColumnMappingId,
        ColumnMetadataKey::ColumnMappingPhysicalName,
    ]
    .iter()
    .any(|key| field.get_config_value(key).is_some());
    if already_annotated {
        return Err(Error::generic(format!(
            "Field '{}' already has column mapping metadata. \
             Pre-existing column mapping metadata is not supported for CREATE TABLE.",
            field.name
        )));
    }

    *max_id += 1;
    let field_id = *max_id;
    let physical_name = format!("col-{}", Uuid::new_v4());

    let mut annotated = field.clone();
    annotated.metadata.insert(
        ColumnMetadataKey::ColumnMappingId.as_ref().to_string(),
        MetadataValue::Number(field_id),
    );
    annotated.metadata.insert(
        ColumnMetadataKey::ColumnMappingPhysicalName
            .as_ref()
            .to_string(),
        MetadataValue::String(physical_name),
    );
    // Nested IDs continue from the one just assigned to this field.
    annotated.data_type = process_nested_data_type(&field.data_type, max_id)?;
    Ok(annotated)
}
/// Rebuild `data_type`, assigning column mapping metadata to every struct field nested in it.
///
/// Arrays and maps are rebuilt around their processed element, key and value types; their
/// nullability flags are kept. A map key is processed before its value. Primitive and variant
/// types are returned as clones.
fn process_nested_data_type(data_type: &DataType, max_id: &mut i64) -> DeltaResult<DataType> {
    Ok(match data_type {
        DataType::Primitive(_) | DataType::Variant(_) => data_type.clone(),
        DataType::Struct(fields) => {
            DataType::Struct(Box::new(assign_column_mapping_metadata(fields, max_id)?))
        }
        DataType::Array(array) => {
            let element = process_nested_data_type(array.element_type(), max_id)?;
            DataType::Array(Box::new(ArrayType::new(element, array.contains_null())))
        }
        DataType::Map(map) => {
            let key = process_nested_data_type(map.key_type(), max_id)?;
            let value = process_nested_data_type(map.value_type(), max_id)?;
            DataType::Map(Box::new(MapType::new(
                key,
                value,
                map.value_contains_null(),
            )))
        }
    })
}
#[delta_kernel_derive::internal_api]
pub(crate) fn get_any_level_column_physical_name(
schema: &StructType,
col_name: &ColumnName,
column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<ColumnName> {
let fields = schema.walk_column_fields(col_name)?;
let physical_path: Vec<String> = fields
.iter()
.map(|field| -> DeltaResult<String> {
if column_mapping_mode != ColumnMappingMode::None {
if !field.has_physical_name_annotation() {
return Err(Error::Schema(format!(
"Column mapping is enabled but field '{}' lacks the {} annotation",
field.name,
ColumnMetadataKey::ColumnMappingPhysicalName.as_ref()
)));
}
if !field.has_id_annotation() {
return Err(Error::Schema(format!(
"Column mapping is enabled but field '{}' lacks the {} annotation",
field.name,
ColumnMetadataKey::ColumnMappingId.as_ref()
)));
}
}
Ok(field.physical_name(column_mapping_mode).to_string())
})
.collect::<DeltaResult<Vec<_>>>()?;
Ok(ColumnName::new(physical_path))
}
/// Map a physical column path back to its logical path in `logical_schema`.
///
/// Each physical component is matched against the physical names, under
/// `column_mapping_mode`, of the fields at that nesting level. The logical names of the matched
/// fields form the result.
///
/// # Errors
///
/// Propagates the error from `StructType::walk_column_fields_by`. This happens when a component
/// has no matching field, or when an intermediate field is not a struct.
pub(crate) fn physical_to_logical_column_name(
    logical_schema: &StructType,
    physical_col: &ColumnName,
    column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<ColumnName> {
    let matched_fields = logical_schema.walk_column_fields_by(physical_col, |level, physical| {
        level
            .fields()
            .find(|candidate| candidate.physical_name(column_mapping_mode) == physical)
    })?;
    let logical_names = matched_fields.iter().map(|field| field.name.clone());
    Ok(ColumnName::new(logical_names))
}
#[cfg(test)]
mod tests {
    use std::collections::{HashMap, HashSet};

    use super::*;
    use crate::expressions::ColumnName;
    use crate::schema::{DataType, MetadataValue, StructField, StructType};
    use crate::utils::test_utils::{make_test_tc, test_deep_nested_schema_missing_leaf_cm};

    /// The effective mode depends on both the table property and the protocol. The property is
    /// ignored when the protocol does not allow column mapping.
    #[test]
    fn test_column_mapping_mode() {
        let annotated = create_schema("5", "\"col-a7f4159c\"", "4", "\"col-5f422f40\"");
        let plain = create_schema(None, None, None, None);
        let cmm_id = HashMap::from([("delta.columnMapping.mode".to_string(), "id".to_string())]);
        let no_props = HashMap::new();

        // Legacy protocol.
        let tc = make_test_tc(
            annotated.clone(),
            Protocol::try_new_legacy(2, 5).unwrap(),
            cmm_id.clone(),
        )
        .unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::Id);
        let tc = make_test_tc(
            plain.clone(),
            Protocol::try_new_legacy(2, 5).unwrap(),
            no_props.clone(),
        )
        .unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);

        // Modern protocol without the ColumnMapping feature: the property is ignored.
        let protocol =
            Protocol::try_new_modern(TableFeature::EMPTY_LIST, TableFeature::EMPTY_LIST).unwrap();
        let tc = make_test_tc(plain.clone(), protocol.clone(), cmm_id.clone()).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);
        let tc = make_test_tc(plain.clone(), protocol, no_props.clone()).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);

        // Modern protocol with the ColumnMapping feature.
        let protocol =
            Protocol::try_new_modern([TableFeature::ColumnMapping], [TableFeature::ColumnMapping])
                .unwrap();
        let tc = make_test_tc(annotated.clone(), protocol.clone(), cmm_id.clone()).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::Id);
        let tc = make_test_tc(plain.clone(), protocol, no_props.clone()).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);

        // Modern protocol with an unrelated feature only.
        let protocol = Protocol::try_new_modern(
            [TableFeature::DeletionVectors],
            [TableFeature::DeletionVectors],
        )
        .unwrap();
        let tc = make_test_tc(plain.clone(), protocol.clone(), cmm_id.clone()).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);
        let tc = make_test_tc(plain.clone(), protocol, no_props.clone()).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);

        // Modern protocol with ColumnMapping alongside another feature.
        let protocol = Protocol::try_new_modern(
            [TableFeature::DeletionVectors, TableFeature::ColumnMapping],
            [TableFeature::DeletionVectors, TableFeature::ColumnMapping],
        )
        .unwrap();
        let tc = make_test_tc(annotated.clone(), protocol.clone(), cmm_id.clone()).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::Id);
        let tc = make_test_tc(plain.clone(), protocol, no_props).unwrap();
        assert_eq!(tc.column_mapping_mode(), ColumnMappingMode::None);
    }

    /// Build the comma-separated JSON metadata entries for a field.
    ///
    /// `id` and `name` are inserted verbatim as JSON values, so string values must include their
    /// own quotes.
    fn create_annotations<'a>(
        id: impl Into<Option<&'a str>>,
        name: impl Into<Option<&'a str>>,
    ) -> String {
        let mut annotations = vec![];
        if let Some(id) = id.into() {
            annotations.push(format!("\"delta.columnMapping.id\": {id}"));
        }
        if let Some(name) = name.into() {
            annotations.push(format!("\"delta.columnMapping.physicalName\": {name}"));
        }
        annotations.join(", ")
    }

    /// Build the one-column schema `e: array<struct<d: integer>>`.
    ///
    /// The inner field `d` and the outer field `e` get the given raw JSON annotation values.
    fn create_schema<'a>(
        inner_id: impl Into<Option<&'a str>>,
        inner_name: impl Into<Option<&'a str>>,
        outer_id: impl Into<Option<&'a str>>,
        outer_name: impl Into<Option<&'a str>>,
    ) -> StructType {
        let schema = format!(
            r#"
        {{
            "name": "e",
            "type": {{
                "type": "array",
                "elementType": {{
                    "type": "struct",
                    "fields": [
                        {{
                            "name": "d",
                            "type": "integer",
                            "nullable": false,
                            "metadata": {{ {} }}
                        }}
                    ]
                }},
                "containsNull": true
            }},
            "nullable": true,
            "metadata": {{ {} }}
        }}
        "#,
            create_annotations(inner_id, inner_name),
            create_annotations(outer_id, outer_name)
        );
        StructType::new_unchecked([serde_json::from_str(&schema).unwrap()])
    }

    #[test]
    fn test_column_mapping_enabled() {
        [ColumnMappingMode::Name, ColumnMappingMode::Id]
            .into_iter()
            .for_each(|mode| {
                let schema = create_schema("5", "\"col-a7f4159c\"", "4", "\"col-5f422f40\"");
                validate_schema_column_mapping(&schema, mode).unwrap();
                let schema = create_schema(None, "\"col-a7f4159c\"", "4", "\"col-5f422f40\"");
                validate_schema_column_mapping(&schema, mode).expect_err("missing field id");
                let schema = create_schema("5", None, "4", "\"col-5f422f40\"");
                validate_schema_column_mapping(&schema, mode).expect_err("missing field name");
                let schema = create_schema("5", "\"col-a7f4159c\"", None, "\"col-5f422f40\"");
                validate_schema_column_mapping(&schema, mode).expect_err("missing field id");
                let schema = create_schema("5", "\"col-a7f4159c\"", "4", None);
                validate_schema_column_mapping(&schema, mode).expect_err("missing field name");
                let schema = create_schema("\"5\"", "\"col-a7f4159c\"", "4", "\"col-5f422f40\"");
                validate_schema_column_mapping(&schema, mode).expect_err("invalid field id");
                let schema = create_schema("5", "\"col-a7f4159c\"", "\"4\"", "\"col-5f422f40\"");
                validate_schema_column_mapping(&schema, mode).expect_err("invalid field id");
                let schema = create_schema("5", "555", "4", "\"col-5f422f40\"");
                validate_schema_column_mapping(&schema, mode).expect_err("invalid field name");
                let schema = create_schema("5", "\"col-a7f4159c\"", "4", "444");
                validate_schema_column_mapping(&schema, mode).expect_err("invalid field name");
            });
    }

    #[test]
    fn test_column_mapping_disabled() {
        let schema = create_schema(None, None, None, None);
        validate_schema_column_mapping(&schema, ColumnMappingMode::None).unwrap();
        let schema = create_schema("5", None, None, None);
        validate_schema_column_mapping(&schema, ColumnMappingMode::None).expect_err("field id");
        let schema = create_schema(None, "\"col-a7f4159c\"", None, None);
        validate_schema_column_mapping(&schema, ColumnMappingMode::None).expect_err("field name");
        let schema = create_schema(None, None, "4", None);
        validate_schema_column_mapping(&schema, ColumnMappingMode::None).expect_err("field id");
        let schema = create_schema(None, None, None, "\"col-5f422f40\"");
        validate_schema_column_mapping(&schema, ColumnMappingMode::None).expect_err("field name");
    }

    #[test]
    fn test_annotation_validation_reaches_struct_fields_in_map_value() {
        let unannotated =
            StructType::new_unchecked([StructField::new("x", DataType::INTEGER, false)]);
        let schema = StructType::new_unchecked([make_cm_field(
            "b",
            1,
            MapType::new(
                DataType::STRING,
                DataType::Struct(Box::new(unannotated)),
                false,
            ),
        )]);
        validate_schema_column_mapping(&schema, ColumnMappingMode::Id)
            .expect_err("missing annotation on struct field inside map value");
    }

    /// Build a non-nullable field annotated with ID `id` and physical name `col-<name>`.
    fn make_cm_field(name: &str, id: i64, data_type: impl Into<DataType>) -> StructField {
        StructField::new(name, data_type, false).with_metadata([
            (
                ColumnMetadataKey::ColumnMappingId.as_ref(),
                MetadataValue::Number(id),
            ),
            (
                ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                MetadataValue::String(format!("col-{name}")),
            ),
        ])
    }

    fn cm_schema_same_level_duplicates() -> StructType {
        StructType::new_unchecked([
            make_cm_field("a", 1, DataType::INTEGER),
            make_cm_field("b", 1, DataType::INTEGER),
        ])
    }

    fn cm_schema_nested_duplicates() -> StructType {
        let nested = StructType::new_unchecked([
            make_cm_field("x", 5, DataType::INTEGER),
            make_cm_field("y", 5, DataType::INTEGER),
        ]);
        StructType::new_unchecked([make_cm_field(
            "outer",
            10,
            DataType::Struct(Box::new(nested)),
        )])
    }

    fn cm_schema_cross_level_duplicates() -> StructType {
        let nested = StructType::new_unchecked([make_cm_field("inner", 1, DataType::INTEGER)]);
        StructType::new_unchecked([
            make_cm_field("a", 1, DataType::INTEGER),
            make_cm_field("b", 2, DataType::Struct(Box::new(nested))),
        ])
    }

    fn cm_schema_array_duplicates() -> StructType {
        let element = StructType::new_unchecked([make_cm_field("x", 1, DataType::INTEGER)]);
        StructType::new_unchecked([
            make_cm_field("a", 1, DataType::INTEGER),
            make_cm_field(
                "b",
                2,
                ArrayType::new(DataType::Struct(Box::new(element)), false),
            ),
        ])
    }

    fn cm_schema_map_duplicates() -> StructType {
        let value = StructType::new_unchecked([make_cm_field("x", 1, DataType::INTEGER)]);
        StructType::new_unchecked([
            make_cm_field("a", 1, DataType::INTEGER),
            make_cm_field(
                "b",
                2,
                MapType::new(DataType::STRING, DataType::Struct(Box::new(value)), false),
            ),
        ])
    }

    #[rstest::rstest]
    #[case::same_level(cm_schema_same_level_duplicates())]
    #[case::nested_struct(cm_schema_nested_duplicates())]
    #[case::across_nesting_levels(cm_schema_cross_level_duplicates())]
    #[case::across_array(cm_schema_array_duplicates())]
    #[case::across_map(cm_schema_map_duplicates())]
    fn test_duplicate_column_mapping_ids_rejected(#[case] schema: StructType) {
        crate::utils::test_utils::assert_result_error_with_message(
            validate_schema_column_mapping(&schema, ColumnMappingMode::Id),
            "Duplicate column mapping ID",
        );
    }

    #[test]
    fn test_duplicate_column_mapping_ids_rejected_in_name_mode() {
        crate::utils::test_utils::assert_result_error_with_message(
            validate_schema_column_mapping(
                &cm_schema_same_level_duplicates(),
                ColumnMappingMode::Name,
            ),
            "Duplicate column mapping ID",
        );
    }

    #[rstest::rstest]
    #[case::no_property(None, Some(ColumnMappingMode::None))]
    #[case::mode_name(Some("name"), Some(ColumnMappingMode::Name))]
    #[case::mode_id(Some("id"), Some(ColumnMappingMode::Id))]
    #[case::mode_none_explicit(Some("none"), Some(ColumnMappingMode::None))]
    #[case::invalid_mode(Some("invalid"), None)]
    fn test_get_column_mapping_mode_from_properties(
        #[case] mode_str: Option<&str>,
        #[case] expected: Option<ColumnMappingMode>,
    ) {
        let mut properties = HashMap::new();
        if let Some(mode) = mode_str {
            properties.insert(COLUMN_MAPPING_MODE.to_string(), mode.to_string());
        }
        match expected {
            Some(mode) => assert_eq!(
                get_column_mapping_mode_from_properties(&properties).unwrap(),
                mode
            ),
            None => assert!(get_column_mapping_mode_from_properties(&properties).is_err()),
        }
    }

    #[test]
    fn test_assign_column_mapping_metadata_simple() {
        let schema = StructType::new_unchecked([
            StructField::new("a", DataType::INTEGER, false),
            StructField::new("b", DataType::STRING, true),
        ]);
        let mut max_id = 0;
        let result = assign_column_mapping_metadata(&schema, &mut max_id).unwrap();
        assert_eq!(max_id, 2);
        assert_eq!(result.fields().count(), 2);
        for (i, field) in result.fields().enumerate() {
            let expected_id = (i + 1) as i64;
            assert_eq!(
                field.get_config_value(&ColumnMetadataKey::ColumnMappingId),
                Some(&MetadataValue::Number(expected_id))
            );
            // A missing or non-string physical name must fail the test, not be skipped.
            let Some(MetadataValue::String(name)) =
                field.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
            else {
                panic!("Field '{}' should have a string physical name", field.name);
            };
            assert!(
                name.starts_with("col-"),
                "Physical name should start with 'col-'"
            );
        }
    }

    #[test]
    fn test_assign_column_mapping_metadata_rejects_existing_id() {
        let schema = StructType::new_unchecked([
            StructField::new("a", DataType::INTEGER, false).add_metadata([
                (
                    ColumnMetadataKey::ColumnMappingId.as_ref(),
                    MetadataValue::Number(100),
                ),
                (
                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                    MetadataValue::String("existing-physical".to_string()),
                ),
            ]),
            StructField::new("b", DataType::STRING, true),
        ]);
        let mut max_id = 0;
        let result = assign_column_mapping_metadata(&schema, &mut max_id);
        assert!(result.is_err());
        let err_msg = result.unwrap_err().to_string();
        assert!(
            err_msg.contains("already has column mapping metadata"),
            "Expected error about existing column mapping metadata, got: {err_msg}"
        );
    }

    #[test]
    fn test_assign_column_mapping_metadata_nested_struct() {
        let inner = StructType::new_unchecked([
            StructField::new("x", DataType::INTEGER, false),
            StructField::new("y", DataType::STRING, true),
        ]);
        let schema = StructType::new_unchecked([
            StructField::new("a", DataType::INTEGER, false),
            StructField::new("nested", DataType::Struct(Box::new(inner)), true),
        ]);
        let mut max_id = 0;
        let result = assign_column_mapping_metadata(&schema, &mut max_id).unwrap();
        assert_eq!(max_id, 4);
        let mut seen_ids = HashSet::new();
        let mut seen_physical_names = HashSet::new();
        let field_a = result.field("a").unwrap();
        assert_has_column_mapping_metadata(field_a, &mut seen_ids, &mut seen_physical_names);
        let field_nested = result.field("nested").unwrap();
        assert_has_column_mapping_metadata(field_nested, &mut seen_ids, &mut seen_physical_names);
        let inner = unwrap_struct(&field_nested.data_type, "nested");
        let field_x = inner.field("x").unwrap();
        assert_has_column_mapping_metadata(field_x, &mut seen_ids, &mut seen_physical_names);
        let field_y = inner.field("y").unwrap();
        assert_has_column_mapping_metadata(field_y, &mut seen_ids, &mut seen_physical_names);
        assert_eq!(seen_ids.len(), 4);
        assert_eq!(seen_physical_names.len(), 4);
    }

    /// Assert that `field` has a numeric ID and a string physical name.
    ///
    /// Both are recorded in the given sets; the assertion fails if either was already seen.
    fn assert_has_column_mapping_metadata(
        field: &StructField,
        seen_ids: &mut HashSet<i64>,
        seen_physical_names: &mut HashSet<String>,
    ) {
        let id = field
            .get_config_value(&ColumnMetadataKey::ColumnMappingId)
            .unwrap_or_else(|| panic!("Field '{}' should have column mapping ID", field.name));
        let MetadataValue::Number(id_val) = id else {
            panic!(
                "Field '{}' column mapping ID should be a number",
                field.name
            );
        };
        assert!(
            seen_ids.insert(*id_val),
            "Duplicate column mapping ID {} on field '{}'",
            id_val,
            field.name
        );
        let physical = field
            .get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)
            .unwrap_or_else(|| panic!("Field '{}' should have physical name", field.name));
        let MetadataValue::String(physical_name) = physical else {
            panic!("Field '{}' physical name should be a string", field.name);
        };
        assert!(
            seen_physical_names.insert(physical_name.clone()),
            "Duplicate physical name '{}' on field '{}'",
            physical_name,
            field.name
        );
    }

    /// Return the struct inside `data_type`, panicking (with `context`) for any other type.
    fn unwrap_struct<'a>(data_type: &'a DataType, context: &str) -> &'a StructType {
        match data_type {
            DataType::Struct(s) => s,
            _ => panic!("Expected Struct for {context}, got {data_type:?}"),
        }
    }

    #[test]
    fn test_assign_column_mapping_metadata_map_with_struct_key_and_value() {
        let key_struct =
            StructType::new_unchecked([StructField::new("k", DataType::INTEGER, false)]);
        let value_struct =
            StructType::new_unchecked([StructField::new("v", DataType::INTEGER, false)]);
        let map_type = MapType::new(
            DataType::Struct(Box::new(key_struct)),
            DataType::Struct(Box::new(value_struct)),
            true,
        );
        let schema = StructType::new_unchecked([StructField::new(
            "my_map",
            DataType::Map(Box::new(map_type)),
            true,
        )]);
        let mut max_id = 0;
        let result = assign_column_mapping_metadata(&schema, &mut max_id).unwrap();
        assert_eq!(max_id, 3);
        let mut seen_ids = HashSet::new();
        let mut seen_physical_names = HashSet::new();
        let map_field = result.field("my_map").unwrap();
        assert_has_column_mapping_metadata(map_field, &mut seen_ids, &mut seen_physical_names);
        if let DataType::Map(inner_map) = &map_field.data_type {
            let key_struct = unwrap_struct(inner_map.key_type(), "map key");
            let field_k = key_struct.field("k").unwrap();
            assert_has_column_mapping_metadata(field_k, &mut seen_ids, &mut seen_physical_names);
            let value_struct = unwrap_struct(inner_map.value_type(), "map value");
            let field_v = value_struct.field("v").unwrap();
            assert_has_column_mapping_metadata(field_v, &mut seen_ids, &mut seen_physical_names);
        } else {
            panic!("Expected map type");
        }
        assert_eq!(seen_ids.len(), 3);
        assert_eq!(seen_physical_names.len(), 3);
    }

    #[test]
    fn test_assign_column_mapping_metadata_array_with_struct_element() {
        let elem_struct =
            StructType::new_unchecked([StructField::new("elem", DataType::INTEGER, false)]);
        let array_type = ArrayType::new(DataType::Struct(Box::new(elem_struct)), true);
        let schema = StructType::new_unchecked([StructField::new(
            "my_array",
            DataType::Array(Box::new(array_type)),
            true,
        )]);
        let mut max_id = 0;
        let result = assign_column_mapping_metadata(&schema, &mut max_id).unwrap();
        assert_eq!(max_id, 2);
        let mut seen_ids = HashSet::new();
        let mut seen_physical_names = HashSet::new();
        let array_field = result.field("my_array").unwrap();
        assert_has_column_mapping_metadata(array_field, &mut seen_ids, &mut seen_physical_names);
        if let DataType::Array(inner_array) = &array_field.data_type {
            let elem_struct = unwrap_struct(inner_array.element_type(), "array element");
            let field_elem = elem_struct.field("elem").unwrap();
            assert_has_column_mapping_metadata(field_elem, &mut seen_ids, &mut seen_physical_names);
        } else {
            panic!("Expected array type");
        }
        assert_eq!(seen_ids.len(), 2);
        assert_eq!(seen_physical_names.len(), 2);
    }

    #[test]
    fn test_assign_column_mapping_metadata_double_nested_array() {
        let deep_struct =
            StructType::new_unchecked([StructField::new("deep", DataType::INTEGER, false)]);
        let inner_array = ArrayType::new(DataType::Struct(Box::new(deep_struct)), true);
        let outer_array = ArrayType::new(DataType::Array(Box::new(inner_array)), true);
        let schema = StructType::new_unchecked([StructField::new(
            "nested_arrays",
            DataType::Array(Box::new(outer_array)),
            true,
        )]);
        let mut max_id = 0;
        let result = assign_column_mapping_metadata(&schema, &mut max_id).unwrap();
        assert_eq!(max_id, 2);
        let mut seen_ids = HashSet::new();
        let mut seen_physical_names = HashSet::new();
        let outer_field = result.field("nested_arrays").unwrap();
        assert_has_column_mapping_metadata(outer_field, &mut seen_ids, &mut seen_physical_names);
        let DataType::Array(outer) = &outer_field.data_type else {
            panic!("Expected outer array type");
        };
        let DataType::Array(inner) = outer.element_type() else {
            panic!("Expected inner array type");
        };
        let deep_struct = unwrap_struct(inner.element_type(), "inner array element");
        let field_deep = deep_struct.field("deep").unwrap();
        assert_has_column_mapping_metadata(field_deep, &mut seen_ids, &mut seen_physical_names);
        assert_eq!(seen_ids.len(), 2);
        assert_eq!(seen_physical_names.len(), 2);
    }

    #[test]
    fn test_assign_column_mapping_metadata_array_map_array_struct_nesting() {
        let key_struct =
            StructType::new_unchecked([StructField::new("k", DataType::INTEGER, false)]);
        let value_struct =
            StructType::new_unchecked([StructField::new("v", DataType::INTEGER, false)]);
        let key_array = ArrayType::new(DataType::Struct(Box::new(key_struct)), true);
        let value_array = ArrayType::new(DataType::Struct(Box::new(value_struct)), true);
        let inner_map = MapType::new(
            DataType::Array(Box::new(key_array)),
            DataType::Array(Box::new(value_array)),
            true,
        );
        let outer_array = ArrayType::new(DataType::Map(Box::new(inner_map)), true);
        let schema = StructType::new_unchecked([StructField::new(
            "cursed",
            DataType::Array(Box::new(outer_array)),
            true,
        )]);
        let mut max_id = 0;
        let result = assign_column_mapping_metadata(&schema, &mut max_id).unwrap();
        assert_eq!(max_id, 3);
        let mut seen_ids = HashSet::new();
        let mut seen_physical_names = HashSet::new();
        let cursed_field = result.field("cursed").unwrap();
        assert_has_column_mapping_metadata(cursed_field, &mut seen_ids, &mut seen_physical_names);
        let DataType::Array(outer) = &cursed_field.data_type else {
            panic!("Expected outer array type");
        };
        let DataType::Map(inner_map) = outer.element_type() else {
            panic!("Expected map inside outer array");
        };
        let DataType::Array(key_arr) = inner_map.key_type() else {
            panic!("Expected array for map key");
        };
        let key_struct = unwrap_struct(key_arr.element_type(), "key array element");
        let field_k = key_struct.field("k").unwrap();
        assert_has_column_mapping_metadata(field_k, &mut seen_ids, &mut seen_physical_names);
        let DataType::Array(val_arr) = inner_map.value_type() else {
            panic!("Expected array for map value");
        };
        let val_struct = unwrap_struct(val_arr.element_type(), "value array element");
        let field_v = val_struct.field("v").unwrap();
        assert_has_column_mapping_metadata(field_v, &mut seen_ids, &mut seen_physical_names);
        assert_eq!(seen_ids.len(), 3);
        assert_eq!(seen_physical_names.len(), 3);
    }

    #[test]
    fn test_get_any_level_column_physical_name_success() {
        let inner = StructType::new_unchecked([StructField::new("y", DataType::INTEGER, false)
            .add_metadata([
                (
                    ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                    MetadataValue::String("col-inner-y".to_string()),
                ),
                (
                    ColumnMetadataKey::ColumnMappingId.as_ref(),
                    MetadataValue::Number(2),
                ),
            ])]);
        let schema = StructType::new_unchecked([StructField::new(
            "a",
            DataType::Struct(Box::new(inner)),
            true,
        )
        .add_metadata([
            (
                ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                MetadataValue::String("col-outer-a".to_string()),
            ),
            (
                ColumnMetadataKey::ColumnMappingId.as_ref(),
                MetadataValue::Number(1),
            ),
        ])]);
        let result = get_any_level_column_physical_name(
            &schema,
            &ColumnName::new(["a"]),
            ColumnMappingMode::Name,
        )
        .unwrap();
        assert_eq!(result, ColumnName::new(["col-outer-a"]));
        assert_eq!(result.path().len(), 1);
        let result = get_any_level_column_physical_name(
            &schema,
            &ColumnName::new(["a", "y"]),
            ColumnMappingMode::Name,
        )
        .unwrap();
        assert_eq!(result, ColumnName::new(["col-outer-a", "col-inner-y"]));
        assert_eq!(result.path().len(), 2);
        let result = get_any_level_column_physical_name(
            &schema,
            &ColumnName::new(["a", "y"]),
            ColumnMappingMode::None,
        )
        .unwrap();
        assert_eq!(result, ColumnName::new(["a", "y"]));
        assert_eq!(result.path().len(), 2);
    }

    #[test]
    fn test_get_any_level_column_physical_name_errors() {
        let schema = StructType::new_unchecked([StructField::new("a", DataType::INTEGER, false)]);
        let result = get_any_level_column_physical_name(
            &schema,
            &ColumnName::new(["nonexistent"]),
            ColumnMappingMode::None,
        );
        assert!(result.is_err());
        let result = get_any_level_column_physical_name(
            &schema,
            &ColumnName::new(["a", "b"]),
            ColumnMappingMode::None,
        );
        assert!(result.is_err());
    }

    #[rstest::rstest]
    #[case::missing_id(true, false, "delta.columnMapping.id")]
    #[case::missing_physical_name(false, true, "delta.columnMapping.physicalName")]
    #[case::missing_both(false, false, "delta.columnMapping.physicalName")]
    fn test_get_any_level_column_physical_name_missing_annotations(
        #[case] has_physical_name: bool,
        #[case] has_id: bool,
        #[case] expected_err: &str,
    ) {
        let mut inner_field = StructField::new("y", DataType::INTEGER, false);
        if has_physical_name {
            inner_field = inner_field.add_metadata([(
                ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                MetadataValue::String("col-inner-y".to_string()),
            )]);
        }
        if has_id {
            inner_field = inner_field.add_metadata([(
                ColumnMetadataKey::ColumnMappingId.as_ref(),
                MetadataValue::Number(2),
            )]);
        }
        let inner = StructType::new_unchecked([inner_field]);
        let schema = StructType::new_unchecked([StructField::new(
            "a",
            DataType::Struct(Box::new(inner)),
            true,
        )
        .add_metadata([
            (
                ColumnMetadataKey::ColumnMappingPhysicalName.as_ref(),
                MetadataValue::String("col-outer-a".to_string()),
            ),
            (
                ColumnMetadataKey::ColumnMappingId.as_ref(),
                MetadataValue::Number(1),
            ),
        ])]);
        let err = get_any_level_column_physical_name(
            &schema,
            &ColumnName::new(["a", "y"]),
            ColumnMappingMode::Name,
        )
        .unwrap_err()
        .to_string();
        assert!(
            err.contains(expected_err),
            "Expected error containing '{expected_err}', got: {err}"
        );
    }

    #[test]
    fn validate_schema_column_mapping_error_includes_full_path() {
        let schema = test_deep_nested_schema_missing_leaf_cm();
        let err = validate_schema_column_mapping(&schema, ColumnMappingMode::Name)
            .unwrap_err()
            .to_string();
        assert!(
            err.contains("top.`<array element>`.mid_field.`<map value>`.leaf"),
            "Expected full nested path in error, got: {err}"
        );
    }

    #[test]
    fn physical_to_logical_no_mapping() {
        let schema = StructType::new_unchecked(vec![
            StructField::new("id", DataType::INTEGER, false),
            StructField::new("name", DataType::STRING, true),
        ]);
        let physical_col = ColumnName::new(["id"]);
        let result =
            physical_to_logical_column_name(&schema, &physical_col, ColumnMappingMode::None)
                .unwrap();
        assert_eq!(result, ColumnName::new(["id"]));
    }

    #[test]
    fn physical_to_logical_with_name_mapping() {
        let field = StructField::new("user_id", DataType::INTEGER, false).with_metadata([(
            "delta.columnMapping.physicalName".to_string(),
            MetadataValue::String("col-abc-123".to_string()),
        )]);
        let schema = StructType::new_unchecked(vec![field]);
        let physical_col = ColumnName::new(["col-abc-123"]);
        let result =
            physical_to_logical_column_name(&schema, &physical_col, ColumnMappingMode::Name)
                .unwrap();
        assert_eq!(result, ColumnName::new(["user_id"]));
    }

    #[test]
    fn physical_to_logical_not_found() {
        let schema =
            StructType::new_unchecked(vec![StructField::new("id", DataType::INTEGER, false)]);
        let physical_col = ColumnName::new(["nonexistent"]);
        let result =
            physical_to_logical_column_name(&schema, &physical_col, ColumnMappingMode::None);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("not found in schema"));
    }

    #[test]
    fn physical_to_logical_nested_struct_with_mapping() {
        let inner_field = StructField::new("city", DataType::STRING, true).with_metadata([(
            "delta.columnMapping.physicalName".to_string(),
            MetadataValue::String("col-inner-456".to_string()),
        )]);
        let inner_struct = StructType::new_unchecked(vec![inner_field]);
        let outer_field =
            StructField::new("address", DataType::Struct(Box::new(inner_struct)), true)
                .with_metadata([(
                    "delta.columnMapping.physicalName".to_string(),
                    MetadataValue::String("col-outer-123".to_string()),
                )]);
        let schema = StructType::new_unchecked(vec![outer_field]);
        let physical_col = ColumnName::new(["col-outer-123", "col-inner-456"]);
        let result =
            physical_to_logical_column_name(&schema, &physical_col, ColumnMappingMode::Name)
                .unwrap();
        assert_eq!(result, ColumnName::new(["address", "city"]));
    }

    #[test]
    fn physical_to_logical_non_struct_intermediate_errors() {
        let schema =
            StructType::new_unchecked(vec![StructField::new("id", DataType::INTEGER, false)]);
        let physical_col = ColumnName::new(["id", "nested"]);
        let result =
            physical_to_logical_column_name(&schema, &physical_col, ColumnMappingMode::None);
        assert!(result.is_err());
        assert!(result
            .unwrap_err()
            .to_string()
            .contains("is not a struct type"));
    }
}