#![allow(dead_code)]
use super::{ColumnMetadataKey, ColumnName, DataType, MetadataValue, StructField, StructType};
use std::collections::{HashMap, HashSet};
/// The result of comparing two schemas field-by-field, matching fields by column mapping ID.
///
/// Built by [`SchemaDiff::new`]. Paths are [`ColumnName`]s from the schema root; fields reached
/// through an array element or a map key/value carry an extra `element`, `key`, or `value`
/// path segment.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct SchemaDiff {
/// Fields whose column mapping ID exists only in the `after` schema. Only the top-most added
/// field is listed; added fields nested beneath another added field are not reported separately.
pub(crate) added_fields: Vec<FieldChange>,
/// Fields whose column mapping ID exists only in the `before` schema, reduced to the top-most
/// removed field in the same way as `added_fields`.
pub(crate) removed_fields: Vec<FieldChange>,
/// Fields whose column mapping ID exists in both schemas and whose name, nullability, data
/// type, or (non column-mapping) metadata differs.
pub(crate) updated_fields: Vec<FieldUpdate>,
/// `true` if an added field is non-nullable, or any update contains a type change, a
/// nullability tightening (field or container), or a metadata change. Removals are not counted.
breaking_changes: bool,
}
/// A field that was added or removed between two schemas.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct FieldChange {
/// The added/removed field, including its full (possibly nested) data type.
pub(crate) field: StructField,
/// Path of the field: its path in the `after` schema for additions, in the `before` schema
/// for removals.
pub(crate) path: ColumnName,
}
/// A field present in both schemas (same column mapping ID) that changed in some way.
#[derive(Debug, Clone, PartialEq)]
pub(crate) struct FieldUpdate {
/// The field as it appears in the `before` schema.
pub(crate) before: StructField,
/// The field as it appears in the `after` schema.
pub(crate) after: StructField,
/// Path of the field in the `after` schema.
pub(crate) path: ColumnName,
/// Every kind of change detected for this field; never empty.
pub(crate) change_types: Vec<FieldChangeType>,
}
/// The kind of change observed on a field that exists in both schemas.
#[derive(Debug, Clone, PartialEq)]
pub(crate) enum FieldChangeType {
/// The field's name differs.
Renamed,
/// The field went from non-nullable to nullable.
NullabilityLoosened,
/// The field went from nullable to non-nullable.
NullabilityTightened,
/// The data type changed kind (e.g. primitive to struct) or a non-struct type differs.
/// Changes inside a struct are not reported here; nested fields are diffed individually.
TypeChanged,
/// Metadata differs, ignoring keys prefixed with `delta.columnMapping` or `parquet.field`.
MetadataChanged,
/// An array's `contains_null` or a map's `value_contains_null` went from false to true
/// (anywhere in the field's nested array/map types).
ContainerNullabilityLoosened,
/// An array's `contains_null` or a map's `value_contains_null` went from true to false.
ContainerNullabilityTightened,
}
/// Errors raised while computing a [`SchemaDiff`].
#[derive(Debug, thiserror::Error)]
pub(crate) enum SchemaDiffError {
/// NOTE(review): not constructed anywhere in the code visible in this module; possibly a
/// leftover placeholder — confirm before removing.
#[error("Schema diffing is not yet implemented")]
Unsupported,
/// A field has no numeric column mapping ID in its metadata.
#[error("Field at path '{path}' is missing column mapping ID")]
MissingFieldId { path: ColumnName },
/// Two fields within the same schema share a column mapping ID; `path1` is the first one
/// encountered, `path2` the second.
#[error("Duplicate field ID {id} found at paths '{path1}' and '{path2}'")]
DuplicateFieldId {
id: i64,
path1: ColumnName,
path2: ColumnName,
},
/// A field present in both schemas lacks a string physical name on at least one side.
#[error(
"Field at path '{path}' is missing physical name (required when column mapping is enabled)"
)]
MissingPhysicalName { path: ColumnName },
/// A field present in both schemas has different physical names on the two sides.
#[error("Field with ID {field_id} at path '{path}' has inconsistent physical names: '{before}' -> '{after}'. Physical names must not change for the same field ID.")]
PhysicalNameChanged {
field_id: i64,
path: ColumnName,
before: String,
after: String,
},
}
impl SchemaDiff {
    /// Diffs `before` against `after`, matching fields by column mapping ID.
    ///
    /// # Errors
    /// Returns a [`SchemaDiffError`] when a field is missing its column mapping ID or physical
    /// name, when an ID is duplicated within one schema, or when a field's physical name differs
    /// between the two schemas.
    pub(crate) fn new(before: &StructType, after: &StructType) -> Result<Self, SchemaDiffError> {
        compute_schema_diff(before, after)
    }

    /// Returns `true` when no field was added, removed, or updated.
    pub(crate) fn is_empty(&self) -> bool {
        self.change_count() == 0
    }

    /// Total number of reported entries across added, removed, and updated fields.
    pub(crate) fn change_count(&self) -> usize {
        let added = self.added_fields.len();
        let removed = self.removed_fields.len();
        let updated = self.updated_fields.len();
        added + removed + updated
    }

    /// Whether any change was classified as breaking when the diff was computed.
    pub(crate) fn has_breaking_changes(&self) -> bool {
        self.breaking_changes
    }

    /// Borrows every change as `(added, removed, updated)` slices.
    pub(crate) fn all_changes(&self) -> (&[FieldChange], &[FieldChange], &[FieldUpdate]) {
        (
            self.added_fields.as_slice(),
            self.removed_fields.as_slice(),
            self.updated_fields.as_slice(),
        )
    }

    /// Changes whose path has exactly one segment, i.e. top-level columns.
    pub(crate) fn top_level_changes(
        &self,
    ) -> (Vec<&FieldChange>, Vec<&FieldChange>, Vec<&FieldUpdate>) {
        self.changes_at_depth(|depth| depth == 1)
    }

    /// Changes whose path has more than one segment, i.e. columns nested in structs,
    /// array elements, or map keys/values.
    pub(crate) fn nested_changes(
        &self,
    ) -> (Vec<&FieldChange>, Vec<&FieldChange>, Vec<&FieldUpdate>) {
        self.changes_at_depth(|depth| depth > 1)
    }

