use std::sync::{Arc, LazyLock};
use crate::actions::{ADD_NAME, STATS_PARSED as STATS_PARSED_FIELD};
use crate::expressions::{col, Expression, ExpressionRef, UnaryExpressionOp};
use crate::schema::{DataType, SchemaRef, SchemaStructPatchBuilder, StructField, StructType};
use crate::struct_patch::ProjectionStructPatchBuilder;
use crate::table_properties::TableProperties;
use crate::{DeltaResult, Error};
/// Column inside the `add` action that holds per-file statistics as a JSON string
/// (it is the input to `parse_json` when struct stats are produced).
pub(crate) const STATS_FIELD: &str = "stats";
/// Column inside the `add` action that holds partition values as a map
/// (it is the input to `map_to_struct` when parsed partition values are produced).
pub(crate) const PARTITION_VALUES_FIELD: &str = "partitionValues";
/// Struct-typed companion of [`PARTITION_VALUES_FIELD`] that may be written to checkpoints.
pub(crate) const PARTITION_VALUES_PARSED_FIELD: &str = "partitionValues_parsed";
/// Selects which representations of per-file statistics a checkpoint carries.
///
/// The two flags are independent. A checkpoint may contain the JSON `stats` column, the
/// struct-typed `stats_parsed` column, both, or neither.
// `PartialEq`/`Eq` are derived so this small `Copy` value can be compared directly
// (e.g. in assertions) instead of field by field.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct StatsTransformConfig {
    /// When `true`, `add.stats` is written as a JSON string; otherwise the column is dropped.
    pub write_stats_as_json: bool,
    /// When `true`, `add.stats_parsed` (and, for partitioned tables,
    /// `add.partitionValues_parsed`) are written as structs; otherwise they are dropped.
    pub write_stats_as_struct: bool,
}
impl StatsTransformConfig {
    /// Builds the checkpoint stats settings from a table's properties.
    ///
    /// `write_stats_as_json` is taken from `TableProperties::should_write_stats_as_json`
    /// and `write_stats_as_struct` from `TableProperties::should_write_stats_as_struct`.
    pub(super) fn from_table_properties(properties: &TableProperties) -> Self {
        let write_stats_as_json = properties.should_write_stats_as_json();
        let write_stats_as_struct = properties.should_write_stats_as_struct();
        Self {
            write_stats_as_json,
            write_stats_as_struct,
        }
    }
}
/// Builds the projection that rewrites the `add` action of checkpoint rows per `config`.
///
/// The generated expressions reference `add.stats_parsed` (and `add.partitionValues_parsed`
/// when `partition_schema` is `Some`). `read_schema` is therefore expected to come from
/// [`build_checkpoint_read_schema`], which inserts those columns.
///
/// Each column is handled as follows:
/// - `add.stats`: when `write_stats_as_json` is set it becomes
///   `coalesce(stats, to_json(stats_parsed))`; otherwise it is dropped.
/// - `add.stats_parsed`: when `write_stats_as_struct` is set it becomes
///   `coalesce(stats_parsed, parse_json(stats, stats_schema))`; otherwise it is dropped.
/// - `add.partitionValues_parsed`: touched only when `partition_schema` is `Some`. When
///   `write_stats_as_struct` is set it becomes
///   `coalesce(partitionValues_parsed, map_to_struct(partitionValues))`; otherwise it is
///   dropped. Only the presence of `partition_schema` is consulted here, not its fields.
///
/// Returns the output schema and transform expression produced by the projection builder.
///
/// # Errors
/// Propagates any error from `ProjectionStructPatchBuilder::build`.
pub(crate) fn build_checkpoint_transform(
    config: &StatsTransformConfig,
    read_schema: &StructType,
    stats_schema: &SchemaRef,
    partition_schema: Option<&SchemaRef>,
) -> DeltaResult<(SchemaRef, ExpressionRef)> {
    let builder = ProjectionStructPatchBuilder::new(read_schema);

    // JSON stats: keep an existing string, otherwise serialize `stats_parsed`.
    let builder = if config.write_stats_as_json {
        builder.replace_expr_at([ADD_NAME], STATS_FIELD, STATS_JSON_EXPR.clone())
    } else {
        builder.drop_at([ADD_NAME], STATS_FIELD)
    };

    // Struct stats: keep an existing struct, otherwise parse the JSON string.
    let builder = if config.write_stats_as_struct {
        builder.replace_expr_at(
            [ADD_NAME],
            STATS_PARSED_FIELD,
            build_stats_parsed_expr(stats_schema),
        )
    } else {
        builder.drop_at([ADD_NAME], STATS_PARSED_FIELD)
    };

    // `build_checkpoint_read_schema` adds `partitionValues_parsed` only for a partitioned
    // table, so without a partition schema there is nothing to patch.
    let builder = match partition_schema {
        None => builder,
        Some(_) if config.write_stats_as_struct => builder.replace_expr_at(
            [ADD_NAME],
            PARTITION_VALUES_PARSED_FIELD,
            build_partition_values_parsed_expr(),
        ),
        Some(_) => builder.drop_at([ADD_NAME], PARTITION_VALUES_PARSED_FIELD),
    };

    builder.build()
}
/// Extends `base_schema`'s `add` struct with the parsed columns a checkpoint read needs.
///
/// A nullable `stats_parsed` column of type `stats_schema` is inserted right after `stats`.
/// When `partition_schema` is given, a nullable `partitionValues_parsed` column of that type
/// is also inserted right after `partitionValues`.
///
/// # Errors
/// - `add` already has a `stats_parsed` column.
/// - `partition_schema` is `Some` and `add` already has a `partitionValues_parsed` column.
/// - `add` is missing or not a struct (see `transform_add_schema`).
/// - The schema patch itself fails to apply.
pub(crate) fn build_checkpoint_read_schema(
    base_schema: &StructType,
    stats_schema: &StructType,
    partition_schema: Option<&StructType>,
) -> DeltaResult<SchemaRef> {
    transform_add_schema(base_schema, |add_struct| {
        // Refuse to silently shadow columns that are already present.
        if add_struct.field(STATS_PARSED_FIELD).is_some() {
            return Err(Error::generic(
                "stats_parsed field already exists in Add schema",
            ));
        }
        let partition_conflict = partition_schema.is_some()
            && add_struct.field(PARTITION_VALUES_PARSED_FIELD).is_some();
        if partition_conflict {
            return Err(Error::generic(
                "partitionValues_parsed field already exists in Add schema",
            ));
        }

        let stats_parsed = StructField::nullable(STATS_PARSED_FIELD, stats_schema.clone());
        let patch = SchemaStructPatchBuilder::new().insert_after(STATS_FIELD, stats_parsed);
        let patch = match partition_schema {
            Some(pv_schema) => patch.insert_after(
                PARTITION_VALUES_FIELD,
                StructField::nullable(PARTITION_VALUES_PARSED_FIELD, pv_schema.clone()),
            ),
            None => patch,
        };
        patch.build(add_struct)
    })
}
/// Expression for `add.stats_parsed`: `coalesce(add.stats_parsed, parse_json(add.stats))`.
///
/// An already-present struct wins. Otherwise the JSON `stats` string is parsed into
/// `stats_schema`.
fn build_stats_parsed_expr(stats_schema: &SchemaRef) -> ExpressionRef {
    let existing_struct = col!(ADD_NAME, STATS_PARSED_FIELD);
    let parsed_json = Expression::parse_json(col!(ADD_NAME, STATS_FIELD), stats_schema.clone());
    Arc::new(Expression::coalesce([existing_struct, parsed_json]))
}
/// Expression for `add.partitionValues_parsed`:
/// `coalesce(add.partitionValues_parsed, map_to_struct(add.partitionValues))`.
///
/// An already-present struct wins. Otherwise the `partitionValues` map is converted with
/// `map_to_struct`.
fn build_partition_values_parsed_expr() -> ExpressionRef {
    let existing_struct = col!(ADD_NAME, PARTITION_VALUES_PARSED_FIELD);
    let converted_map = Expression::map_to_struct(col!(ADD_NAME, PARTITION_VALUES_FIELD));
    Arc::new(Expression::coalesce([existing_struct, converted_map]))
}
/// Expression for `add.stats`: `coalesce(add.stats, to_json(add.stats_parsed))`.
///
/// An already-present JSON string wins. Otherwise the struct stats are serialized to JSON.
/// It does not depend on any input, so it is built once and shared.
static STATS_JSON_EXPR: LazyLock<ExpressionRef> = LazyLock::new(|| {
    let existing_json = col!(ADD_NAME, STATS_FIELD);
    let serialized_struct =
        Expression::unary(UnaryExpressionOp::ToJson, col!(ADD_NAME, STATS_PARSED_FIELD));
    Arc::new(Expression::coalesce([existing_json, serialized_struct]))
});
fn transform_add_schema(
base_schema: &StructType,
transform_fn: impl FnOnce(&StructType) -> DeltaResult<StructType>,
) -> DeltaResult<SchemaRef> {
let add_field = base_schema
.field(ADD_NAME)
.ok_or_else(|| Error::generic("Expected 'add' field in checkpoint schema"))?;
let DataType::Struct(add_struct) = &add_field.data_type else {
return Err(Error::generic(format!(
"Expected 'add' field to be a struct type, got {:?}",
add_field.data_type
)));
};
let modified_add = transform_fn(add_struct)?;
let new_add_field = StructField::new(ADD_NAME, modified_add, add_field.nullable)
.with_metadata(add_field.metadata.clone());
let new_schema = SchemaStructPatchBuilder::new()
.replace(ADD_NAME, new_add_field)
.build(base_schema)?;
Ok(Arc::new(new_schema))
}
// Unit tests for the checkpoint stats config, read-schema construction, and projection.
#[cfg(test)]
mod tests {
    use rstest::rstest;
    use super::*;
    use crate::actions::NUM_RECORDS;
    use crate::expressions::ExpressionStructPatch;
    use crate::schema::MapType;

