use super::ReaderFeatures;
use crate::actions::Protocol;
use crate::schema::{ColumnName, DataType, MetadataValue, Schema, SchemaTransform, StructField};
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error};
use std::borrow::Cow;
use serde::{Deserialize, Serialize};
use strum::EnumString;
/// The column mapping mode configured for a table.
///
/// Both the `strum` and `serde` attributes use camelCase, so the variants are spelled
/// `none`, `id` and `name` when parsed from or (de)serialized as text.
#[derive(Debug, EnumString, Serialize, Deserialize, Copy, Clone, PartialEq, Eq)]
#[strum(serialize_all = "camelCase")]
#[serde(rename_all = "camelCase")]
pub enum ColumnMappingMode {
/// Column mapping is not enabled; schema validation in this file rejects fields that carry
/// column mapping annotations in this mode.
None,
/// Id-based column mapping. NOTE: `validate_schema_column_mapping` currently rejects this
/// mode as unsupported.
Id,
/// Name-based column mapping. Schema validation in this file requires every field to carry
/// a string `delta.columnMapping.physicalName` and a numeric `delta.columnMapping.id`.
Name,
}
/// Resolves the column mapping mode that is actually in effect for a table.
///
/// The mode configured in `table_properties` is honored only when the protocol allows it:
/// * reader version 2: the configured mode is used as-is;
/// * reader version 3: the configured mode is used only if the protocol lists the
///   `ColumnMapping` reader feature;
/// * any other reader version: the configured mode is ignored.
///
/// Returns [`ColumnMappingMode::None`] when no mode is configured or it is not honored.
pub(crate) fn column_mapping_mode(
    protocol: &Protocol,
    table_properties: &TableProperties,
) -> ColumnMappingMode {
    table_properties
        .column_mapping_mode
        // The protocol is only consulted when a mode is configured at all.
        .filter(|_| match protocol.min_reader_version() {
            2 => true,
            3 => protocol.has_reader_feature(&ReaderFeatures::ColumnMapping),
            _ => false,
        })
        .unwrap_or(ColumnMappingMode::None)
}
/// Checks that the column mapping annotations on every field of `schema` are consistent
/// with the given column mapping `mode`.
///
/// # Errors
///
/// * An "unsupported" error if `mode` is [`ColumnMappingMode::Id`]; the schema is not
///   inspected in that case.
/// * Otherwise, the error recorded by the schema walk, if any field's annotations do not
///   match `mode`.
pub fn validate_schema_column_mapping(schema: &Schema, mode: ColumnMappingMode) -> DeltaResult<()> {
    if matches!(mode, ColumnMappingMode::Id) {
        return Err(Error::unsupported("Column mapping ID mode not supported"));
    }

    let mut validator = ValidateColumnMappings {
        mode,
        path: Vec::new(),
        err: None,
    };
    // The walk reports problems through `validator.err`; its return value carries nothing
    // of interest here.
    let _ = validator.transform_struct(schema);
    validator.err.map_or(Ok(()), Err)
}
/// State for the schema walk performed by `validate_schema_column_mapping`.
struct ValidateColumnMappings<'a> {
/// The column mapping mode the schema's annotations are checked against.
mode: ColumnMappingMode,
/// Path to the item currently being visited: field names plus placeholders such as
/// `<array element>` for nested types. Used to name the offending field in error messages.
path: Vec<&'a str>,
/// The validation failure recorded so far, if any. Once set, the traversal methods stop
/// descending into further fields and nested types.
err: Option<Error>,
}
impl<'a> ValidateColumnMappings<'a> {
    /// Descends into a nested type (array element, map key or map value).
    ///
    /// `name` is a placeholder path segment (e.g. `<array element>`) pushed onto `self.path`
    /// for the duration of the descent so that error messages can locate nested fields.
    /// Nothing is visited once an error has been recorded.
    ///
    /// Always returns `None`: this visitor only validates and never produces a replacement
    /// type.
    fn transform_inner_type(
        &mut self,
        data_type: &'a DataType,
        name: &'a str,
    ) -> Option<Cow<'a, DataType>> {
        if self.err.is_none() {
            self.path.push(name);
            let _ = self.transform(data_type);
            self.path.pop();
        }
        None
    }

    /// Checks the two column mapping annotations of `field` against `self.mode`.
    ///
    /// * With column mapping disabled (`None`), neither annotation may be present.
    /// * With column mapping enabled (`Name` or `Id`), `delta.columnMapping.physicalName`
    ///   must be present as a string and `delta.columnMapping.id` must be present as a number.
    ///
    /// Any violation is stored in `self.err`, naming the field by the current `self.path`
    /// (the caller is expected to have pushed the field's name already). Both annotations
    /// are always checked, so a failure on the id annotation replaces a failure already
    /// recorded for the physical name of the same field.
    fn check_annotations(&mut self, field: &StructField) {
        // Only borrows `self.path`, which leaves `self.err` free to be assigned below.
        let column_name = || ColumnName::new(self.path.iter().copied());

        let annotation = "delta.columnMapping.physicalName";
        match (self.mode, field.metadata.get(annotation)) {
            (ColumnMappingMode::None, None) => {}
            (ColumnMappingMode::Name | ColumnMappingMode::Id, Some(MetadataValue::String(_))) => {}
            (ColumnMappingMode::Name | ColumnMappingMode::Id, Some(_)) => {
                self.err = Some(Error::invalid_column_mapping_mode(format!(
                    "The {annotation} annotation on field '{}' must be a string",
                    column_name()
                )));
            }
            (ColumnMappingMode::Name | ColumnMappingMode::Id, None) => {
                self.err = Some(Error::invalid_column_mapping_mode(format!(
                    "Column mapping is enabled but field '{}' lacks the {annotation} annotation",
                    column_name()
                )));
            }
            (ColumnMappingMode::None, Some(_)) => {
                // Fixed: the field path and the annotation key were swapped in this message.
                self.err = Some(Error::invalid_column_mapping_mode(format!(
                    "Column mapping is not enabled but field '{}' is annotated with {annotation}",
                    column_name()
                )));
            }
        }

        let annotation = "delta.columnMapping.id";
        match (self.mode, field.metadata.get(annotation)) {
            (ColumnMappingMode::None, None) => {}
            (ColumnMappingMode::Name | ColumnMappingMode::Id, Some(MetadataValue::Number(_))) => {}
            (ColumnMappingMode::Name | ColumnMappingMode::Id, Some(_)) => {
                self.err = Some(Error::invalid_column_mapping_mode(format!(
                    "The {annotation} annotation on field '{}' must be a number",
                    column_name()
                )));
            }
            (ColumnMappingMode::Name | ColumnMappingMode::Id, None) => {
                self.err = Some(Error::invalid_column_mapping_mode(format!(
                    "Column mapping is enabled but field '{}' lacks the {annotation} annotation",
                    column_name()
                )));
            }
            (ColumnMappingMode::None, Some(_)) => {
                self.err = Some(Error::invalid_column_mapping_mode(format!(
                    "Column mapping is not enabled but field '{}' is annotated with {annotation}",
                    column_name()
                )));
            }
        }
    }
}
impl<'a> SchemaTransform<'a> for ValidateColumnMappings<'a> {
    /// Visits an array's element type under the `<array element>` path segment.
    fn transform_array_element(
        &mut self,
        element_type: &'a DataType,
    ) -> Option<Cow<'a, DataType>> {
        self.transform_inner_type(element_type, "<array element>")
    }

    /// Visits a map's key type under the `<map key>` path segment.
    fn transform_map_key(&mut self, key_type: &'a DataType) -> Option<Cow<'a, DataType>> {
        self.transform_inner_type(key_type, "<map key>")
    }

    /// Visits a map's value type under the `<map value>` path segment.
    fn transform_map_value(&mut self, value_type: &'a DataType) -> Option<Cow<'a, DataType>> {
        self.transform_inner_type(value_type, "<map value>")
    }

    /// Validates one struct field's annotations and then descends into its type.
    ///
    /// Does nothing once an error has been recorded. Always returns `None`, since this
    /// visitor never produces a replacement field.
    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
        if self.err.is_some() {
            return None;
        }

        // Keep the field's name on the path while it and everything nested in it is checked.
        self.path.push(&field.name);
        self.check_annotations(field);
        let _ = self.recurse_into_struct_field(field);
        self.path.pop();
        None
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::StructType;
    use std::collections::HashMap;

    /// Asserts that `protocol` resolves to `expected` when the mapping mode property is set,
    /// and to `None` when no table properties are set at all.
    fn assert_modes(
        protocol: &Protocol,
        with_mode: &TableProperties,
        without_mode: &TableProperties,
        expected: ColumnMappingMode,
    ) {
        assert_eq!(column_mapping_mode(protocol, with_mode), expected);
        assert_eq!(
            column_mapping_mode(protocol, without_mode),
            ColumnMappingMode::None
        );
    }

    #[test]
    fn test_column_mapping_mode() {
        let raw_properties: HashMap<_, _> =
            [("delta.columnMapping.mode".to_string(), "id".to_string())]
                .into_iter()
                .collect();
        let table_properties = TableProperties::from(raw_properties.iter());
        let empty_table_properties = TableProperties::from([] as [(String, String); 0]);

        // Reader version 2: the table property alone decides.
        let protocol = Protocol::try_new(2, 5, None::<Vec<String>>, None::<Vec<String>>).unwrap();
        assert_modes(
            &protocol,
            &table_properties,
            &empty_table_properties,
            ColumnMappingMode::Id,
        );

        // Reader version 3 without any reader features: the property is ignored.
        let empty_features = Some::<[String; 0]>([]);
        let protocol =
            Protocol::try_new(3, 7, empty_features.clone(), empty_features.clone()).unwrap();
        assert_modes(
            &protocol,
            &table_properties,
            &empty_table_properties,
            ColumnMappingMode::None,
        );

        // Reader version 3 with the column mapping feature: the property is honored.
        let protocol = Protocol::try_new(
            3,
            7,
            Some([ReaderFeatures::ColumnMapping]),
            empty_features.clone(),
        )
        .unwrap();
        assert_modes(
            &protocol,
            &table_properties,
            &empty_table_properties,
            ColumnMappingMode::Id,
        );

        // Reader version 3 with only an unrelated feature: the property is ignored.
        let protocol = Protocol::try_new(
            3,
            7,
            Some([ReaderFeatures::DeletionVectors]),
            empty_features.clone(),
        )
        .unwrap();
        assert_modes(
            &protocol,
            &table_properties,
            &empty_table_properties,
            ColumnMappingMode::None,
        );

        // Reader version 3 with the column mapping feature among others: honored.
        let protocol = Protocol::try_new(
            3,
            7,
            Some([
                ReaderFeatures::DeletionVectors,
                ReaderFeatures::ColumnMapping,
            ]),
            empty_features,
        )
        .unwrap();
        assert_modes(
            &protocol,
            &table_properties,
            &empty_table_properties,
            ColumnMappingMode::Id,
        );
    }

    /// Renders the body of a JSON `metadata` object holding the requested column mapping
    /// annotations. Values are inserted verbatim, so string values must arrive pre-quoted.
    fn create_annotations<'a>(
        id: impl Into<Option<&'a str>>,
        name: impl Into<Option<&'a str>>,
    ) -> String {
        let id: Option<&str> = id.into();
        let name: Option<&str> = name.into();
        let id_entry = id.map(|id| format!("\"delta.columnMapping.id\": {id}"));
        let name_entry = name.map(|name| format!("\"delta.columnMapping.physicalName\": {name}"));
        id_entry
            .into_iter()
            .chain(name_entry)
            .collect::<Vec<String>>()
            .join(", ")
    }

    /// Builds a schema with one outer field `e` (an array of structs) whose element struct
    /// has one inner field `d`, each annotated as requested.
    fn create_schema<'a>(
        inner_id: impl Into<Option<&'a str>>,
        inner_name: impl Into<Option<&'a str>>,
        outer_id: impl Into<Option<&'a str>>,
        outer_name: impl Into<Option<&'a str>>,
    ) -> StructType {
        let inner_annotations = create_annotations(inner_id, inner_name);
        let outer_annotations = create_annotations(outer_id, outer_name);
        let schema = format!(
            r#"
{{
"name": "e",
"type": {{
"type": "array",
"elementType": {{
"type": "struct",
"fields": [
{{
"name": "d",
"type": "integer",
"nullable": false,
"metadata": {{ {} }}
}}
]
}},
"containsNull": true
}},
"nullable": true,
"metadata": {{ {} }}
}}
"#,
            inner_annotations,
            outer_annotations
        );
        println!("{}", schema);
        StructType::new([serde_json::from_str(&schema).unwrap()])
    }

    /// Pre-quoted physical name used for the inner field `d`.
    const INNER_NAME: &str = "\"col-a7f4159c\"";
    /// Pre-quoted physical name used for the outer field `e`.
    const OUTER_NAME: &str = "\"col-5f422f40\"";

    /// `[inner_id, inner_name, outer_id, outer_name]`, in `create_schema` argument order.
    type Annotations = [Option<&'static str>; 4];

    #[test]
    fn test_column_mapping_enabled() {
        // Fully and correctly annotated: accepted in name mode, id mode is unsupported.
        let schema = create_schema("5", INNER_NAME, "4", OUTER_NAME);
        validate_schema_column_mapping(&schema, ColumnMappingMode::Name).unwrap();
        validate_schema_column_mapping(&schema, ColumnMappingMode::Id).expect_err("not supported");

        // Each case breaks exactly one annotation on either the inner or the outer field.
        let rejected: [(Annotations, &str); 8] = [
            (
                [None, Some(INNER_NAME), Some("4"), Some(OUTER_NAME)],
                "missing field id",
            ),
            (
                [Some("5"), None, Some("4"), Some(OUTER_NAME)],
                "missing field name",
            ),
            (
                [Some("5"), Some(INNER_NAME), None, Some(OUTER_NAME)],
                "missing field id",
            ),
            (
                [Some("5"), Some(INNER_NAME), Some("4"), None],
                "missing field name",
            ),
            (
                [Some("\"5\""), Some(INNER_NAME), Some("4"), Some(OUTER_NAME)],
                "invalid field id",
            ),
            (
                [Some("5"), Some(INNER_NAME), Some("\"4\""), Some(OUTER_NAME)],
                "invalid field id",
            ),
            (
                [Some("5"), Some("555"), Some("4"), Some(OUTER_NAME)],
                "invalid field name",
            ),
            (
                [Some("5"), Some(INNER_NAME), Some("4"), Some("444")],
                "invalid field name",
            ),
        ];
        for ([inner_id, inner_name, outer_id, outer_name], reason) in rejected {
            let schema = create_schema(inner_id, inner_name, outer_id, outer_name);
            validate_schema_column_mapping(&schema, ColumnMappingMode::Name).expect_err(reason);
        }
    }

    #[test]
    fn test_column_mapping_disabled() {
        // No annotations anywhere: accepted when column mapping is off.
        let schema = create_schema(None, None, None, None);
        validate_schema_column_mapping(&schema, ColumnMappingMode::None).unwrap();

        // Any single annotation, on either field, is rejected when column mapping is off.
        let rejected: [(Annotations, &str); 4] = [
            ([Some("5"), None, None, None], "field id"),
            ([None, Some(INNER_NAME), None, None], "field name"),
            ([None, None, Some("4"), None], "field id"),
            ([None, None, None, Some(OUTER_NAME)], "field name"),
        ];
        for ([inner_id, inner_name, outer_id, outer_name], reason) in rejected {
            let schema = create_schema(inner_id, inner_name, outer_id, outer_name);
            validate_schema_column_mapping(&schema, ColumnMappingMode::None).expect_err(reason);
        }
    }
}