    /// Selects the changes whose path length (number of segments) satisfies `keep_depth`,
    /// preserving the order of each list.
    fn changes_at_depth(
        &self,
        keep_depth: impl Fn(usize) -> bool,
    ) -> (Vec<&FieldChange>, Vec<&FieldChange>, Vec<&FieldUpdate>) {
        let added = self
            .added_fields
            .iter()
            .filter(|change| keep_depth(change.path.path().len()))
            .collect();
        let removed = self
            .removed_fields
            .iter()
            .filter(|change| keep_depth(change.path.path().len()))
            .collect();
        let updated = self
            .updated_fields
            .iter()
            .filter(|update| keep_depth(update.path.path().len()))
            .collect();
        (added, removed, updated)
    }
}
/// A schema field flattened out of its (possibly nested) schema, together with its full path
/// from the root and its column mapping ID. Produced by `collect_all_fields_with_paths`.
#[derive(Debug, Clone)]
struct FieldWithPath {
/// The field as it appears in the schema, including its nested data type.
field: StructField,
/// Path from the schema root; array elements and map keys/values add an `element`, `key`,
/// or `value` segment.
path: ColumnName,
/// The field's column mapping ID (`ColumnMetadataKey::ColumnMappingId` metadata value).
field_id: i64,
}
fn compute_schema_diff(
before: &StructType,
after: &StructType,
) -> Result<SchemaDiff, SchemaDiffError> {
let empty_path: Vec<String> = vec![];
let mut before_fields = Vec::new();
collect_all_fields_with_paths(
before,
&ColumnName::new(empty_path.clone()),
&mut before_fields,
)?;
let mut after_fields = Vec::new();
collect_all_fields_with_paths(after, &ColumnName::new(empty_path), &mut after_fields)?;
let before_by_id = build_field_map_by_id(&before_fields)?;
let after_by_id = build_field_map_by_id(&after_fields)?;
let before_field_ids: HashSet<i64> = before_by_id.keys().cloned().collect();
let after_field_ids: HashSet<i64> = after_by_id.keys().cloned().collect();
let added_ids: Vec<i64> = after_field_ids
.difference(&before_field_ids)
.cloned()
.collect();
let removed_ids: Vec<i64> = before_field_ids
.difference(&after_field_ids)
.cloned()
.collect();
let common_ids: Vec<i64> = before_field_ids
.intersection(&after_field_ids)
.cloned()
.collect();
let added_fields: Vec<FieldChange> = added_ids
.into_iter()
.map(|id| {
let field_with_path = &after_by_id[&id];
FieldChange {
field: field_with_path.field.clone(),
path: field_with_path.path.clone(),
}
})
.collect();
let added_fields = filter_ancestor_fields(added_fields);
let removed_fields: Vec<FieldChange> = removed_ids
.into_iter()
.map(|id| {
let field_with_path = &before_by_id[&id];
FieldChange {
field: field_with_path.field.clone(),
path: field_with_path.path.clone(),
}
})
.collect();
let removed_fields = filter_ancestor_fields(removed_fields);
let mut updated_fields = Vec::new();
for id in common_ids {
let before_field_with_path = &before_by_id[&id];
let after_field_with_path = &after_by_id[&id];
#[cfg(debug_assertions)]
{
let added_paths: HashSet<ColumnName> =
added_fields.iter().map(|f| f.path.clone()).collect();
let removed_paths: HashSet<ColumnName> =
removed_fields.iter().map(|f| f.path.clone()).collect();
debug_assert!(
!has_added_ancestor(&after_field_with_path.path, &added_paths),
"Field with ID {} at path '{}' is in common_ids but has an added ancestor. \
This violates the invariant that common fields must have existed in both schemas.",
id,
after_field_with_path.path
);
debug_assert!(
!has_added_ancestor(&after_field_with_path.path, &removed_paths),
"Field with ID {} at path '{}' is in common_ids but has a removed ancestor. \
This violates the invariant that common fields must have existed in both schemas.",
id,
after_field_with_path.path
);
}
if let Some(field_update) =
compute_field_update(before_field_with_path, after_field_with_path)?
{
updated_fields.push(field_update);
}
}
let has_breaking_changes =
compute_has_breaking_changes(&added_fields, &removed_fields, &updated_fields);
Ok(SchemaDiff {
added_fields,
removed_fields,
updated_fields,
breaking_changes: has_breaking_changes,
})
}
/// Returns `true` if `change_type` makes a schema change breaking.
///
/// These are breaking:
/// - type changes,
/// - nullability tightening, of the field itself or of an array element / map value,
/// - metadata changes.
///
/// Renames and nullability loosening are not breaking.
///
/// The match is deliberately exhaustive (no `_` arm). Adding a new [`FieldChangeType`] variant
/// then fails to compile until it is explicitly classified here, instead of silently defaulting
/// to "not breaking".
fn is_breaking_change_type(change_type: &FieldChangeType) -> bool {
    match change_type {
        FieldChangeType::TypeChanged
        | FieldChangeType::NullabilityTightened
        | FieldChangeType::ContainerNullabilityTightened
        | FieldChangeType::MetadataChanged => true,
        FieldChangeType::Renamed
        | FieldChangeType::NullabilityLoosened
        | FieldChangeType::ContainerNullabilityLoosened => false,
    }
}
/// Decides whether a diff is breaking.
///
/// A diff is breaking if any added field is non-nullable, or any updated field carries a change
/// type for which [`is_breaking_change_type`] returns `true`. `_removed_fields` is currently
/// ignored: removals never make a diff breaking.
fn compute_has_breaking_changes(
    added_fields: &[FieldChange],
    _removed_fields: &[FieldChange],
    updated_fields: &[FieldUpdate],
) -> bool {
    let adds_required_field = added_fields.iter().any(|change| !change.field.nullable);
    if adds_required_field {
        return true;
    }
    updated_fields
        .iter()
        .flat_map(|update| update.change_types.iter())
        .any(is_breaking_change_type)
}
/// Keeps only the top-most entries of `fields`.
///
/// Any change whose path has a strict ancestor that is also in `fields` is dropped, so a
/// subtree added or removed as a whole is reported once at its root. Relative order is
/// preserved.
fn filter_ancestor_fields(mut fields: Vec<FieldChange>) -> Vec<FieldChange> {
    let changed_paths: HashSet<ColumnName> =
        fields.iter().map(|change| change.path.clone()).collect();
    fields.retain(|change| !has_added_ancestor(&change.path, &changed_paths));
    fields
}
fn has_added_ancestor(path: &ColumnName, added_ancestor_paths: &HashSet<ColumnName>) -> bool {
let mut curr = path.parent();
while let Some(parent) = curr {
if added_ancestor_paths.contains(&parent) {
return true;
}
curr = parent.parent();
}
false
}
/// Returns the field's column mapping physical name.
///
/// Returns `None` if the metadata entry is absent or is not a string value.
fn physical_name(field: &StructField) -> Option<&str> {
    let value = field.get_config_value(&ColumnMetadataKey::ColumnMappingPhysicalName)?;
    if let MetadataValue::String(name) = value {
        Some(name.as_str())
    } else {
        None
    }
}
/// Checks that a field matched by column mapping ID kept the same physical name.
///
/// # Errors
/// - [`SchemaDiffError::PhysicalNameChanged`] if both sides have a physical name and they
///   differ.
/// - [`SchemaDiffError::MissingPhysicalName`] if either side lacks one; the error is reported
///   at `after.path`.
fn validate_physical_name(
    before: &FieldWithPath,
    after: &FieldWithPath,
) -> Result<(), SchemaDiffError> {
    let names = (physical_name(&before.field), physical_name(&after.field));
    match names {
        (Some(old_name), Some(new_name)) if old_name != new_name => {
            Err(SchemaDiffError::PhysicalNameChanged {
                field_id: before.field_id,
                path: after.path.clone(),
                before: old_name.to_string(),
                after: new_name.to_string(),
            })
        }
        (Some(_), Some(_)) => Ok(()),
        _ => Err(SchemaDiffError::MissingPhysicalName {
            path: after.path.clone(),
        }),
    }
}
/// Recursively collects the struct fields reachable from `data_type` into `out`.
///
/// - Struct members are collected directly under `parent_path`.
/// - Array elements are collected under an extra `element` segment.
/// - Map keys and values are collected under `key` and `value` segments, in that order.
/// - Primitive types contribute nothing.
///
/// # Errors
/// Propagates [`SchemaDiffError::MissingFieldId`] from any nested field.
fn collect_fields_from_datatype(
    data_type: &DataType,
    parent_path: &ColumnName,
    out: &mut Vec<FieldWithPath>,
) -> Result<(), SchemaDiffError> {
    let child = |segment: &str| parent_path.join(&ColumnName::new([segment]));
    match data_type {
        DataType::Struct(inner) => collect_all_fields_with_paths(inner, parent_path, out),
        DataType::Array(array) => {
            collect_fields_from_datatype(array.element_type(), &child("element"), out)
        }
        DataType::Map(map) => {
            collect_fields_from_datatype(map.key_type(), &child("key"), out)?;
            collect_fields_from_datatype(map.value_type(), &child("value"), out)
        }
        _ => Ok(()),
    }
}
/// Appends every field of `schema` to `out` in pre-order, recursing into nested fields.
///
/// Each field is pushed before its descendants. Its path is `parent_path` joined with the
/// field name.
///
/// # Errors
/// [`SchemaDiffError::MissingFieldId`] for the first field without a column mapping ID.
fn collect_all_fields_with_paths(
    schema: &StructType,
    parent_path: &ColumnName,
    out: &mut Vec<FieldWithPath>,
) -> Result<(), SchemaDiffError> {
    for field in schema.fields() {
        let path = parent_path.join(&ColumnName::new([field.name()]));
        let entry = FieldWithPath {
            field_id: get_field_id_for_path(field, &path)?,
            field: field.clone(),
            path: path.clone(),
        };
        out.push(entry);
        collect_fields_from_datatype(field.data_type(), &path, out)?;
    }
    Ok(())
}
/// Indexes flattened fields by column mapping ID.
///
/// # Errors
/// [`SchemaDiffError::DuplicateFieldId`] on the first repeated ID. `path1` is the earlier field
/// and `path2` the later one.
fn build_field_map_by_id(
    fields: &[FieldWithPath],
) -> Result<HashMap<i64, FieldWithPath>, SchemaDiffError> {
    let mut by_id: HashMap<i64, FieldWithPath> = HashMap::with_capacity(fields.len());
    for candidate in fields {
        if let Some(first) = by_id.get(&candidate.field_id) {
            return Err(SchemaDiffError::DuplicateFieldId {
                id: candidate.field_id,
                path1: first.path.clone(),
                path2: candidate.path.clone(),
            });
        }
        by_id.insert(candidate.field_id, candidate.clone());
    }
    Ok(by_id)
}
/// Reads the field's column mapping ID from its metadata.
///
/// # Errors
/// [`SchemaDiffError::MissingFieldId`] (reported at `path`) if the entry is absent or is not a
/// number.
fn get_field_id_for_path(field: &StructField, path: &ColumnName) -> Result<i64, SchemaDiffError> {
    let id_value = field.get_config_value(&ColumnMetadataKey::ColumnMappingId);
    if let Some(MetadataValue::Number(id)) = id_value {
        Ok(*id)
    } else {
        Err(SchemaDiffError::MissingFieldId { path: path.clone() })
    }
}
/// Compares two versions of the same field (same column mapping ID) and lists what changed.
///
/// Returns `Ok(None)` when nothing changed. Change kinds are reported in a fixed order:
/// 1. rename,
/// 2. field nullability,
/// 3. data-type changes (see [`classify_data_type_change`]),
/// 4. metadata.
///
/// The reported path is the field's path in the `after` schema.
///
/// # Errors
/// Propagates the error from [`validate_physical_name`] if the physical name is missing or
/// changed.
fn compute_field_update(
    before: &FieldWithPath,
    after: &FieldWithPath,
) -> Result<Option<FieldUpdate>, SchemaDiffError> {
    validate_physical_name(before, after)?;

    let (prev, next) = (&before.field, &after.field);
    let mut change_types = Vec::new();
    if prev.name() != next.name() {
        change_types.push(FieldChangeType::Renamed);
    }
    change_types.extend(check_field_nullability_change(prev.nullable, next.nullable));
    change_types.extend(classify_data_type_change(prev.data_type(), next.data_type()));
    if has_metadata_changes(prev, next) {
        change_types.push(FieldChangeType::MetadataChanged);
    }

    if change_types.is_empty() {
        Ok(None)
    } else {
        Ok(Some(FieldUpdate {
            before: prev.clone(),
            after: next.clone(),
            path: after.path.clone(),
            change_types,
        }))
    }
}
/// Classifies a change in a field's own nullability.
///
/// Returns `None` if it is unchanged.
fn check_field_nullability_change(
    before_nullable: bool,
    after_nullable: bool,
) -> Option<FieldChangeType> {
    if before_nullable == after_nullable {
        None
    } else if after_nullable {
        Some(FieldChangeType::NullabilityLoosened)
    } else {
        Some(FieldChangeType::NullabilityTightened)
    }
}
/// Classifies a change in a container's null-admitting flag.
///
/// The flag is an array's `contains_null` or a map's `value_contains_null`. Returns `None` if
/// it is unchanged.
fn check_container_nullability_change(
    before_nullable: bool,
    after_nullable: bool,
) -> Option<FieldChangeType> {
    if before_nullable == after_nullable {
        None
    } else if after_nullable {
        Some(FieldChangeType::ContainerNullabilityLoosened)
    } else {
        Some(FieldChangeType::ContainerNullabilityTightened)
    }
}
/// Classifies how a field's data type changed, ignoring changes inside structs.
///
/// Changes inside structs are ignored because nested fields are diffed individually by ID.
/// - Equal types yield no changes.
/// - Struct-to-struct yields no changes.
/// - Arrays recurse into the element type, then report `contains_null` changes.
/// - Maps recurse into the key type, then the value type, then report `value_contains_null`
///   changes.
/// - Any other difference yields [`FieldChangeType::TypeChanged`].
fn classify_data_type_change(before: &DataType, after: &DataType) -> Vec<FieldChangeType> {
    if before == after {
        return Vec::new();
    }
    let mut changes = Vec::new();
    match (before, after) {
        // A struct-to-struct recursion also lands here, so element/key/value structs need no
        // special casing below.
        (DataType::Struct(_), DataType::Struct(_)) => {}
        (DataType::Array(prev), DataType::Array(next)) => {
            changes.extend(classify_data_type_change(
                prev.element_type(),
                next.element_type(),
            ));
            changes.extend(check_container_nullability_change(
                prev.contains_null(),
                next.contains_null(),
            ));
        }
        (DataType::Map(prev), DataType::Map(next)) => {
            changes.extend(classify_data_type_change(prev.key_type(), next.key_type()));
            changes.extend(classify_data_type_change(
                prev.value_type(),
                next.value_type(),
            ));
            changes.extend(check_container_nullability_change(
                prev.value_contains_null(),
                next.value_contains_null(),
            ));
        }
        _ => changes.push(FieldChangeType::TypeChanged),
    }
    changes
}
/// Returns `true` if the two fields' user-visible metadata differ.
///
/// Keys prefixed with `delta.columnMapping` or `parquet.field` are excluded. Column mapping ID
/// and physical name are compared separately by the caller.
///
/// Compares borrowed views of the metadata rather than cloning every key and value into owned
/// maps, and uses one shared filter for both sides.
fn has_metadata_changes(before: &StructField, after: &StructField) -> bool {
    /// Borrowed view of `field`'s metadata without the column-mapping / parquet keys.
    fn user_metadata(field: &StructField) -> HashMap<&String, &MetadataValue> {
        field
            .metadata
            .iter()
            .filter(|(key, _)| {
                !key.starts_with("delta.columnMapping") && !key.starts_with("parquet.field")
            })
            .collect()
    }
    user_metadata(before) != user_metadata(after)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::schema::{ArrayType, DataType, MapType, StructField, StructType};
use std::collections::HashSet;
fn create_field_with_id(
name: &str,
data_type: DataType,
nullable: bool,
id: i64,
) -> StructField {
StructField::new(name, data_type, nullable).add_metadata([
("delta.columnMapping.id", MetadataValue::Number(id)),
(
"delta.columnMapping.physicalName",
MetadataValue::String(format!("col_{id}")),
),
])
}
fn updated_paths(diff: &SchemaDiff) -> HashSet<ColumnName> {
diff.updated_fields.iter().map(|u| u.path.clone()).collect()
}
#[test]
fn test_identical_schemas() {
let schema = StructType::new_unchecked([
create_field_with_id("id", DataType::LONG, false, 1),
create_field_with_id("name", DataType::STRING, false, 2),
]);
let diff = SchemaDiff::new(&schema, &schema).unwrap();
assert!(diff.is_empty());
assert!(!diff.has_breaking_changes());
}
#[test]
fn test_change_count() {
let before = StructType::new_unchecked([
create_field_with_id("id", DataType::LONG, false, 1),
create_field_with_id("name", DataType::STRING, false, 2),
]);
let after = StructType::new_unchecked([
create_field_with_id("id", DataType::LONG, true, 1), create_field_with_id("email", DataType::STRING, false, 3), ]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.change_count(), 3);
assert_eq!(diff.removed_fields.len(), 1);
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.updated_fields.len(), 1);
}
#[test]
fn test_top_level_added_field() {
let before =
StructType::new_unchecked([create_field_with_id("id", DataType::LONG, false, 1)]);
let after = StructType::new_unchecked([
create_field_with_id("id", DataType::LONG, false, 1),
create_field_with_id("name", DataType::STRING, false, 2),
]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 0);
assert_eq!(diff.added_fields[0].path, ColumnName::new(["name"]));
assert_eq!(diff.added_fields[0].field.name(), "name");
assert!(diff.has_breaking_changes()); }
#[test]
fn test_added_required_field_is_breaking() {
let before =
StructType::new_unchecked([create_field_with_id("id", DataType::LONG, false, 1)]);
let after = StructType::new_unchecked([
create_field_with_id("id", DataType::LONG, false, 1),
create_field_with_id("required_field", DataType::STRING, false, 2), ]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 0);
assert!(diff.has_breaking_changes());
}
#[test]
fn test_added_nullable_field_is_not_breaking() {
let before =
StructType::new_unchecked([create_field_with_id("id", DataType::LONG, false, 1)]);
let after = StructType::new_unchecked([
create_field_with_id("id", DataType::LONG, false, 1),
create_field_with_id("optional_field", DataType::STRING, true, 2), ]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 0);
assert!(!diff.has_breaking_changes()); }
#[test]
fn test_physical_name_validation() {
let before = StructType::new_unchecked([StructField::new("name", DataType::STRING, false)
.add_metadata([
("delta.columnMapping.id", MetadataValue::Number(1)),
(
"delta.columnMapping.physicalName",
MetadataValue::String("col_1".to_string()),
),
])]);
let after =
StructType::new_unchecked([StructField::new("full_name", DataType::STRING, false)
.add_metadata([
("delta.columnMapping.id", MetadataValue::Number(1)),
(
"delta.columnMapping.physicalName",
MetadataValue::String("col_1".to_string()),
),
])]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 0);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
assert_eq!(
diff.updated_fields[0].change_types,
vec![FieldChangeType::Renamed]
);
assert!(!diff.has_breaking_changes());
let before = StructType::new_unchecked([StructField::new("name", DataType::STRING, false)
.add_metadata([
("delta.columnMapping.id", MetadataValue::Number(1)),
(
"delta.columnMapping.physicalName",
MetadataValue::String("col_001".to_string()),
),
])]);
let after = StructType::new_unchecked([StructField::new("name", DataType::STRING, false)
.add_metadata([
("delta.columnMapping.id", MetadataValue::Number(1)),
(
"delta.columnMapping.physicalName",
MetadataValue::String("col_002".to_string()),
),
])]);
let result = SchemaDiff::new(&before, &after);
assert!(matches!(
result,
Err(SchemaDiffError::PhysicalNameChanged { .. })
));
let before = StructType::new_unchecked([StructField::new("name", DataType::STRING, false)
.add_metadata([
("delta.columnMapping.id", MetadataValue::Number(1)),
(
"delta.columnMapping.physicalName",
MetadataValue::String("col_1".to_string()),
),
])]);
let after = StructType::new_unchecked([StructField::new("name", DataType::STRING, false)
.add_metadata([("delta.columnMapping.id", MetadataValue::Number(1))])]);
let result = SchemaDiff::new(&before, &after);
assert!(matches!(
result,
Err(SchemaDiffError::MissingPhysicalName { .. })
));
}
#[test]
fn test_multiple_change_types() {
let before = StructType::new_unchecked([create_field_with_id(
"user_name",
DataType::STRING,
false,
1,
)
.add_metadata([("custom", MetadataValue::String("old_value".to_string()))])]);
let after = StructType::new_unchecked([
create_field_with_id("userName", DataType::STRING, true, 1) .add_metadata([("custom", MetadataValue::String("new_value".to_string()))]), ]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 0);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
let update = &diff.updated_fields[0];
assert_eq!(update.change_types.len(), 3);
assert!(update.change_types.contains(&FieldChangeType::Renamed));
assert!(update
.change_types
.contains(&FieldChangeType::NullabilityLoosened));
assert!(update
.change_types
.contains(&FieldChangeType::MetadataChanged));
assert!(diff.has_breaking_changes());
}
#[test]
fn test_multiple_with_breaking_change() {
let before = StructType::new_unchecked([create_field_with_id(
"user_name",
DataType::STRING,
true,
1,
)
.add_metadata([("custom", MetadataValue::String("old_value".to_string()))])]);
let after = StructType::new_unchecked([
create_field_with_id("userName", DataType::STRING, false, 1) .add_metadata([("custom", MetadataValue::String("new_value".to_string()))]), ]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 0);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
let update = &diff.updated_fields[0];
assert_eq!(update.change_types.len(), 3);
assert!(update.change_types.contains(&FieldChangeType::Renamed));
assert!(update
.change_types
.contains(&FieldChangeType::NullabilityTightened));
assert!(update
.change_types
.contains(&FieldChangeType::MetadataChanged));
assert!(diff.has_breaking_changes());
}
#[test]
fn test_duplicate_field_id_error() {
let schema_with_duplicates = StructType::new_unchecked([
create_field_with_id("field1", DataType::STRING, false, 1),
create_field_with_id("field2", DataType::STRING, false, 1), ]);
let result = SchemaDiff::new(&schema_with_duplicates, &schema_with_duplicates);
assert!(result.is_err());
match result {
Err(SchemaDiffError::DuplicateFieldId { id, path1, path2 }) => {
assert_eq!(id, 1);
assert_eq!(path1, ColumnName::new(["field1"]));
assert_eq!(path2, ColumnName::new(["field2"]));
}
_ => panic!("Expected DuplicateFieldId error"),
}
}
#[test]
fn test_top_level_and_nested_change_filters() {
let top_level_field = create_field_with_id("name", DataType::STRING, false, 1);
let nested_field = create_field_with_id("street", DataType::STRING, false, 2);
let deeply_nested_field = create_field_with_id("city", DataType::STRING, false, 3);
let diff = SchemaDiff {
added_fields: vec![
FieldChange {
field: top_level_field.clone(),
path: ColumnName::new(["name"]), },
FieldChange {
field: nested_field.clone(),
path: ColumnName::new(["address", "street"]), },
],
removed_fields: vec![FieldChange {
field: deeply_nested_field.clone(),
path: ColumnName::new(["user", "address", "city"]), }],
updated_fields: vec![],
breaking_changes: false,
};
let (top_added, top_removed, top_updated) = diff.top_level_changes();
assert_eq!(top_added.len(), 1);
assert_eq!(top_added[0].path, ColumnName::new(["name"]));
assert_eq!(top_removed.len(), 0);
assert_eq!(top_updated.len(), 0);
let (nested_added, nested_removed, nested_updated) = diff.nested_changes();
assert_eq!(nested_added.len(), 1);
assert_eq!(nested_added[0].path, ColumnName::new(["address", "street"]));
assert_eq!(nested_removed.len(), 1);
assert_eq!(
nested_removed[0].path,
ColumnName::new(["user", "address", "city"])
);
assert_eq!(nested_updated.len(), 0);
}
#[test]
fn test_ancestor_filtering() {
let without_user =
StructType::new_unchecked([create_field_with_id("id", DataType::LONG, false, 1)]);
let with_user = StructType::new_unchecked([
create_field_with_id("id", DataType::LONG, false, 1),
create_field_with_id(
"user",
DataType::try_struct_type([
create_field_with_id("name", DataType::STRING, false, 3),
create_field_with_id("email", DataType::STRING, true, 4),
create_field_with_id(
"address",
DataType::try_struct_type([
create_field_with_id("street", DataType::STRING, false, 6),
create_field_with_id("city", DataType::STRING, false, 7),
])
.unwrap(),
true,
5,
),
])
.unwrap(),
false,
2,
),
]);
let diff = SchemaDiff::new(&without_user, &with_user).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.added_fields[0].path, ColumnName::new(["user"]));
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 0);
assert!(diff.has_breaking_changes());
let diff = SchemaDiff::new(&with_user, &without_user).unwrap();
assert_eq!(diff.removed_fields.len(), 1);
assert_eq!(diff.removed_fields[0].path, ColumnName::new(["user"]));
assert_eq!(diff.added_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 0);
assert!(!diff.has_breaking_changes()); }
#[test]
fn test_array_of_struct_addition_reports_only_ancestor_field() {
let before = StructType::new_unchecked([]);
let after = StructType::new_unchecked([create_field_with_id(
"items",
DataType::Array(Box::new(ArrayType::new(
DataType::try_struct_type([create_field_with_id(
"name",
DataType::STRING,
false,
2,
)])
.unwrap(),
false,
))),
true,
1,
)]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.added_fields[0].path, ColumnName::new(["items"]));
let (nested_added, nested_removed, nested_updated) = diff.nested_changes();
assert_eq!(nested_added.len(), 0);
assert_eq!(nested_removed.len(), 0);
assert_eq!(nested_updated.len(), 0);
}
#[test]
fn test_container_with_nested_changes_not_reported_as_type_change() {
let before = StructType::new_unchecked([create_field_with_id(
"user",
DataType::try_struct_type([
create_field_with_id("name", DataType::STRING, false, 2),
create_field_with_id("email", DataType::STRING, true, 3),
])
.unwrap(),
false,
1,
)]);
let after = StructType::new_unchecked([create_field_with_id(
"user",
DataType::try_struct_type([
create_field_with_id("full_name", DataType::STRING, false, 2), create_field_with_id("email", DataType::STRING, true, 3),
create_field_with_id("age", DataType::INTEGER, true, 4), ])
.unwrap(),
false,
1,
)]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.added_fields[0].path, ColumnName::new(["user", "age"]));
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
assert_eq!(
diff.updated_fields[0].path,
ColumnName::new(["user", "full_name"])
);
assert_eq!(
diff.updated_fields[0].change_types,
vec![FieldChangeType::Renamed]
);
let top_level_updates: Vec<_> = diff
.updated_fields
.iter()
.filter(|u| u.path.path().len() == 1)
.collect();
assert_eq!(top_level_updates.len(), 0);
assert!(!diff.has_breaking_changes());
}
#[test]
fn test_actual_struct_type_change_still_reported() {
let before =
StructType::new_unchecked([create_field_with_id("data", DataType::STRING, false, 1)]);
let after = StructType::new_unchecked([
create_field_with_id(
"data",
DataType::try_struct_type([create_field_with_id(
"nested",
DataType::STRING,
false,
2,
)])
.unwrap(),
false,
1,
), ]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
assert_eq!(diff.updated_fields[0].path, ColumnName::new(["data"]));
assert_eq!(
diff.updated_fields[0].change_types,
vec![FieldChangeType::TypeChanged]
);
assert!(diff.has_breaking_changes());
assert_eq!(
diff.added_fields[0].path,
ColumnName::new(["data", "nested"])
);
assert!(diff.has_breaking_changes());
}
#[test]
fn test_array_with_struct_element_changes() {
let before = StructType::new_unchecked([create_field_with_id(
"items",
DataType::Array(Box::new(ArrayType::new(
DataType::try_struct_type([create_field_with_id(
"name",
DataType::STRING,
false,
2,
)])
.unwrap(),
true,
))),
false,
1,
)]);
let after = StructType::new_unchecked([create_field_with_id(
"items",
DataType::Array(Box::new(ArrayType::new(
DataType::try_struct_type([
create_field_with_id("title", DataType::STRING, false, 2), ])
.unwrap(),
true,
))),
false,
1,
)]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 0);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
assert_eq!(
diff.updated_fields[0].path,
ColumnName::new(["items", "element", "title"])
);
assert_eq!(
diff.updated_fields[0].change_types,
vec![FieldChangeType::Renamed]
);
let array_updates: Vec<_> = diff
.updated_fields
.iter()
.filter(|u| u.path == ColumnName::new(["items"]))
.collect();
assert_eq!(array_updates.len(), 0);
assert!(!diff.has_breaking_changes());
}
#[test]
fn test_ancestor_filtering_with_mixed_changes() {
let before = StructType::new_unchecked([
create_field_with_id("existing", DataType::STRING, false, 1),
create_field_with_id(
"existing_struct",
DataType::try_struct_type([create_field_with_id(
"old_name",
DataType::STRING,
false,
3,
)])
.unwrap(),
false,
2,
),
]);
let after = StructType::new_unchecked([
create_field_with_id("existing", DataType::STRING, true, 1), create_field_with_id(
"existing_struct",
DataType::try_struct_type([
create_field_with_id("new_name", DataType::STRING, false, 3), ])
.unwrap(),
true, 2,
),
create_field_with_id(
"new_struct", DataType::try_struct_type([create_field_with_id(
"nested_field",
DataType::INTEGER,
false,
5,
)])
.unwrap(),
false,
4,
),
]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.added_fields[0].path, ColumnName::new(["new_struct"]));
assert_eq!(diff.updated_fields.len(), 3);
let paths = updated_paths(&diff);
assert!(paths.contains(&ColumnName::new(["existing"])));
assert!(paths.contains(&ColumnName::new(["existing_struct"])));
assert!(paths.contains(&ColumnName::new(["existing_struct", "new_name"])));
assert!(diff.has_breaking_changes());
}
#[test]
fn test_nested_field_rename() {
let before = StructType::new_unchecked([create_field_with_id(
"user",
DataType::try_struct_type([create_field_with_id("name", DataType::STRING, false, 2)])
.unwrap(),
false,
1,
)]);
let after = StructType::new_unchecked([create_field_with_id(
"user",
DataType::try_struct_type([
create_field_with_id("full_name", DataType::STRING, false, 2), ])
.unwrap(),
false,
1,
)]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 0);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
let update = &diff.updated_fields[0];
assert_eq!(update.path, ColumnName::new(["user", "full_name"]));
assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
assert!(!diff.has_breaking_changes()); }
#[test]
fn test_nested_field_added() {
let before = StructType::new_unchecked([create_field_with_id(
"user",
DataType::try_struct_type([create_field_with_id("name", DataType::STRING, false, 2)])
.unwrap(),
false,
1,
)]);
let after = StructType::new_unchecked([create_field_with_id(
"user",
DataType::try_struct_type([
create_field_with_id("name", DataType::STRING, false, 2),
create_field_with_id("age", DataType::INTEGER, true, 3), ])
.unwrap(),
false,
1,
)]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 1);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 0);
let added = &diff.added_fields[0];
assert_eq!(added.path, ColumnName::new(["user", "age"]));
assert_eq!(added.field.name(), "age");
assert!(!diff.has_breaking_changes()); }
#[test]
fn test_deeply_nested_changes() {
let before = StructType::new_unchecked([create_field_with_id(
"level1",
DataType::try_struct_type([create_field_with_id(
"level2",
DataType::try_struct_type([create_field_with_id(
"deep_field",
DataType::STRING,
false,
3,
)])
.unwrap(),
false,
2,
)])
.unwrap(),
false,
1,
)]);
let after = StructType::new_unchecked([create_field_with_id(
"level1",
DataType::try_struct_type([create_field_with_id(
"level2",
DataType::try_struct_type([
create_field_with_id("very_deep_field", DataType::STRING, false, 3), ])
.unwrap(),
false,
2,
)])
.unwrap(),
false,
1,
)]);
let diff = SchemaDiff::new(&before, &after).unwrap();
assert_eq!(diff.added_fields.len(), 0);
assert_eq!(diff.removed_fields.len(), 0);
assert_eq!(diff.updated_fields.len(), 1);
let update = &diff.updated_fields[0];
assert_eq!(
update.path,
ColumnName::new(["level1", "level2", "very_deep_field"])
);
assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
}
#[test]
fn test_top_level_vs_nested_filtering() {
    // One top-level rename (id 1) plus, inside `user`, one nested rename (id 3) and one
    // nested nullable addition (id 4). The two partitions must split these cleanly.
    let before = StructType::new_unchecked([
        create_field_with_id("top_field", DataType::STRING, false, 1),
        create_field_with_id(
            "user",
            DataType::try_struct_type([create_field_with_id(
                "name",
                DataType::STRING,
                false,
                3,
            )])
            .unwrap(),
            false,
            2,
        ),
    ]);
    let after = StructType::new_unchecked([
        create_field_with_id("renamed_top", DataType::STRING, false, 1),
        create_field_with_id(
            "user",
            DataType::try_struct_type([
                create_field_with_id("full_name", DataType::STRING, false, 3),
                create_field_with_id("age", DataType::INTEGER, true, 4),
            ])
            .unwrap(),
            false,
            2,
        ),
    ]);
    let diff = SchemaDiff::new(&before, &after).unwrap();
    let (top_added, top_removed, top_updated) = diff.top_level_changes();
    let (nested_added, nested_removed, nested_updated) = diff.nested_changes();

    // Top level: only the `top_field` -> `renamed_top` rename.
    assert_eq!(top_added.len(), 0);
    assert_eq!(top_removed.len(), 0);
    assert_eq!(top_updated.len(), 1);
    assert_eq!(top_updated[0].path, ColumnName::new(["renamed_top"]));

    // Nested: the new `user.age` field and the `user.name` -> `user.full_name` rename.
    assert_eq!(nested_added.len(), 1);
    assert_eq!(nested_added[0].path, ColumnName::new(["user", "age"]));
    assert_eq!(nested_removed.len(), 0);
    assert_eq!(nested_updated.len(), 1);
    assert_eq!(
        nested_updated[0].path,
        ColumnName::new(["user", "full_name"])
    );

    // Renames plus a nullable addition are expected to be non-breaking.
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_mixed_changes() {
    // Before: top-level `id` (1) and struct `user` (2) holding `name` (3) and `email` (4).
    let user_before = DataType::try_struct_type([
        create_field_with_id("name", DataType::STRING, false, 3),
        create_field_with_id("email", DataType::STRING, true, 4),
    ])
    .unwrap();
    let before = StructType::new_unchecked([
        create_field_with_id("id", DataType::LONG, false, 1),
        create_field_with_id("user", user_before, false, 2),
    ]);

    // After: ids 1 and 3 renamed, id 4 dropped, new nested `age` (5) and top-level
    // `created_at` (6).
    let user_after = DataType::try_struct_type([
        create_field_with_id("full_name", DataType::STRING, false, 3),
        create_field_with_id("age", DataType::INTEGER, true, 5),
    ])
    .unwrap();
    let after = StructType::new_unchecked([
        create_field_with_id("identifier", DataType::LONG, false, 1),
        create_field_with_id("user", user_after, false, 2),
        create_field_with_id("created_at", DataType::TIMESTAMP, false, 6),
    ]);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (2, 1, 2));

    let added_paths: HashSet<ColumnName> = diff
        .added_fields
        .iter()
        .map(|change| change.path.clone())
        .collect();
    for expected in [ColumnName::new(["user", "age"]), ColumnName::new(["created_at"])] {
        assert!(added_paths.contains(&expected));
    }

    let removed_paths: HashSet<ColumnName> = diff
        .removed_fields
        .iter()
        .map(|change| change.path.clone())
        .collect();
    assert!(removed_paths.contains(&ColumnName::new(["user", "email"])));

    let paths = updated_paths(&diff);
    for expected in [
        ColumnName::new(["identifier"]),
        ColumnName::new(["user", "full_name"]),
    ] {
        assert!(paths.contains(&expected));
    }
}
#[test]
fn test_array_element_struct_field_changes() {
    // Wraps a struct element type in the top-level `items: array<...>` column (id 1).
    let items_column = |element: DataType| {
        StructType::new_unchecked([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(element, true))),
            false,
            1,
        )])
    };

    // Before: `name` (id 2) and `removed_field` (id 3).
    let before = items_column(
        DataType::try_struct_type([
            create_field_with_id("name", DataType::STRING, false, 2),
            create_field_with_id("removed_field", DataType::INTEGER, true, 3),
        ])
        .unwrap(),
    );
    // After: id 2 renamed to `title`, id 3 gone, `added_field` (id 4) introduced.
    let after = items_column(
        DataType::try_struct_type([
            create_field_with_id("title", DataType::STRING, false, 2),
            create_field_with_id("added_field", DataType::STRING, true, 4),
        ])
        .unwrap(),
    );

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (1, 1, 1));

    // Struct fields inside an array element are addressed through the `element` segment.
    let element_path = |leaf: &'static str| ColumnName::new(["items", "element", leaf]);
    assert_eq!(diff.added_fields[0].path, element_path("added_field"));
    assert_eq!(diff.removed_fields[0].path, element_path("removed_field"));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, element_path("title"));
    assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_doubly_nested_array_type_change() {
    // `matrix: array<array<leaf>>`, with neither array level allowing nulls.
    let matrix = |leaf: DataType| {
        StructType::new_unchecked([create_field_with_id(
            "matrix",
            DataType::Array(Box::new(ArrayType::new(
                DataType::Array(Box::new(ArrayType::new(leaf, false))),
                false,
            ))),
            false,
            1,
        )])
    };
    let before = matrix(DataType::INTEGER);
    let after = matrix(DataType::DOUBLE);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    // A primitive change buried in nested arrays is attributed to the owning column.
    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["matrix"]));
    assert_eq!(update.change_types, vec![FieldChangeType::TypeChanged]);
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_array_primitive_element_type_change() {
    // `items: array<element>` with non-nullable elements; only the element type varies.
    let items = |element: DataType| {
        StructType::new_unchecked([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(element, false))),
            false,
            1,
        )])
    };
    let before = items(DataType::STRING);
    let after = items(DataType::INTEGER);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["items"]));
    assert_eq!(update.change_types, vec![FieldChangeType::TypeChanged]);
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_nested_array_nullability_loosened() {
    // `matrix: array<array<int>>`; each flag sets `contains_null` on one array level.
    let matrix = |inner_contains_null: bool, outer_contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "matrix",
            DataType::Array(Box::new(ArrayType::new(
                DataType::Array(Box::new(ArrayType::new(
                    DataType::INTEGER,
                    inner_contains_null,
                ))),
                outer_contains_null,
            ))),
            false,
            1,
        )])
    };
    // Only the outer array starts accepting null elements.
    let before = matrix(false, false);
    let after = matrix(false, true);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["matrix"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityLoosened]
    );
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_nested_array_nullability_tightened() {
    // `matrix: array<array<int>>`; each flag sets `contains_null` on one array level.
    let matrix = |inner_contains_null: bool, outer_contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "matrix",
            DataType::Array(Box::new(ArrayType::new(
                DataType::Array(Box::new(ArrayType::new(
                    DataType::INTEGER,
                    inner_contains_null,
                ))),
                outer_contains_null,
            ))),
            false,
            1,
        )])
    };
    // The outer array stops accepting null elements.
    let before = matrix(false, true);
    let after = matrix(false, false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["matrix"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityTightened]
    );
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_nested_array_inner_nullability_loosened() {
    // `matrix: array<array<int>>`; each flag sets `contains_null` on one array level.
    let matrix = |inner_contains_null: bool, outer_contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "matrix",
            DataType::Array(Box::new(ArrayType::new(
                DataType::Array(Box::new(ArrayType::new(
                    DataType::INTEGER,
                    inner_contains_null,
                ))),
                outer_contains_null,
            ))),
            false,
            1,
        )])
    };
    // Only the inner array starts accepting null elements.
    let before = matrix(false, false);
    let after = matrix(true, false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    // Nullability changes at any array depth are reported on the owning column.
    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["matrix"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityLoosened]
    );
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_nested_array_inner_nullability_tightened() {
    // `matrix: array<array<int>>`; each flag sets `contains_null` on one array level.
    let matrix = |inner_contains_null: bool, outer_contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "matrix",
            DataType::Array(Box::new(ArrayType::new(
                DataType::Array(Box::new(ArrayType::new(
                    DataType::INTEGER,
                    inner_contains_null,
                ))),
                outer_contains_null,
            ))),
            false,
            1,
        )])
    };
    // The inner array stops accepting null elements.
    let before = matrix(true, false);
    let after = matrix(false, false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["matrix"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityTightened]
    );
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_array_nullability_loosened() {
    // `items: array<string>` where only the element nullability varies.
    let items = |contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(DataType::STRING, contains_null))),
            false,
            1,
        )])
    };
    let before = items(false);
    let after = items(true);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["items"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityLoosened]
    );
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_array_nullability_tightened() {
    // `items: array<string>` where only the element nullability varies.
    let items = |contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(DataType::STRING, contains_null))),
            false,
            1,
        )])
    };
    let before = items(true);
    let after = items(false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["items"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityTightened]
    );
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_map_value_struct_field_changes() {
    // Wraps a struct value type in `lookup: map<string, value>` (id 1, nullable values).
    let lookup_column = |value: DataType| {
        StructType::new_unchecked([create_field_with_id(
            "lookup",
            DataType::Map(Box::new(MapType::new(DataType::STRING, value, true))),
            false,
            1,
        )])
    };

    // Before: `value` (id 2) and `removed_field` (id 3).
    let before = lookup_column(
        DataType::try_struct_type([
            create_field_with_id("value", DataType::INTEGER, false, 2),
            create_field_with_id("removed_field", DataType::STRING, true, 3),
        ])
        .unwrap(),
    );
    // After: id 2 renamed to `count`, id 3 gone, `added_field` (id 4) introduced.
    let after = lookup_column(
        DataType::try_struct_type([
            create_field_with_id("count", DataType::INTEGER, false, 2),
            create_field_with_id("added_field", DataType::STRING, true, 4),
        ])
        .unwrap(),
    );

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (1, 1, 1));

    // Fields of a map's value struct are addressed through the `value` segment.
    let value_path = |leaf: &'static str| ColumnName::new(["lookup", "value", leaf]);
    assert_eq!(diff.added_fields[0].path, value_path("added_field"));
    assert_eq!(diff.removed_fields[0].path, value_path("removed_field"));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, value_path("count"));
    assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_array_struct_element_nullability_loosened() {
    // `items: array<struct<name>>`; only the array's `contains_null` flag varies.
    let items = |contains_null: bool| {
        let element = DataType::try_struct_type([create_field_with_id(
            "name",
            DataType::STRING,
            false,
            2,
        )])
        .unwrap();
        StructType::new_unchecked([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(element, contains_null))),
            false,
            1,
        )])
    };
    let before = items(false);
    let after = items(true);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["items"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityLoosened]
    );
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_map_nullability_loosened() {
    // `lookup: map<string, int>`; only the value-contains-null flag varies.
    let lookup = |value_contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "lookup",
            DataType::Map(Box::new(MapType::new(
                DataType::STRING,
                DataType::INTEGER,
                value_contains_null,
            ))),
            false,
            1,
        )])
    };
    let before = lookup(false);
    let after = lookup(true);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["lookup"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityLoosened]
    );
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_array_struct_element_nullability_tightened() {
    // `items: array<struct<name>>`; only the array's `contains_null` flag varies.
    let items = |contains_null: bool| {
        let element = DataType::try_struct_type([create_field_with_id(
            "name",
            DataType::STRING,
            false,
            2,
        )])
        .unwrap();
        StructType::new_unchecked([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(element, contains_null))),
            false,
            1,
        )])
    };
    let before = items(true);
    let after = items(false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["items"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityTightened]
    );
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_map_nullability_tightened() {
    // `lookup: map<string, int>`; only the value-contains-null flag varies.
    let lookup = |value_contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "lookup",
            DataType::Map(Box::new(MapType::new(
                DataType::STRING,
                DataType::INTEGER,
                value_contains_null,
            ))),
            false,
            1,
        )])
    };
    let before = lookup(true);
    let after = lookup(false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["lookup"]));
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityTightened]
    );
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_array_combined_nullability_and_type_change() {
    // `items: array<element>`; both the element type and its nullability vary.
    let items = |element: DataType, contains_null: bool| {
        StructType::new_unchecked([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(element, contains_null))),
            false,
            1,
        )])
    };
    let before = items(DataType::STRING, false);
    let after = items(DataType::INTEGER, true);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    // Both kinds of change are reported on the single update, in whatever order.
    let update = &diff.updated_fields[0];
    assert_eq!(update.path, ColumnName::new(["items"]));
    for expected in [
        FieldChangeType::TypeChanged,
        FieldChangeType::ContainerNullabilityLoosened,
    ] {
        assert!(update.change_types.contains(&expected));
    }
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_map_with_struct_key() {
    // Renaming a field (id 2) inside a map's struct key must be reported under the `key`
    // path segment.
    let before = StructType::new_unchecked([create_field_with_id(
        "lookup",
        DataType::Map(Box::new(MapType::new(
            DataType::try_struct_type([create_field_with_id(
                "id",
                DataType::INTEGER,
                false,
                2,
            )])
            .unwrap(),
            DataType::STRING,
            true,
        ))),
        false,
        1,
    )]);
    let after = StructType::new_unchecked([create_field_with_id(
        "lookup",
        DataType::Map(Box::new(MapType::new(
            DataType::try_struct_type([create_field_with_id(
                "identifier",
                DataType::INTEGER,
                false,
                2,
            )])
            .unwrap(),
            DataType::STRING,
            true,
        ))),
        false,
        1,
    )]);
    let diff = SchemaDiff::new(&before, &after).unwrap();
    assert_eq!(diff.added_fields.len(), 0);
    assert_eq!(diff.removed_fields.len(), 0);
    assert_eq!(diff.updated_fields.len(), 1);
    let update = &diff.updated_fields[0];
    assert_eq!(
        update.path,
        ColumnName::new(["lookup", "key", "identifier"])
    );
    assert_eq!(update.before.name(), "id");
    assert_eq!(update.after.name(), "identifier");
    assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
    // A rename inside a map key is expected to be non-breaking, like any other rename.
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_nested_struct_in_array_in_struct_field_changes() {
    // Builds `data: struct<items: array<struct<inner: inner_type>>>` with ids
    // data=1, items=5, inner=2; every level is non-nullable.
    let data_column = |inner_type: DataType| {
        let element = DataType::try_struct_type([create_field_with_id(
            "inner",
            inner_type,
            false,
            2,
        )])
        .unwrap();
        let data_type = DataType::try_struct_type([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(element, false))),
            false,
            5,
        )])
        .unwrap();
        StructType::new_unchecked([create_field_with_id("data", data_type, false, 1)])
    };

    // Before: `a` (id 3) and `removed` (id 4).
    let before = data_column(
        DataType::try_struct_type([
            create_field_with_id("a", DataType::INTEGER, false, 3),
            create_field_with_id("removed", DataType::STRING, true, 4),
        ])
        .unwrap(),
    );
    // After: id 3 renamed, id 4 gone, `added` (id 6) introduced.
    let after = data_column(
        DataType::try_struct_type([
            create_field_with_id("renamed_a", DataType::INTEGER, false, 3),
            create_field_with_id("added", DataType::LONG, true, 6),
        ])
        .unwrap(),
    );

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (1, 1, 1));

    let inner_path =
        |leaf: &'static str| ColumnName::new(["data", "items", "element", "inner", leaf]);
    assert_eq!(diff.added_fields[0].path, inner_path("added"));
    assert_eq!(diff.removed_fields[0].path, inner_path("removed"));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, inner_path("renamed_a"));
    assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_nested_map_within_struct_within_map() {
    // Builds `lookup: map<string, struct<nested: map<int, struct<leaf>>>>`; only the
    // name of the innermost leaf (id 3) varies.
    let lookup = |leaf_name: &'static str| {
        let inner_value = DataType::try_struct_type([create_field_with_id(
            leaf_name,
            DataType::INTEGER,
            false,
            3,
        )])
        .unwrap();
        let outer_value = DataType::try_struct_type([create_field_with_id(
            "nested",
            DataType::Map(Box::new(MapType::new(DataType::INTEGER, inner_value, false))),
            false,
            2,
        )])
        .unwrap();
        StructType::new_unchecked([create_field_with_id(
            "lookup",
            DataType::Map(Box::new(MapType::new(DataType::STRING, outer_value, false))),
            false,
            1,
        )])
    };
    let before = lookup("x");
    let after = lookup("renamed_x");

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    // Each map level contributes its own `value` segment to the path.
    let update = &diff.updated_fields[0];
    assert_eq!(
        update.path,
        ColumnName::new(["lookup", "value", "nested", "value", "renamed_x"])
    );
    assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_doubly_nested_array_with_struct_elements() {
    // Wraps a struct element type in `matrix: array<array<element>>` (id 1, no nulls).
    let matrix = |element: DataType| {
        StructType::new_unchecked([create_field_with_id(
            "matrix",
            DataType::Array(Box::new(ArrayType::new(
                DataType::Array(Box::new(ArrayType::new(element, false))),
                false,
            ))),
            false,
            1,
        )])
    };

    let before = matrix(
        DataType::try_struct_type([create_field_with_id("x", DataType::INTEGER, false, 2)])
            .unwrap(),
    );
    // After: id 2 renamed to `renamed_x`, and a nullable `y` (id 3) is added.
    let after = matrix(
        DataType::try_struct_type([
            create_field_with_id("renamed_x", DataType::INTEGER, false, 2),
            create_field_with_id("y", DataType::INTEGER, true, 3),
        ])
        .unwrap(),
    );

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (1, 0, 1));

    // One `element` segment per array level.
    let leaf_path = |leaf: &'static str| ColumnName::new(["matrix", "element", "element", leaf]);
    assert_eq!(diff.added_fields[0].path, leaf_path("y"));

    let update = &diff.updated_fields[0];
    assert_eq!(update.path, leaf_path("renamed_x"));
    assert_eq!(update.change_types, vec![FieldChangeType::Renamed]);
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_map_with_array_of_struct_key_and_value() {
    // Builds `complex_map: map<array<struct<key_field>>, array<struct<value_field>>>`;
    // only the two leaf names (ids 2 and 3) vary.
    let complex_map = |key_field: &'static str, value_field: &'static str| {
        let key_array = DataType::Array(Box::new(ArrayType::new(
            DataType::try_struct_type([create_field_with_id(
                key_field,
                DataType::INTEGER,
                false,
                2,
            )])
            .unwrap(),
            false,
        )));
        let value_array = DataType::Array(Box::new(ArrayType::new(
            DataType::try_struct_type([create_field_with_id(
                value_field,
                DataType::STRING,
                false,
                3,
            )])
            .unwrap(),
            false,
        )));
        StructType::new_unchecked([create_field_with_id(
            "complex_map",
            DataType::Map(Box::new(MapType::new(key_array, value_array, false))),
            false,
            1,
        )])
    };
    let before = complex_map("key_field", "value_field");
    let after = complex_map("renamed_key_field", "renamed_value_field");

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 2));

    let paths = updated_paths(&diff);
    let expected = [
        ColumnName::new(["complex_map", "key", "element", "renamed_key_field"]),
        ColumnName::new(["complex_map", "value", "element", "renamed_value_field"]),
    ];
    for path in &expected {
        assert!(paths.contains(path));
    }
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_map_struct_key_nested_map_value() {
    // Builds `nested_maps: map<struct<outer_key>, map<struct<inner_key>, leaf>>`
    // with outer key id 2 and inner key id 3.
    let nested_maps = |outer_key: &'static str, inner_key: &'static str, leaf: DataType| {
        let outer_key_type = DataType::try_struct_type([create_field_with_id(
            outer_key,
            DataType::INTEGER,
            false,
            2,
        )])
        .unwrap();
        let inner_key_type = DataType::try_struct_type([create_field_with_id(
            inner_key,
            DataType::INTEGER,
            false,
            3,
        )])
        .unwrap();
        let inner_map = DataType::Map(Box::new(MapType::new(inner_key_type, leaf, false)));
        StructType::new_unchecked([create_field_with_id(
            "nested_maps",
            DataType::Map(Box::new(MapType::new(outer_key_type, inner_map, false))),
            false,
            1,
        )])
    };

    // Before: inner value struct has `data` (id 4) and `removed` (id 5).
    let before = nested_maps(
        "outer_key",
        "inner_key",
        DataType::try_struct_type([
            create_field_with_id("data", DataType::STRING, false, 4),
            create_field_with_id("removed", DataType::INTEGER, true, 5),
        ])
        .unwrap(),
    );
    // After: both keys and `data` renamed, id 5 gone, `added` (id 6) introduced.
    let after = nested_maps(
        "renamed_outer_key",
        "renamed_inner_key",
        DataType::try_struct_type([
            create_field_with_id("renamed_data", DataType::STRING, false, 4),
            create_field_with_id("added", DataType::LONG, true, 6),
        ])
        .unwrap(),
    );

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (1, 1, 3));

    let inner_value_path =
        |leaf: &'static str| ColumnName::new(["nested_maps", "value", "value", leaf]);
    assert_eq!(diff.added_fields[0].path, inner_value_path("added"));
    assert_eq!(diff.removed_fields[0].path, inner_value_path("removed"));

    let paths = updated_paths(&diff);
    let expected = [
        ColumnName::new(["nested_maps", "key", "renamed_outer_key"]),
        ColumnName::new(["nested_maps", "value", "key", "renamed_inner_key"]),
        ColumnName::new(["nested_maps", "value", "value", "renamed_data"]),
    ];
    for path in &expected {
        assert!(paths.contains(path));
    }
    assert!(!diff.has_breaking_changes());
}
#[test]
fn test_deeply_nested_nullability_tightening_is_breaking() {
    // Builds `wrapper: array<struct<items: array<struct<value>>>>`; only the
    // nullability of the leaf `value` field (id 3) varies.
    let wrapper = |value_nullable: bool| {
        let leaf_struct = DataType::try_struct_type([create_field_with_id(
            "value",
            DataType::INTEGER,
            value_nullable,
            3,
        )])
        .unwrap();
        let element = DataType::try_struct_type([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(leaf_struct, false))),
            false,
            2,
        )])
        .unwrap();
        StructType::new_unchecked([create_field_with_id(
            "wrapper",
            DataType::Array(Box::new(ArrayType::new(element, false))),
            false,
            1,
        )])
    };
    let before = wrapper(true);
    let after = wrapper(false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    // A struct-field nullability change is reported on the field itself, not its container.
    let update = &diff.updated_fields[0];
    assert_eq!(
        update.path,
        ColumnName::new(["wrapper", "element", "items", "element", "value"])
    );
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::NullabilityTightened]
    );
    assert!(diff.has_breaking_changes());
}
#[test]
fn test_deeply_nested_container_nullability_tightening_is_breaking() {
    // Builds `wrapper: array<struct<items: array<struct<value>>>>`; only the
    // `contains_null` flag of the inner `items` array varies.
    let wrapper = |items_contains_null: bool| {
        let leaf_struct = DataType::try_struct_type([create_field_with_id(
            "value",
            DataType::INTEGER,
            false,
            3,
        )])
        .unwrap();
        let element = DataType::try_struct_type([create_field_with_id(
            "items",
            DataType::Array(Box::new(ArrayType::new(leaf_struct, items_contains_null))),
            false,
            2,
        )])
        .unwrap();
        StructType::new_unchecked([create_field_with_id(
            "wrapper",
            DataType::Array(Box::new(ArrayType::new(element, false))),
            false,
            1,
        )])
    };
    let before = wrapper(true);
    let after = wrapper(false);

    let diff = SchemaDiff::new(&before, &after).unwrap();
    let counts = (diff.added_fields.len(), diff.removed_fields.len(), diff.updated_fields.len());
    assert_eq!(counts, (0, 0, 1));

    // The container change is attributed to the nearest enclosing struct field, `items`.
    let update = &diff.updated_fields[0];
    assert_eq!(
        update.path,
        ColumnName::new(["wrapper", "element", "items"])
    );
    assert_eq!(
        update.change_types,
        vec![FieldChangeType::ContainerNullabilityTightened]
    );
    assert!(diff.has_breaking_changes());
}
}