    // Default table properties: JSON stats enabled, struct stats disabled.
    #[test]
    fn test_config_defaults() {
        let props = TableProperties::default();
        let config = StatsTransformConfig::from_table_properties(&props);
        assert!(config.write_stats_as_json);
        assert!(!config.write_stats_as_struct);
    }

    // Setting `checkpoint_write_stats_as_struct` enables struct stats; JSON stats stay enabled.
    #[test]
    fn test_config_with_struct_enabled() {
        let props = TableProperties {
            checkpoint_write_stats_as_struct: Some(true),
            ..Default::default()
        };
        let config = StatsTransformConfig::from_table_properties(&props);
        assert!(config.write_stats_as_json);
        assert!(config.write_stats_as_struct);
    }

    // Splits a checkpoint transform into its outer (top-level) patch and the inner patch
    // that rewrites the `add` struct. Panics or fails an assertion if the expression does
    // not have that two-level shape.
    fn extract_patches(expr: &Expression) -> (&ExpressionStructPatch, &ExpressionStructPatch) {
        let Expression::StructPatch(outer) = expr else {
            panic!("Expected outer StructPatch expression");
        };
        assert!(
            outer.input_path.is_none(),
            "Outer patch should be top-level"
        );
        // The `add` column itself must be replaced (input not kept) by a nested patch.
        let add_field_patch = outer
            .field_patches
            .get(ADD_NAME)
            .expect("Outer patch should have 'add' field patch");
        assert!(
            !add_field_patch.keep_input && !add_field_patch.insertions.is_empty(),
            "Should replace 'add' field"
        );
        let Expression::StructPatch(inner) = add_field_patch.insertions[0].as_ref() else {
            panic!("Expected inner StructPatch expression for 'add' field");
        };
        assert_eq!(
            inner.input_path.as_ref().map(|p| p.to_string()),
            Some("add".to_string()),
            "Inner patch should target 'add' path"
        );
        (outer, inner)
    }

