use std::sync::{Arc, LazyLock};
use crate::actions::ADD_NAME;
use crate::expressions::{Expression, ExpressionRef, Transform, UnaryExpressionOp};
use crate::schema::{DataType, SchemaRef, StructField, StructType};
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error};
/// Name of the JSON-encoded statistics column on the `add` action.
pub(crate) const STATS_FIELD: &str = "stats";
/// Name of the strongly-typed (struct) statistics column on the `add` action.
pub(crate) const STATS_PARSED_FIELD: &str = "stats_parsed";
/// Name of the partition-values map column on the `add` action.
pub(crate) const PARTITION_VALUES_FIELD: &str = "partitionValues";
/// Name of the strongly-typed partition-values struct column on the `add` action.
pub(crate) const PARTITION_VALUES_PARSED_FIELD: &str = "partitionValues_parsed";
/// Controls which statistics representations the checkpoint transform keeps
/// for each `add` action: the JSON `stats` string, the typed `stats_parsed`
/// struct, or both.
#[derive(Debug, Clone, Copy)]
pub(crate) struct StatsTransformConfig {
    // When true, the `stats` (JSON) column is kept; otherwise it is dropped.
    pub write_stats_as_json: bool,
    // When true, the `stats_parsed` (and, for partitioned tables,
    // `partitionValues_parsed`) struct columns are produced; otherwise dropped.
    pub write_stats_as_struct: bool,
}
impl StatsTransformConfig {
pub(super) fn from_table_properties(properties: &TableProperties) -> Self {
Self {
write_stats_as_json: properties.should_write_stats_as_json(),
write_stats_as_struct: properties.should_write_stats_as_struct(),
}
}
}
/// Builds the two-level expression that rewrites the `add` action for
/// checkpoint output: the JSON `stats` column and the parsed `stats_parsed` /
/// `partitionValues_parsed` columns are each replaced or dropped according to
/// `config`. The partition-values transform is only emitted when
/// `partition_schema` is provided (i.e. the table is partitioned).
pub(crate) fn build_checkpoint_transform(
    config: &StatsTransformConfig,
    stats_schema: &SchemaRef,
    partition_schema: Option<&SchemaRef>,
) -> ExpressionRef {
    let inner = Transform::new_nested([ADD_NAME]);

    // JSON stats: either coalesce from the existing columns or drop entirely.
    let inner = if config.write_stats_as_json {
        inner.with_replaced_field(STATS_FIELD, STATS_JSON_EXPR.clone())
    } else {
        inner.with_dropped_field(STATS_FIELD)
    };

    // Typed stats struct: replace with the parsed expression or drop.
    let inner = if config.write_stats_as_struct {
        inner.with_replaced_field(STATS_PARSED_FIELD, build_stats_parsed_expr(stats_schema))
    } else {
        inner.with_dropped_field(STATS_PARSED_FIELD)
    };

    // Typed partition values: only relevant for partitioned tables, and gated
    // on the same struct-stats flag as stats_parsed.
    let inner = match (partition_schema, config.write_stats_as_struct) {
        (Some(_), true) => inner.with_replaced_field(
            PARTITION_VALUES_PARSED_FIELD,
            build_partition_values_parsed_expr(),
        ),
        (Some(_), false) => inner.with_dropped_field(PARTITION_VALUES_PARSED_FIELD),
        (None, _) => inner,
    };

    // Wrap the 'add'-scoped transform in a top-level transform that swaps the
    // whole `add` field for the rewritten version.
    let inner_expr: ExpressionRef = Arc::new(Expression::transform(inner));
    Arc::new(Expression::transform(
        Transform::new_top_level().with_replaced_field(ADD_NAME, inner_expr),
    ))
}
/// Produces the schema used to *read* checkpoint data: `base_schema` with the
/// `add` struct widened by a nullable `stats_parsed` column (inserted after
/// `stats`) and, for partitioned tables, a nullable `partitionValues_parsed`
/// column (inserted after `partitionValues`).
///
/// # Errors
/// Returns an error if either parsed column already exists in the `add`
/// struct, or if the base schema lacks a struct-typed `add` field.
pub(crate) fn build_checkpoint_read_schema(
    base_schema: &StructType,
    stats_schema: &StructType,
    partition_schema: Option<&StructType>,
) -> DeltaResult<SchemaRef> {
    transform_add_schema(base_schema, |add_struct| {
        // Fail fast rather than silently shadow pre-existing parsed columns.
        if add_struct.field(STATS_PARSED_FIELD).is_some() {
            return Err(Error::generic(
                "stats_parsed field already exists in Add schema",
            ));
        }
        let pv_conflict =
            partition_schema.is_some() && add_struct.field(PARTITION_VALUES_PARSED_FIELD).is_some();
        if pv_conflict {
            return Err(Error::generic(
                "partitionValues_parsed field already exists in Add schema",
            ));
        }

        let stats_parsed = StructField::nullable(
            STATS_PARSED_FIELD,
            DataType::Struct(Box::new(stats_schema.clone())),
        );
        let mut widened = add_struct
            .clone()
            .with_field_inserted_after(Some(STATS_FIELD), stats_parsed)?;

        if let Some(pv_schema) = partition_schema {
            let pv_parsed = StructField::nullable(
                PARTITION_VALUES_PARSED_FIELD,
                DataType::Struct(Box::new(pv_schema.clone())),
            );
            widened = widened.with_field_inserted_after(Some(PARTITION_VALUES_FIELD), pv_parsed)?;
        }
        Ok(widened)
    })
}
/// Produces the schema of the checkpoint *output*: `base_schema` with its
/// `add` struct rewritten by [`build_add_output_schema`] according to the
/// configured stats representations.
pub(crate) fn build_checkpoint_output_schema(
    config: &StatsTransformConfig,
    base_schema: &StructType,
    stats_schema: &StructType,
    partition_schema: Option<&StructType>,
) -> DeltaResult<SchemaRef> {
    let rewrite_add = |add_struct: &StructType| {
        build_add_output_schema(config, add_struct, stats_schema, partition_schema)
    };
    transform_add_schema(base_schema, rewrite_add)
}
/// Expression for the `stats_parsed` column: prefer an existing
/// `add.stats_parsed` value, falling back to parsing `add.stats` as JSON
/// against `stats_schema`.
fn build_stats_parsed_expr(stats_schema: &SchemaRef) -> ExpressionRef {
    let existing = Expression::column([ADD_NAME, STATS_PARSED_FIELD]);
    let parsed_from_json = Expression::parse_json(
        Expression::column([ADD_NAME, STATS_FIELD]),
        stats_schema.clone(),
    );
    Arc::new(Expression::coalesce([existing, parsed_from_json]))
}
/// Expression for the `partitionValues_parsed` column: prefer an existing
/// `add.partitionValues_parsed` value, falling back to converting the
/// `add.partitionValues` map into a struct.
fn build_partition_values_parsed_expr() -> ExpressionRef {
    let existing = Expression::column([ADD_NAME, PARTITION_VALUES_PARSED_FIELD]);
    let from_map =
        Expression::map_to_struct(Expression::column([ADD_NAME, PARTITION_VALUES_FIELD]));
    Arc::new(Expression::coalesce([existing, from_map]))
}
/// Expression for the JSON `stats` column: prefer an existing `add.stats`
/// string, falling back to serializing `add.stats_parsed` to JSON. Built once
/// lazily since it takes no parameters.
static STATS_JSON_EXPR: LazyLock<ExpressionRef> = LazyLock::new(|| {
    let existing_json = Expression::column([ADD_NAME, STATS_FIELD]);
    let struct_as_json = Expression::unary(
        UnaryExpressionOp::ToJson,
        Expression::column([ADD_NAME, STATS_PARSED_FIELD]),
    );
    Arc::new(Expression::coalesce([existing_json, struct_as_json]))
});
/// Applies `transform_fn` to the struct type of the `add` field in
/// `base_schema`, returning a new schema with the rewritten `add` struct
/// substituted in place (nullability and metadata of the `add` field are
/// preserved).
///
/// # Errors
/// Returns an error if `base_schema` has no `add` field, if that field is not
/// a struct, or if `transform_fn` / the field replacement fails.
fn transform_add_schema(
    base_schema: &StructType,
    transform_fn: impl FnOnce(&StructType) -> DeltaResult<StructType>,
) -> DeltaResult<SchemaRef> {
    let add_field = base_schema
        .field(ADD_NAME)
        .ok_or_else(|| Error::generic("Expected 'add' field in checkpoint schema"))?;
    let add_struct = match &add_field.data_type {
        DataType::Struct(inner) => inner,
        other => {
            return Err(Error::generic(format!(
                "Expected 'add' field to be a struct type, got {:?}",
                other
            )))
        }
    };
    let rewritten_add = transform_fn(add_struct)?;
    // Rebuild the `add` field around the rewritten struct, carrying over the
    // original nullability and metadata.
    let replacement = StructField {
        name: ADD_NAME.to_string(),
        data_type: DataType::Struct(Box::new(rewritten_add)),
        nullable: add_field.nullable,
        metadata: add_field.metadata.clone(),
    };
    base_schema
        .clone()
        .with_field_replaced(ADD_NAME, replacement)
        .map(Arc::new)
}
/// Computes the output shape of the `add` struct for a checkpoint: inserts
/// `stats_parsed` (after `stats`) and, for partitioned tables,
/// `partitionValues_parsed` (after `partitionValues`) when struct stats are
/// enabled, and removes `stats` when JSON stats are disabled.
fn build_add_output_schema(
    config: &StatsTransformConfig,
    add_schema: &StructType,
    stats_schema: &StructType,
    partition_schema: Option<&StructType>,
) -> DeltaResult<StructType> {
    let mut schema = add_schema.clone();
    if config.write_stats_as_struct {
        let stats_parsed = StructField::nullable(
            STATS_PARSED_FIELD,
            DataType::Struct(Box::new(stats_schema.clone())),
        );
        schema = schema.with_field_inserted_after(Some(STATS_FIELD), stats_parsed)?;
        // Parsed partition values are only emitted alongside struct stats and
        // only when the table is partitioned.
        if let Some(pv_schema) = partition_schema {
            let pv_parsed = StructField::nullable(
                PARTITION_VALUES_PARSED_FIELD,
                DataType::Struct(Box::new(pv_schema.clone())),
            );
            schema = schema.with_field_inserted_after(Some(PARTITION_VALUES_FIELD), pv_parsed)?;
        }
    }
    match config.write_stats_as_json {
        true => Ok(schema),
        false => Ok(schema.with_field_removed(STATS_FIELD)),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // Default table properties yield JSON stats on, struct stats off.
    #[test]
    fn test_config_defaults() {
        let props = TableProperties::default();
        let config = StatsTransformConfig::from_table_properties(&props);
        assert!(config.write_stats_as_json);
        assert!(!config.write_stats_as_struct);
    }

    // Setting checkpoint_write_stats_as_struct turns on the struct flag
    // without disturbing the JSON flag.
    #[test]
    fn test_config_with_struct_enabled() {
        let props = TableProperties {
            checkpoint_write_stats_as_struct: Some(true),
            ..Default::default()
        };
        let config = StatsTransformConfig::from_table_properties(&props);
        assert!(config.write_stats_as_json);
        assert!(config.write_stats_as_struct);
    }

    // Unwraps the two-level expression produced by build_checkpoint_transform,
    // asserting its shape: a top-level transform whose 'add' field is replaced
    // by a single inner transform targeting the 'add' path. Returns
    // (outer, inner) for further assertions.
    fn extract_transforms(expr: &Expression) -> (&Transform, &Transform) {
        let Expression::Transform(outer) = expr else {
            panic!("Expected outer Transform expression");
        };
        assert!(
            outer.input_path.is_none(),
            "Outer transform should be top-level"
        );
        let add_field_transform = outer
            .field_transforms
            .get(ADD_NAME)
            .expect("Outer transform should have 'add' field transform");
        assert!(add_field_transform.is_replace, "Should replace 'add' field");
        assert_eq!(
            add_field_transform.exprs.len(),
            1,
            "Should have exactly one replacement expression"
        );
        let Expression::Transform(inner) = add_field_transform.exprs[0].as_ref() else {
            panic!("Expected inner Transform expression for 'add' field");
        };
        assert_eq!(
            inner.input_path.as_ref().map(|p| p.to_string()),
            Some("add".to_string()),
            "Inner transform should target 'add' path"
        );
        (outer, inner)
    }

    // A dropped field is encoded as a replacement with zero expressions.
    fn is_drop(transform: &Transform, field: &str) -> bool {
        transform
            .field_transforms
            .get(field)
            .map(|ft| ft.is_replace && ft.exprs.is_empty())
            .unwrap_or(false)
    }

    // A true replacement carries exactly one expression.
    fn is_replacement(transform: &Transform, field: &str) -> bool {
        transform
            .field_transforms
            .get(field)
            .map(|ft| ft.is_replace && ft.exprs.len() == 1)
            .unwrap_or(false)
    }

    // JSON-only config: stats is replaced, stats_parsed is dropped.
    #[test]
    fn test_build_transform_with_json_only() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: false,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let transform_expr = build_checkpoint_transform(&config, &stats_schema, None);
        let (_, inner) = extract_transforms(&transform_expr);
        assert!(
            is_replacement(inner, STATS_FIELD),
            "stats should be replaced"
        );
        assert!(
            is_drop(inner, STATS_PARSED_FIELD),
            "stats_parsed should be dropped"
        );
    }

    // Both flags false: both stats columns are dropped.
    #[test]
    fn test_build_transform_drops_both_when_false() {
        let config = StatsTransformConfig {
            write_stats_as_json: false,
            write_stats_as_struct: false,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let transform_expr = build_checkpoint_transform(&config, &stats_schema, None);
        let (_, inner) = extract_transforms(&transform_expr);
        assert!(is_drop(inner, STATS_FIELD), "stats should be dropped");
        assert!(
            is_drop(inner, STATS_PARSED_FIELD),
            "stats_parsed should be dropped"
        );
    }

    // Both flags true: both stats columns are replaced.
    #[test]
    fn test_build_transform_with_both_enabled() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: true,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let transform_expr = build_checkpoint_transform(&config, &stats_schema, None);
        let (_, inner) = extract_transforms(&transform_expr);
        assert!(
            is_replacement(inner, STATS_FIELD),
            "stats should be replaced"
        );
        assert!(
            is_replacement(inner, STATS_PARSED_FIELD),
            "stats_parsed should be replaced"
        );
    }

    // Struct-only config: stats is dropped, stats_parsed is replaced.
    #[test]
    fn test_build_transform_struct_only() {
        let config = StatsTransformConfig {
            write_stats_as_json: false,
            write_stats_as_struct: true,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let transform_expr = build_checkpoint_transform(&config, &stats_schema, None);
        let (_, inner) = extract_transforms(&transform_expr);
        assert!(is_drop(inner, STATS_FIELD), "stats should be dropped");
        assert!(
            is_replacement(inner, STATS_PARSED_FIELD),
            "stats_parsed should be replaced"
        );
    }

    // Partitioned table with struct stats enabled: partitionValues_parsed is
    // replaced in the inner transform.
    #[test]
    fn test_build_transform_with_partition_values() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: true,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let pv_schema = Arc::new(StructType::new_unchecked([
            StructField::nullable("year", DataType::INTEGER),
            StructField::nullable("month", DataType::INTEGER),
        ]));
        let transform_expr = build_checkpoint_transform(&config, &stats_schema, Some(&pv_schema));
        let (_, inner) = extract_transforms(&transform_expr);
        assert!(
            is_replacement(inner, PARTITION_VALUES_PARSED_FIELD),
            "partitionValues_parsed should be replaced"
        );
    }

    // Partitioned table but struct stats disabled: partitionValues_parsed is
    // dropped rather than populated.
    #[test]
    fn test_build_transform_no_partition_values_when_struct_disabled() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: false,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let pv_schema = Arc::new(StructType::new_unchecked([StructField::nullable(
            "year",
            DataType::INTEGER,
        )]));
        let transform_expr = build_checkpoint_transform(&config, &stats_schema, Some(&pv_schema));
        let (_, inner) = extract_transforms(&transform_expr);
        assert!(
            is_drop(inner, PARTITION_VALUES_PARSED_FIELD),
            "partitionValues_parsed should be dropped"
        );
    }

    // Non-partitioned table (no partition schema): no partitionValues_parsed
    // entry appears in the inner transform at all.
    #[test]
    fn test_build_transform_non_partitioned_table() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: true,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let transform_expr = build_checkpoint_transform(&config, &stats_schema, None);
        let (_, inner) = extract_transforms(&transform_expr);
        assert!(
            !inner
                .field_transforms
                .contains_key(PARTITION_VALUES_PARSED_FIELD),
            "non-partitioned table should not have partitionValues_parsed transform"
        );
    }

    // with_field_inserted_after places stats_parsed immediately after stats,
    // preserving the order of the surrounding fields.
    #[test]
    fn test_field_inserted_after_in_add_schema() {
        let add_schema = StructType::new_unchecked([
            StructField::not_null("path", DataType::STRING),
            StructField::nullable("stats", DataType::STRING),
            StructField::nullable("tags", DataType::STRING),
        ]);
        let injected_schema =
            StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]);
        let result = add_schema
            .with_field_inserted_after(
                Some(STATS_FIELD),
                StructField::nullable(
                    STATS_PARSED_FIELD,
                    DataType::Struct(Box::new(injected_schema)),
                ),
            )
            .expect("inserting stats_parsed should succeed");
        assert_eq!(result.fields().count(), 4);
        let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
        assert_eq!(field_names, vec!["path", "stats", "stats_parsed", "tags"]);
    }

    // JSON-only output schema: no stats_parsed added, stats retained.
    #[test]
    fn test_build_add_output_schema_json_only() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: false,
        };
        let add_schema = StructType::new_unchecked([
            StructField::not_null("path", DataType::STRING),
            StructField::nullable("stats", DataType::STRING),
        ]);
        let stats_schema = StructType::new_unchecked([]);
        let result = build_add_output_schema(&config, &add_schema, &stats_schema, None)
            .expect("build add output schema should produce a valid schema");
        let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
        assert_eq!(field_names, vec!["path", "stats"]);
    }

    // Struct-only output schema: stats removed, stats_parsed added in its
    // position.
    #[test]
    fn test_build_add_output_schema_struct_only() {
        let config = StatsTransformConfig {
            write_stats_as_json: false,
            write_stats_as_struct: true,
        };
        let add_schema = StructType::new_unchecked([
            StructField::not_null("path", DataType::STRING),
            StructField::nullable("stats", DataType::STRING),
        ]);
        let stats_schema =
            StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]);
        let result = build_add_output_schema(&config, &add_schema, &stats_schema, None)
            .expect("build add output schema should produce a valid schema");
        let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
        assert_eq!(field_names, vec!["path", "stats_parsed"]);
    }

    // Both enabled: stats retained and stats_parsed inserted after it.
    #[test]
    fn test_build_add_output_schema_both() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: true,
        };
        let add_schema = StructType::new_unchecked([
            StructField::not_null("path", DataType::STRING),
            StructField::nullable("stats", DataType::STRING),
        ]);
        let stats_schema =
            StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]);
        let result = build_add_output_schema(&config, &add_schema, &stats_schema, None)
            .expect("build add output schema should produce a valid schema");
        let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
        assert_eq!(field_names, vec!["path", "stats", "stats_parsed"]);
    }

    // Partitioned table with both flags: partitionValues_parsed goes right
    // after partitionValues, stats_parsed right after stats.
    #[test]
    fn test_build_add_output_schema_with_partition_values() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: true,
        };
        let add_schema = StructType::new_unchecked([
            StructField::not_null("path", DataType::STRING),
            StructField::nullable(
                "partitionValues",
                DataType::Map(Box::new(crate::schema::MapType::new(
                    DataType::STRING,
                    DataType::STRING,
                    true,
                ))),
            ),
            StructField::nullable("stats", DataType::STRING),
        ]);
        let stats_schema =
            StructType::new_unchecked([StructField::nullable("numRecords", DataType::LONG)]);
        let pv_schema = StructType::new_unchecked([
            StructField::nullable("year", DataType::INTEGER),
            StructField::nullable("month", DataType::INTEGER),
        ]);
        let result = build_add_output_schema(&config, &add_schema, &stats_schema, Some(&pv_schema))
            .expect("build add output schema should produce a valid schema");
        let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "path",
                "partitionValues",
                "partitionValues_parsed",
                "stats",
                "stats_parsed"
            ]
        );
    }

    // Partitioned table but struct stats disabled: output schema keeps only
    // the raw partitionValues map (no parsed column).
    #[test]
    fn test_build_add_output_schema_no_partition_values_when_struct_disabled() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: false,
        };
        let add_schema = StructType::new_unchecked([
            StructField::not_null("path", DataType::STRING),
            StructField::nullable(
                "partitionValues",
                DataType::Map(Box::new(crate::schema::MapType::new(
                    DataType::STRING,
                    DataType::STRING,
                    true,
                ))),
            ),
            StructField::nullable("stats", DataType::STRING),
        ]);
        let stats_schema = StructType::new_unchecked([]);
        let pv_schema =
            StructType::new_unchecked([StructField::nullable("year", DataType::INTEGER)]);
        let result = build_add_output_schema(&config, &add_schema, &stats_schema, Some(&pv_schema))
            .expect("build add output schema should produce a valid schema");
        let field_names: Vec<&str> = result.fields().map(|f| f.name.as_str()).collect();
        assert_eq!(field_names, vec!["path", "partitionValues", "stats"]);
    }
}