use serde::{Deserialize, Serialize};
use crate::actions::DomainMetadata;
use crate::expressions::ColumnName;
use crate::scan::data_skipping::stats_schema::is_skipping_eligible_datatype;
use crate::schema::{DataType, StructType};
use crate::{DeltaResult, Error};
/// JSON payload stored as the configuration string of the clustering domain metadata.
///
/// Field names are (de)serialized in camelCase, so the single field appears in JSON as
/// `clusteringColumns`.
#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
struct ClusteringDomainMetadata {
/// One entry per clustering column; each entry lists that column's path segments
/// (a nested column has more than one segment).
clustering_columns: Vec<Vec<String>>,
}
/// Domain name under which the clustering column list is recorded as domain metadata.
pub(crate) const CLUSTERING_DOMAIN_NAME: &str = "delta.clustering";
pub(crate) fn validate_clustering_columns(
schema: &StructType,
columns: &[ColumnName],
) -> DeltaResult<()> {
use std::collections::HashSet;
if columns.is_empty() {
return Err(Error::generic("Clustering requires at least one column"));
}
let mut seen = HashSet::new();
for col in columns {
if !seen.insert(col) {
return Err(Error::generic(format!(
"Duplicate clustering column: '{col}'"
)));
}
let fields = schema.walk_column_fields(col)?;
let leaf_type = fields
.last()
.ok_or_else(|| Error::generic(format!("Could not resolve column '{col}' in schema")))?
.data_type();
match leaf_type {
DataType::Primitive(ptype) if is_skipping_eligible_datatype(ptype) => {}
dt => {
return Err(Error::generic(format!(
"Clustering column '{col}' has unsupported type '{dt}'. \
Supported types: Byte, Short, Integer, Long, Float, Double, \
Decimal, Date, Timestamp, TimestampNtz, String"
)));
}
}
}
Ok(())
}
/// Builds the [`DomainMetadata`] entry that records `columns` as the clustering columns.
///
/// Each column is stored as the list of its path segments, serialized to JSON under the
/// `clusteringColumns` key, and attached to the [`CLUSTERING_DOMAIN_NAME`] domain.
pub(crate) fn create_clustering_domain_metadata(columns: &[ColumnName]) -> DomainMetadata {
    let mut clustering_columns: Vec<Vec<String>> = Vec::with_capacity(columns.len());
    for column in columns {
        let segments = column
            .path()
            .iter()
            .map(|segment| segment.to_string())
            .collect();
        clustering_columns.push(segments);
    }
    let payload = ClusteringDomainMetadata { clustering_columns };

    // The payload is only nested vectors of strings, so JSON serialization is not
    // expected to fail; the unwrap is deliberate.
    #[allow(clippy::unwrap_used)]
    let serialized = serde_json::to_string(&payload).unwrap();

    DomainMetadata::new(CLUSTERING_DOMAIN_NAME.to_string(), serialized)
}
/// Parses a clustering domain configuration string back into column names.
///
/// `json_str` must be a JSON object with a `clusteringColumns` array whose elements are
/// arrays of path segments. Other keys in the object are ignored.
///
/// # Errors
///
/// Returns an error if `json_str` cannot be deserialized into the expected shape.
pub(crate) fn parse_clustering_columns(json_str: &str) -> DeltaResult<Vec<ColumnName>> {
    let column_paths =
        serde_json::from_str::<ClusteringDomainMetadata>(json_str)?.clustering_columns;

    let mut columns = Vec::with_capacity(column_paths.len());
    for segments in column_paths {
        columns.push(ColumnName::new(segments));
    }
    Ok(columns)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::schema::{DataType, StructField};

    /// Builds a schema containing exactly one non-nullable field.
    fn one_field_schema(name: &str, data_type: DataType) -> StructType {
        StructType::new_unchecked(vec![StructField::new(name, data_type, false)])
    }

    /// Asserts that `outcome` is an error whose message contains `fragment`.
    fn assert_fails_with(outcome: DeltaResult<()>, fragment: &str) {
        assert!(outcome.is_err());
        assert!(outcome.unwrap_err().to_string().contains(fragment));
    }

    // Parsing: well-formed payloads of various shapes map to the expected column names.
    #[rstest::rstest]
    #[case::simple(
        r#"{"clusteringColumns": [["col1"], ["col2"]]}"#,
        vec![vec!["col1"], vec!["col2"]]
    )]
    #[case::empty(
        r#"{"clusteringColumns": []}"#,
        vec![]
    )]
    #[case::nested(
        r#"{"clusteringColumns": [["id"], ["user", "address", "city"], ["a", "b", "c", "d", "e"]]}"#,
        vec![vec!["id"], vec!["user", "address", "city"], vec!["a", "b", "c", "d", "e"]]
    )]
    #[case::special_characters(
        r#"{"clusteringColumns": [["col.with.dot"], ["`backticks`", "nested"]]}"#,
        vec![vec!["col.with.dot"], vec!["`backticks`", "nested"]]
    )]
    #[case::tolerates_unknown_fields(
        r#"{"clusteringColumns": [["col1"]], "foo": "bar", "futureField": 123}"#,
        vec![vec!["col1"]]
    )]
    fn test_parse_clustering_columns(#[case] json: &str, #[case] expected: Vec<Vec<&str>>) {
        let want: Vec<ColumnName> = expected.into_iter().map(ColumnName::new).collect();
        let got = parse_clustering_columns(json).unwrap();
        assert_eq!(got, want);
    }

    #[test]
    fn test_validate_clustering_columns_valid() {
        let schema = StructType::new_unchecked(vec![
            StructField::new("id", DataType::INTEGER, false),
            StructField::new("name", DataType::STRING, true),
        ]);
        let outcome = validate_clustering_columns(&schema, &[ColumnName::new(["id"])]);
        assert!(outcome.is_ok());
    }

    #[test]
    fn test_validate_clustering_columns_not_found() {
        let schema = one_field_schema("id", DataType::INTEGER);
        let outcome = validate_clustering_columns(&schema, &[ColumnName::new(["nonexistent"])]);
        assert_fails_with(outcome, "not found in schema");
    }

    #[test]
    fn test_validate_clustering_columns_nested_valid() {
        // Three levels deep: user -> address -> city.
        let address = StructType::new_unchecked(vec![
            StructField::new("city", DataType::STRING, true),
            StructField::new("zip", DataType::STRING, true),
        ]);
        let user = StructType::new_unchecked(vec![
            StructField::new("name", DataType::STRING, true),
            StructField::new("address", DataType::Struct(Box::new(address)), true),
        ]);
        let schema = StructType::new_unchecked(vec![
            StructField::new("id", DataType::INTEGER, false),
            StructField::new("user", DataType::Struct(Box::new(user)), true),
        ]);
        let nested_column = [ColumnName::new(["user", "address", "city"])];
        assert!(validate_clustering_columns(&schema, &nested_column).is_ok());
    }

    #[test]
    fn test_validate_clustering_nested_struct_leaf_rejected() {
        // Naming the struct itself (not one of its leaves) must be rejected.
        let inner = one_field_schema("field", DataType::STRING);
        let schema = one_field_schema("parent", DataType::Struct(Box::new(inner)));
        let outcome = validate_clustering_columns(&schema, &[ColumnName::new(["parent"])]);
        assert_fails_with(outcome, "unsupported type");
    }

    #[test]
    fn test_validate_clustering_nested_intermediate_not_struct() {
        let schema = one_field_schema("flat_col", DataType::STRING);
        let through_primitive = [ColumnName::new(["flat_col", "child"])];
        let outcome = validate_clustering_columns(&schema, &through_primitive);
        assert_fails_with(outcome, "is not a struct type");
    }

    #[test]
    fn test_validate_clustering_nested_path_not_found() {
        let inner = one_field_schema("field", DataType::STRING);
        let schema = one_field_schema("parent", DataType::Struct(Box::new(inner)));
        let missing_leaf = [ColumnName::new(["parent", "nonexistent"])];
        let outcome = validate_clustering_columns(&schema, &missing_leaf);
        assert_fails_with(outcome, "not found in schema");
    }

    #[test]
    fn test_create_clustering_domain_metadata() {
        let columns = vec![ColumnName::new(["col1"]), ColumnName::new(["col2"])];
        let domain_metadata = create_clustering_domain_metadata(&columns);
        assert_eq!(domain_metadata.domain(), CLUSTERING_DOMAIN_NAME);
        let round_tripped = parse_clustering_columns(domain_metadata.configuration()).unwrap();
        assert_eq!(round_tripped, columns);
    }

    #[test]
    fn test_create_and_parse_roundtrip() {
        let original: Vec<ColumnName> = vec!["id", "timestamp", "region"]
            .into_iter()
            .map(|name| ColumnName::new([name]))
            .collect();
        let domain_metadata = create_clustering_domain_metadata(&original);
        let parsed = parse_clustering_columns(domain_metadata.configuration()).unwrap();
        assert_eq!(original, parsed);
    }

    #[test]
    fn test_validate_clustering_columns_supported_types() {
        let typed_columns = vec![
            ("byte_col", DataType::BYTE),
            ("short_col", DataType::SHORT),
            ("int_col", DataType::INTEGER),
            ("long_col", DataType::LONG),
            ("float_col", DataType::FLOAT),
            ("double_col", DataType::DOUBLE),
            ("date_col", DataType::DATE),
            ("timestamp_col", DataType::TIMESTAMP),
            ("timestamp_ntz_col", DataType::TIMESTAMP_NTZ),
            ("string_col", DataType::STRING),
            ("decimal_col", DataType::decimal(10, 2).unwrap()),
        ];
        let schema = StructType::new_unchecked(
            typed_columns
                .into_iter()
                .map(|(name, data_type)| StructField::new(name, data_type, false))
                .collect::<Vec<_>>(),
        );
        // Each field is validated on its own so a failure names the offending type.
        for field in schema.fields() {
            let single = vec![ColumnName::new([field.name()])];
            assert!(
                validate_clustering_columns(&schema, &single).is_ok(),
                "Type {} should be supported for clustering",
                field.data_type()
            );
        }
    }

    #[test]
    fn test_validate_clustering_columns_unsupported_primitive_types() {
        let typed_columns = vec![
            ("bool_col", DataType::BOOLEAN),
            ("binary_col", DataType::BINARY),
        ];
        let schema = StructType::new_unchecked(
            typed_columns
                .into_iter()
                .map(|(name, data_type)| StructField::new(name, data_type, false))
                .collect::<Vec<_>>(),
        );
        for field in schema.fields() {
            let single = vec![ColumnName::new([field.name()])];
            let outcome = validate_clustering_columns(&schema, &single);
            assert!(
                outcome.is_err(),
                "Type {} should NOT be supported for clustering",
                field.data_type()
            );
            assert!(outcome.unwrap_err().to_string().contains("unsupported type"));
        }
    }

    #[test]
    fn test_validate_clustering_columns_complex_types_rejected() {
        use crate::schema::{ArrayType, MapType};

        let inner = one_field_schema("inner", DataType::STRING);
        let array_of_ints = ArrayType::new(DataType::INTEGER, false);
        let string_to_int = MapType::new(DataType::STRING, DataType::INTEGER, false);
        let schema = StructType::new_unchecked(vec![
            StructField::new("struct_col", DataType::Struct(Box::new(inner)), false),
            StructField::new("array_col", DataType::Array(Box::new(array_of_ints)), false),
            StructField::new("map_col", DataType::Map(Box::new(string_to_int)), false),
        ]);
        for field in schema.fields() {
            let single = vec![ColumnName::new([field.name()])];
            let outcome = validate_clustering_columns(&schema, &single);
            assert!(
                outcome.is_err(),
                "Complex type {} should NOT be supported for clustering",
                field.data_type()
            );
            assert!(outcome.unwrap_err().to_string().contains("unsupported type"));
        }
    }

    // Validation accepts each of these column counts.
    #[rstest::rstest]
    #[case::four(4)]
    #[case::five(5)]
    #[case::ten(10)]
    fn test_validate_clustering_column_count(#[case] num_columns: usize) {
        let names: Vec<String> = (0..num_columns).map(|i| format!("col{i}")).collect();
        let schema = StructType::new_unchecked(
            names
                .iter()
                .map(|name| StructField::new(name.clone(), DataType::INTEGER, false))
                .collect::<Vec<_>>(),
        );
        let columns: Vec<ColumnName> = names
            .iter()
            .map(|name| ColumnName::new([name.clone()]))
            .collect();
        assert!(validate_clustering_columns(&schema, &columns).is_ok());
    }

    #[rstest::rstest]
    #[case::empty_columns(vec![], "at least one column")]
    #[case::duplicate_columns(vec!["id", "id"], "Duplicate clustering column")]
    fn test_validate_clustering_structural_errors(
        #[case] column_names: Vec<&str>,
        #[case] expected_error: &str,
    ) {
        let schema = one_field_schema("id", DataType::INTEGER);
        let columns: Vec<ColumnName> = column_names
            .into_iter()
            .map(|name| ColumnName::new([name]))
            .collect();
        let outcome = validate_clustering_columns(&schema, &columns);
        assert!(outcome.is_err());
        assert!(
            outcome.unwrap_err().to_string().contains(expected_error),
            "Expected error containing '{expected_error}'"
        );
    }

    #[test]
    fn test_validate_clustering_columns_empty_name_rejected() {
        // A column name with zero path segments.
        let schema = one_field_schema("id", DataType::INTEGER);
        let columns: Vec<ColumnName> = vec![ColumnName::new(Vec::<String>::new())];
        let outcome = validate_clustering_columns(&schema, &columns);
        assert_fails_with(outcome, "cannot be empty");
    }
}