    // True when `patch` removes `field` outright: the input is not kept and nothing is
    // inserted in its place.
    fn is_drop(patch: &ExpressionStructPatch, field: &str) -> bool {
        patch
            .field_patches
            .get(field)
            .is_some_and(|ft| !ft.keep_input && ft.insertions.is_empty())
    }

    // True when `patch` replaces `field`: the input is not kept and at least one expression
    // is inserted instead.
    fn is_replacement(patch: &ExpressionStructPatch, field: &str) -> bool {
        patch
            .field_patches
            .get(field)
            .is_some_and(|ft| !ft.keep_input && !ft.insertions.is_empty())
    }

    // Test wrapper around the real `build_checkpoint_transform` (this local definition
    // shadows the glob-imported one). It builds a minimal base schema containing only an
    // `add` struct, derives the read schema from it, then builds the transform. Any error
    // panics.
    fn build_checkpoint_transform(
        config: &StatsTransformConfig,
        stats_schema: &SchemaRef,
        partition_schema: Option<&SchemaRef>,
    ) -> (SchemaRef, ExpressionRef) {
        let base_schema = StructType::new_unchecked([StructField::nullable(
            ADD_NAME,
            add_schema(partition_schema.is_some()),
        )]);
        let read_schema = build_checkpoint_read_schema(
            &base_schema,
            stats_schema.as_ref(),
            partition_schema.map(AsRef::as_ref),
        )
        .expect("build checkpoint read schema should produce a valid schema");
        super::build_checkpoint_transform(config, &read_schema, stats_schema, partition_schema)
            .expect("build checkpoint transform should produce a valid schema and expression")
    }

    // Minimal `add` struct. It contains `path`, then an optional string->string
    // `partitionValues` map, then a string `stats` column.
    fn add_schema(with_partition_schema: bool) -> StructType {
        let fields = [
            Some(StructField::not_null("path", DataType::STRING)),
            with_partition_schema.then(|| {
                let partition_values = MapType::new(DataType::STRING, DataType::STRING, true);
                StructField::nullable(PARTITION_VALUES_FIELD, partition_values)
            }),
            Some(StructField::nullable(STATS_FIELD, DataType::STRING)),
        ];
        StructType::new_unchecked(fields.into_iter().flatten())
    }

    // JSON-only config: `stats` is rewritten and `stats_parsed` is dropped.
    #[test]
    fn test_build_transform_with_json_only() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: false,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let (_, transform_expr) = build_checkpoint_transform(&config, &stats_schema, None);
        let (_, inner) = extract_patches(&transform_expr);
        assert!(
            is_replacement(inner, STATS_FIELD),
            "stats should be replaced"
        );
        assert!(
            is_drop(inner, STATS_PARSED_FIELD),
            "stats_parsed should be dropped"
        );
    }

    // JSON + struct config on a partitioned table: `stats`, `stats_parsed`, and
    // `partitionValues_parsed` are all rewritten.
    #[test]
    fn test_build_transform_populates_struct_stats_and_partition_values() {
        let config = StatsTransformConfig {
            write_stats_as_json: true,
            write_stats_as_struct: true,
        };
        let stats_schema = Arc::new(StructType::new_unchecked([]));
        let pv_schema = Arc::new(StructType::new_unchecked([
            StructField::nullable("year", DataType::INTEGER),
            StructField::nullable("month", DataType::INTEGER),
        ]));
        let (_, transform_expr) =
            build_checkpoint_transform(&config, &stats_schema, Some(&pv_schema));
        let (_, inner) = extract_patches(&transform_expr);
        assert!(
            is_replacement(inner, STATS_FIELD),
            "stats should be replaced"
        );
        assert!(
            is_replacement(inner, STATS_PARSED_FIELD),
            "stats_parsed should be replaced"
        );
        assert!(
            is_replacement(inner, PARTITION_VALUES_PARSED_FIELD),
            "partitionValues_parsed should be replaced"
        );
    }

    // Checks the field names, in order, of the output `add` struct for each flag
    // combination. Case arguments are
    // (write_stats_as_json, write_stats_as_struct, with_partition_schema, expected_names).
    #[rstest]
    #[case::no_stats(false, false, false, &["path"])]
    #[case::json_only(true, false, false, &["path", "stats"])]
    #[case::struct_only(false, true, false, &["path", "stats_parsed"])]
    #[case::json_and_struct(true, true, false, &["path", "stats", "stats_parsed"])]
    #[case::with_partition_values(
        true,
        true,
        true,
        &["path", "partitionValues", "partitionValues_parsed", "stats", "stats_parsed"])]
    #[case::partition_values_struct_disabled(
        true,
        false,
        true,
        &["path", "partitionValues", "stats"])]
    fn test_build_add_output_schema(
        #[case] write_stats_as_json: bool,
        #[case] write_stats_as_struct: bool,
        #[case] with_partition_schema: bool,
        #[case] expected_names: &[&str],
    ) {
        let config = StatsTransformConfig {
            write_stats_as_json,
            write_stats_as_struct,
        };
        let num_records = StructField::nullable(NUM_RECORDS, DataType::LONG);
        let stats_schema = Arc::new(StructType::new_unchecked([num_records]));
        let pv_schema = with_partition_schema.then(|| {
            Arc::new(StructType::new_unchecked([
                StructField::nullable("year", DataType::INTEGER),
                StructField::nullable("month", DataType::INTEGER),
            ]))
        });
        let (output_schema, _) =
            build_checkpoint_transform(&config, &stats_schema, pv_schema.as_ref());
        let add_field = output_schema
            .field(ADD_NAME)
            .expect("Expected output schema to contain add field");
        let DataType::Struct(add_schema) = add_field.data_type() else {
            panic!("Expected output add field to be a struct");
        };
        let field_names: Vec<_> = add_schema
            .fields()
            .map(|field| field.name.as_str())
            .collect();
        assert_eq!(field_names, expected_names);
    }
}