use std::borrow::Cow;
use std::sync::Arc;
use arrow_schema::{
DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use delta_kernel::arrow::array::BooleanArray;
use delta_kernel::arrow::compute::filter_record_batch;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::expressions::{ColumnName, Scalar, StructData};
use delta_kernel::scan::ScanMetadata;
use delta_kernel::schema::{
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField, StructType,
};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::table_configuration::TableConfiguration;
use delta_kernel::table_features::ColumnMappingMode;
use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
use delta_kernel::transforms::{SchemaTransform, transform_output_type};
use delta_kernel::{DeltaResult, ExpressionEvaluator};
use crate::errors::{DeltaResult as DeltaResultLocal, DeltaTableError};
use crate::kernel::{SCAN_ROW_ARROW_SCHEMA, StatsProjection};
/// Extension methods that derive statistics / partition schemas from kernel table state.
///
/// Implemented in this file for both `TableConfiguration` and `Snapshot`.
pub(crate) trait SnapshotExt {
/// Returns the schema expected for per-file statistics.
///
/// Note that this returns the kernel's `DeltaResult`, not the crate-local one.
fn stats_schema(&self) -> DeltaResult<SchemaRef>;
/// Returns the schema made up of the table's partition columns, or `None` when the
/// table has no partition columns.
fn partitions_schema(&self) -> DeltaResultLocal<Option<SchemaRef>>;
/// Returns the Arrow schema of a scan row extended with parsed statistics and,
/// for partitioned tables, parsed partition values.
fn scan_row_parsed_schema_arrow(&self) -> DeltaResultLocal<ArrowSchemaRef>;
}
impl SnapshotExt for TableConfiguration {
    /// Builds the statistics schema from the physical, non-partition columns of the
    /// table and the (physical-name translated) table properties.
    fn stats_schema(&self) -> DeltaResult<SchemaRef> {
        let (source_schema, table_properties) = stats_inputs(self)?;
        let schema = stats_schema(&source_schema, &table_properties);
        Ok(Arc::new(schema))
    }

    /// Projects the logical schema onto the table's partition columns.
    fn partitions_schema(&self) -> DeltaResultLocal<Option<SchemaRef>> {
        let partition_fields = partitions_schema(
            self.logical_schema().as_ref(),
            self.metadata().partition_columns(),
        )?;
        Ok(partition_fields.map(Arc::new))
    }

    /// Combines the stats and partition schemas into the parsed scan-row Arrow schema.
    fn scan_row_parsed_schema_arrow(&self) -> DeltaResultLocal<ArrowSchemaRef> {
        let stats = self.stats_schema()?;
        let partitions = self.partitions_schema()?;
        build_scan_row_parsed_schema_arrow(stats, partitions)
    }
}
impl SnapshotExt for Snapshot {
    /// Delegates to the snapshot's table configuration.
    fn stats_schema(&self) -> DeltaResult<SchemaRef> {
        let configuration = self.table_configuration();
        configuration.stats_schema()
    }

    /// Delegates to the snapshot's table configuration.
    fn partitions_schema(&self) -> DeltaResultLocal<Option<SchemaRef>> {
        let configuration = self.table_configuration();
        configuration.partitions_schema()
    }

    /// Resolves the parsed scan-row schema through the full statistics projection.
    fn scan_row_parsed_schema_arrow(&self) -> DeltaResultLocal<ArrowSchemaRef> {
        let projection = StatsProjection::Full;
        projection.parsed_scan_row_schema_arrow(self)
    }
}
/// Assembles the Arrow schema of a parsed scan row.
///
/// Starts from the fields of `SCAN_ROW_ARROW_SCHEMA`, appends a nullable
/// `stats_parsed` struct column built from `stats_schema` and, when a partition
/// schema is given, a non-nullable `partitionValues_parsed` struct column.
///
/// # Errors
/// Fails when either kernel schema cannot be converted to Arrow.
fn build_scan_row_parsed_schema_arrow(
    stats_schema: SchemaRef,
    partition_schema: Option<SchemaRef>,
) -> DeltaResultLocal<ArrowSchemaRef> {
    // Wraps all fields of a converted schema into one struct-typed column.
    let struct_column = |name: &str, schema: &ArrowSchema, nullable: bool| {
        Arc::new(Field::new(
            name,
            ArrowDataType::Struct(schema.fields().clone()),
            nullable,
        ))
    };

    let mut fields = SCAN_ROW_ARROW_SCHEMA.fields().to_vec();

    let stats_arrow: ArrowSchema = stats_schema.as_ref().try_into_arrow()?;
    fields.push(struct_column("stats_parsed", &stats_arrow, true));

    if let Some(kernel_partitions) = partition_schema {
        let partitions_arrow: ArrowSchema = kernel_partitions.as_ref().try_into_arrow()?;
        fields.push(struct_column(
            "partitionValues_parsed",
            &partitions_arrow,
            false,
        ));
    }

    Ok(Arc::new(ArrowSchema::new(fields)))
}
/// Builds a struct type containing only the partition columns of `schema`,
/// in the order given by `partition_columns`.
///
/// Returns `Ok(None)` when there are no partition columns.
///
/// # Errors
/// Returns `DeltaTableError::Generic` when a partition column is missing from
/// `schema`, or the struct-construction error if the resulting fields are rejected.
fn partitions_schema(
    schema: &StructType,
    partition_columns: &[String],
) -> DeltaResultLocal<Option<StructType>> {
    if partition_columns.is_empty() {
        return Ok(None);
    }

    let mut partition_fields = Vec::with_capacity(partition_columns.len());
    for col in partition_columns {
        let Some(field) = schema.field(col) else {
            return Err(DeltaTableError::Generic(format!(
                "Partition column {col} not found in schema"
            )));
        };
        partition_fields.push(field.clone());
    }

    Ok(Some(StructType::try_new(partition_fields)?))
}
/// Collects the two inputs needed to build a statistics schema for a table:
/// the physical schema of its non-partition columns, and its table properties
/// with the configured stats columns translated to physical names.
///
/// # Errors
/// Propagates failures from building the physical source schema.
pub(crate) fn stats_inputs(
    table_configuration: &TableConfiguration,
) -> DeltaResult<(StructType, Cow<'_, TableProperties>)> {
    let mode = table_configuration.column_mapping_mode();

    let source_schema = stats_source_schema(
        table_configuration.logical_schema().as_ref(),
        table_configuration.metadata().partition_columns(),
        mode,
    )?;

    let table_properties = stats_table_properties(
        table_configuration.logical_schema().as_ref(),
        table_configuration.table_properties(),
        mode,
    );

    Ok((source_schema, table_properties))
}
/// Returns the schema statistics are collected over: every top-level field of
/// `logical_schema` that is not a partition column, converted to its physical
/// form for the given column mapping mode.
///
/// # Errors
/// Fails if a field cannot be made physical or the resulting struct is rejected.
fn stats_source_schema(
    logical_schema: &StructType,
    partition_columns: &[String],
    column_mapping_mode: ColumnMappingMode,
) -> DeltaResult<StructType> {
    let mut physical_fields = Vec::new();
    for field in logical_schema.fields() {
        // Partition columns are skipped entirely.
        if partition_columns.contains(field.name()) {
            continue;
        }
        physical_fields.push(field.make_physical(column_mapping_mode)?);
    }
    StructType::try_new(physical_fields)
}
/// Returns table properties whose `data_skipping_stats_columns` use physical
/// column names.
///
/// The input is borrowed unchanged when column mapping is disabled or no stats
/// columns are configured. Otherwise the properties are cloned and each
/// configured column is translated; columns that cannot be resolved against
/// `logical_schema` are dropped.
fn stats_table_properties<'a>(
    logical_schema: &StructType,
    table_properties: &'a TableProperties,
    column_mapping_mode: ColumnMappingMode,
) -> Cow<'a, TableProperties> {
    let logical_columns = match (
        &table_properties.data_skipping_stats_columns,
        column_mapping_mode,
    ) {
        // Nothing to translate: hand the caller's properties straight back.
        (None, _) | (_, ColumnMappingMode::None) => return Cow::Borrowed(table_properties),
        (Some(columns), _) => columns,
    };

    let physical_columns = logical_columns
        .iter()
        .filter_map(|column| physical_column_name(logical_schema, column, column_mapping_mode))
        .collect();

    let mut translated = table_properties.clone();
    translated.data_skipping_stats_columns = Some(physical_columns);
    Cow::Owned(translated)
}
/// Translates a (possibly nested) logical column path into physical names.
///
/// Walks `column` segment by segment through `logical_schema`. Returns `None`
/// when a segment does not name a field of the current struct, or when a
/// non-final segment refers to a field that is not a struct.
fn physical_column_name(
    logical_schema: &StructType,
    column: &ColumnName,
    column_mapping_mode: ColumnMappingMode,
) -> Option<ColumnName> {
    let segments = column.path();
    let last_index = segments.len().saturating_sub(1);

    let mut physical_path = Vec::with_capacity(segments.len());
    let mut current_struct = logical_schema;

    for (index, segment) in segments.iter().enumerate() {
        let field = current_struct.field(segment)?;
        physical_path.push(field.physical_name(column_mapping_mode).to_string());

        // Only descend when more segments follow; the final segment may be any type.
        if index < last_index {
            match field.data_type() {
                DataType::Struct(inner) => current_struct = inner,
                _ => return None,
            }
        }
    }

    Some(ColumnName::new(physical_path))
}
pub(crate) fn stats_schema(
stats_source_schema: &Schema,
table_properties: &TableProperties,
) -> Schema {
let mut fields = Vec::with_capacity(4);
fields.push(StructField::nullable("numRecords", DataType::LONG));
let mut base_transform = BaseStatsTransform::new(table_properties);
if let Some(base_schema) = base_transform.transform_struct(stats_source_schema) {
let base_schema = base_schema.into_owned();
let mut null_count_transform = NullCountStatsTransform;
if let Some(null_count_schema) = null_count_transform.transform_struct(&base_schema) {
fields.push(StructField::nullable(
"nullCount",
null_count_schema.into_owned(),
));
};
let mut min_max_transform = MinMaxStatsTransform;
if let Some(min_max_schema) = min_max_transform.transform_struct(&base_schema) {
let min_max_schema = min_max_schema.into_owned();
fields.push(StructField::nullable("minValues", min_max_schema.clone()));
fields.push(StructField::nullable("maxValues", min_max_schema));
}
}
StructType::try_new(fields).expect("Failed to construct StructType for stats_schema")
}
/// Schema transform producing the `nullCount` statistics schema: the input
/// structure is kept, but every leaf becomes a nullable LONG counter.
pub(crate) struct NullCountStatsTransform;
impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
    transform_output_type!(|'a, T| Option<Cow<'a, T>>);

    /// Every primitive is replaced by LONG, regardless of its original type.
    fn transform_primitive(&mut self, _ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
        Some(Cow::Owned(PrimitiveType::Long))
    }

    /// Rewrites a field for the null-count schema.
    ///
    /// Array, map and variant fields are not recursed into; they are represented
    /// by a single nullable LONG. All other fields are transformed recursively and
    /// returned as nullable fields with empty metadata.
    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
        use Cow::*;
        if matches!(
            &field.data_type,
            DataType::Array(_) | DataType::Map(_) | DataType::Variant(_)
        ) {
            return Some(Owned(StructField {
                name: field.name.clone(),
                data_type: DataType::LONG,
                nullable: true,
                metadata: Default::default(),
            }));
        }
        match self.transform(&field.data_type)? {
            // Only reuse the source field when it is already nullable; statistics
            // fields must always be nullable (same rule as `BaseStatsTransform`).
            Borrowed(_) if field.is_nullable() => Some(Borrowed(field)),
            dt => Some(Owned(StructField {
                name: field.name.clone(),
                data_type: dt.into_owned(),
                nullable: true,
                metadata: Default::default(),
            })),
        }
    }
}
/// Schema transform selecting which columns statistics are collected for.
struct BaseStatsTransform {
    /// Column-count limit; `None` when an explicit column list is configured.
    n_columns: Option<DataSkippingNumIndexedCols>,
    /// Number of leaf (non-struct) columns accepted so far.
    added_columns: u64,
    /// Explicit list of stats columns, if configured.
    column_names: Option<Vec<ColumnName>>,
    /// Names of the fields currently being traversed, outermost first.
    path: Vec<String>,
}
impl BaseStatsTransform {
    /// Creates a transform from table properties.
    ///
    /// An explicit `data_skipping_stats_columns` list takes precedence; otherwise
    /// `data_skipping_num_indexed_cols` is used, defaulting to 32 columns.
    fn new(props: &TableProperties) -> Self {
        let column_names = props.data_skipping_stats_columns.clone();
        let n_columns = match column_names {
            Some(_) => None,
            None => Some(
                props
                    .data_skipping_num_indexed_cols
                    .unwrap_or(DataSkippingNumIndexedCols::NumColumns(32)),
            ),
        };
        Self {
            n_columns,
            added_columns: 0,
            column_names,
            path: Vec::new(),
        }
    }
}
impl<'a> SchemaTransform<'a> for BaseStatsTransform {
    transform_output_type!(|'a, T| Option<Cow<'a, T>>);

    /// Decides whether `field` takes part in statistics and returns it as a
    /// nullable field (with empty metadata whenever it has to be rebuilt).
    ///
    /// * Once the configured column limit is reached, every further field is dropped.
    /// * Struct fields are always traversed and do not count against the limit.
    /// * Non-struct fields are kept only if they match the configured column
    ///   list (or unconditionally when no list is configured).
    /// * Structs that end up without any children are dropped.
    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
        use Cow::*;
        if let Some(DataSkippingNumIndexedCols::NumColumns(n_cols)) = self.n_columns
            && self.added_columns >= n_cols
        {
            return None;
        }

        self.path.push(field.name.clone());
        let is_struct = matches!(field.data_type(), DataType::Struct(_));
        let should_include = is_struct
            || self
                .column_names
                .as_ref()
                .map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
                .unwrap_or(true);
        if !should_include {
            self.path.pop();
            return None;
        }
        if !is_struct {
            self.added_columns += 1;
        }

        // Pop the path entry BEFORE propagating a `None` result. Previously the `?`
        // returned early and left this field's name on `self.path`, so every later
        // sibling was matched against a wrong, too-deep qualified name (e.g. a
        // fully filtered-out struct `s` turned sibling `a` into `s.a`).
        let transformed = self.transform(&field.data_type);
        self.path.pop();

        let field = match transformed? {
            Borrowed(_) if field.is_nullable() => Borrowed(field),
            data_type => Owned(StructField {
                name: field.name.clone(),
                data_type: data_type.into_owned(),
                nullable: true,
                metadata: Default::default(),
            }),
        };

        // Exclude struct fields that have no children left.
        if matches!(field.data_type(), DataType::Struct(dt) if dt.num_fields() == 0) {
            None
        } else {
            Some(field)
        }
    }
}
/// Schema transform producing the schema shared by `minValues` and `maxValues`:
/// only leaves whose type is eligible for data skipping are kept.
struct MinMaxStatsTransform;
impl<'a> SchemaTransform<'a> for MinMaxStatsTransform {
    transform_output_type!(|'a, T| Option<Cow<'a, T>>);

    /// Arrays never carry min/max statistics.
    fn transform_array(&mut self, _: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
        None
    }

    /// Maps never carry min/max statistics.
    fn transform_map(&mut self, _: &'a MapType) -> Option<Cow<'a, MapType>> {
        None
    }

    /// Variants never carry min/max statistics.
    fn transform_variant(&mut self, _: &'a StructType) -> Option<Cow<'a, StructType>> {
        None
    }

    /// Keeps a primitive unchanged when it is skipping-eligible, drops it otherwise.
    fn transform_primitive(&mut self, ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
        is_skipping_eligeble_datatype(ptype).then_some(Cow::Borrowed(ptype))
    }
}
fn should_include_column(column_name: &ColumnName, column_names: &[ColumnName]) -> bool {
column_names.iter().any(|name| {
name.as_ref().starts_with(column_name) || column_name.as_ref().starts_with(name)
})
}
/// Returns `true` for primitive types that get min/max statistics.
///
/// Nanosecond timestamps are only considered when the `nanosecond-timestamps`
/// feature is enabled.
fn is_skipping_eligeble_datatype(data_type: &PrimitiveType) -> bool {
    #[cfg(feature = "nanosecond-timestamps")]
    let is_nanos = matches!(data_type, PrimitiveType::TimestampNanos);
    #[cfg(not(feature = "nanosecond-timestamps"))]
    let is_nanos = false;

    is_nanos
        || matches!(
            data_type,
            PrimitiveType::Byte
                | PrimitiveType::Short
                | PrimitiveType::Integer
                | PrimitiveType::Long
                | PrimitiveType::Float
                | PrimitiveType::Double
                | PrimitiveType::Boolean
                | PrimitiveType::Date
                | PrimitiveType::Timestamp
                | PrimitiveType::TimestampNtz
                | PrimitiveType::String
                | PrimitiveType::Decimal(_)
        )
}
/// Converts scan metadata into a record batch that contains only the selected
/// scan-file rows.
///
/// # Errors
/// Fails if the underlying engine data is not Arrow-backed or if Arrow rejects
/// the filter (for example a selection vector longer than the batch).
pub(crate) fn rb_from_scan_meta(metadata: ScanMetadata) -> DeltaResult<RecordBatch> {
    let (underlying_data, mut selection_vector) = metadata.scan_files.into_parts();
    let batch: RecordBatch = ArrowEngineData::try_from_engine_data(underlying_data)?.into();
    // Rows not covered by a short selection vector count as selected. Arrow's
    // filter would silently drop them instead, so pad with `true` first.
    // Never truncate: an over-long vector must still surface as an error.
    if selection_vector.len() < batch.num_rows() {
        selection_vector.resize(batch.num_rows(), true);
    }
    Ok(filter_record_batch(
        &batch,
        &BooleanArray::from(selection_vector),
    )?)
}
/// Convenience extension for evaluating an expression directly on Arrow record batches.
pub(crate) trait ExpressionEvaluatorExt {
/// Evaluates the expression against `batch` and returns the result as a record batch.
fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch>;
}
impl<T: ExpressionEvaluator + ?Sized> ExpressionEvaluatorExt for T {
    /// Wraps `batch` as Arrow engine data, runs the evaluator on it and unwraps
    /// the Arrow-backed result.
    ///
    /// # Errors
    /// Fails if evaluation fails or the evaluator's output is not Arrow engine data.
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch> {
        let input = ArrowEngineData::new(batch);
        let output = T::evaluate(self, &input)?;
        let arrow_output = ArrowEngineData::try_from_engine_data(output)?;
        Ok(arrow_output.into())
    }
}
/// Lookup helpers for struct scalar data.
pub trait StructDataExt {
/// Returns the field with the given name, if any.
fn field(&self, name: &str) -> Option<&StructField>;
/// Returns the value at `index`, or `None` if the index is out of range.
fn value(&self, index: usize) -> Option<&Scalar>;
/// Returns the position of the field with the given name, if any.
fn index_of(&self, name: &str) -> Option<usize>;
}
impl StructDataExt for StructData {
    /// Linear search for the first field called `name`.
    fn field(&self, name: &str) -> Option<&StructField> {
        self.fields()
            .iter()
            .find(|candidate| candidate.name().as_str() == name)
    }

    /// Bounds-checked access to the value at `index`.
    fn value(&self, index: usize) -> Option<&Scalar> {
        self.values().get(index)
    }

    /// Linear search for the position of the first field called `name`.
    fn index_of(&self, name: &str) -> Option<usize> {
        self.fields()
            .iter()
            .position(|candidate| candidate.name().as_str() == name)
    }
}
#[cfg(test)]
mod tests {
use std::collections::HashMap;
use std::sync::Arc;
use super::*;
use crate::test_utils::{
build_test_table_configuration, column_mapping_test_field,
column_mapping_test_field_with_type,
};
use delta_kernel::EvaluationHandler;
use delta_kernel::arrow::array::Int32Array;
use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoKernel;
use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler;
use delta_kernel::expressions::*;
use delta_kernel::schema::{ArrayType, DataType};
use pretty_assertions::assert_eq;
// Smoke test: a plain column-reference expression can be evaluated on an Arrow
// record batch through `ExpressionEvaluatorExt::evaluate_arrow`.
#[test]
fn test_evaluate_arrow() {
let handler = ArrowEvaluationHandler;
let schema = Schema::new(vec![Field::new("a", ArrowDataType::Int32, false)]);
let values = Int32Array::from(vec![1, 2, 3]);
let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap();
let expression = column_expr!("a");
let expr = handler
.new_expression_evaluator(
Arc::new((&schema).try_into_kernel().unwrap()),
expression.into(),
DataType::INTEGER,
)
.unwrap();
let result = expr.evaluate_arrow(batch);
// Only success is checked here, not the content of the result.
assert!(result.is_ok());
}
#[test]
fn test_should_include_column() {
let full_name = vec![ColumnName::new(["lvl1", "lvl2", "lvl3", "lvl4"])];
let parent = ColumnName::new(["lvl1", "lvl2", "lvl3"]);
assert!(should_include_column(&parent, &full_name));
assert!(should_include_column(&full_name[0], &full_name));
assert!(should_include_column(&full_name[0], &[parent]));
let not_parent = ColumnName::new(["lvl1", "lvl2", "lvl3", "lvl5"]);
assert!(!should_include_column(¬_parent, &full_name));
let not_parent = ColumnName::new(["lvl1", "lvl3", "lvl4"]);
assert!(!should_include_column(¬_parent, &full_name));
let not_parent = ColumnName::new(["lvl1", "lvl2", "lvl4"]);
assert!(!should_include_column(¬_parent, &full_name));
}
// A single nullable LONG column yields identical nullCount / minValues / maxValues
// structs, all equal to the source schema.
#[test]
fn test_stats_schema_simple() {
let properties: TableProperties = [("key", "value")].into();
let file_schema =
StructType::try_new([StructField::nullable("id", DataType::LONG)]).unwrap();
let stats_schema = stats_schema(&file_schema, &properties);
let expected = StructType::try_new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", file_schema.clone()),
StructField::nullable("minValues", file_schema.clone()),
StructField::nullable("maxValues", file_schema),
])
.unwrap();
assert_eq!(&expected, &stats_schema);
}
// A nested array field is counted in nullCount (as LONG) but is absent from
// minValues / maxValues.
#[test]
fn test_stats_schema_with_non_eligible_field() {
let properties: TableProperties = [("key", "value")].into();
let array_type = DataType::Array(Box::new(ArrayType::new(DataType::STRING, false)));
let metadata_struct = StructType::try_new([
StructField::nullable("name", DataType::STRING),
StructField::nullable("tags", array_type),
StructField::nullable("score", DataType::DOUBLE),
])
.unwrap();
let file_schema = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable(
"metadata",
DataType::Struct(Box::new(metadata_struct.clone())),
),
])
.unwrap();
let stats_schema = stats_schema(&file_schema, &properties);
// nullCount: every leaf, including the array, becomes LONG.
let expected_null_nested = StructType::try_new([
StructField::nullable("name", DataType::LONG),
StructField::nullable("tags", DataType::LONG),
StructField::nullable("score", DataType::LONG),
])
.unwrap();
let expected_null = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("metadata", DataType::Struct(Box::new(expected_null_nested))),
])
.unwrap();
// min/max: the array field `tags` is dropped.
let expected_nested = StructType::try_new([
StructField::nullable("name", DataType::STRING),
StructField::nullable("score", DataType::DOUBLE),
])
.unwrap();
let expected_fields = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("metadata", DataType::Struct(Box::new(expected_nested))),
])
.unwrap();
let expected = StructType::try_new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", expected_null),
StructField::nullable("minValues", expected_fields.clone()),
StructField::nullable("maxValues", expected_fields.clone()),
])
.unwrap();
assert_eq!(&expected, &stats_schema);
}
// With `delta.dataSkippingStatsColumns` set to a nested column (whose parent name
// contains a dot and is therefore backtick-quoted), only that column is kept.
#[test]
fn test_stats_schema_col_names() {
let properties: TableProperties = [(
"delta.dataSkippingStatsColumns".to_string(),
"`user.info`.name".to_string(),
)]
.into();
let user_struct = StructType::try_new([
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER),
])
.unwrap();
let file_schema = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("user.info", DataType::Struct(Box::new(user_struct.clone()))),
])
.unwrap();
let stats_schema = stats_schema(&file_schema, &properties);
let expected_nested =
StructType::try_new([StructField::nullable("name", DataType::STRING)]).unwrap();
let expected_fields = StructType::try_new([StructField::nullable(
"user.info",
DataType::Struct(Box::new(expected_nested)),
)])
.unwrap();
// The expected nullCount schema is derived from the expected min/max schema.
let null_count = NullCountStatsTransform
.transform_struct(&expected_fields)
.unwrap()
.into_owned();
let expected = StructType::try_new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", null_count),
StructField::nullable("minValues", expected_fields.clone()),
StructField::nullable("maxValues", expected_fields.clone()),
])
.unwrap();
assert_eq!(&expected, &stats_schema);
}
// Partition columns are removed from the stats source schema.
#[test]
fn test_stats_source_schema_excludes_partition_columns() {
let logical_schema = StructType::try_new([
StructField::nullable("p", DataType::INTEGER),
StructField::nullable("a", DataType::INTEGER),
])
.unwrap();
let stats_schema =
stats_source_schema(&logical_schema, &["p".to_string()], ColumnMappingMode::None)
.unwrap();
assert!(stats_schema.field("a").is_some());
assert!(stats_schema.field("p").is_none());
}
// With column mapping enabled (name or id mode), the stats source schema uses
// physical names, and the partition column is matched by its logical name.
#[test]
fn test_stats_source_schema_applies_column_mapping_physical_names() {
let logical_schema = StructType::try_new([
column_mapping_test_field("p", "col_p", 1),
column_mapping_test_field("a", "col_a", 2),
])
.unwrap();
for column_mapping_mode in [ColumnMappingMode::Name, ColumnMappingMode::Id] {
let stats_schema =
stats_source_schema(&logical_schema, &["p".to_string()], column_mapping_mode)
.unwrap();
assert!(stats_schema.field("col_a").is_some());
assert!(stats_schema.field("a").is_none());
assert!(stats_schema.field("col_p").is_none());
assert!(stats_schema.field("p").is_none());
}
}
// A logical name in `delta.dataSkippingStatsColumns` is translated to the physical
// name, so only the matching physical column appears in minValues.
#[test]
fn test_table_configuration_stats_schema_translates_stats_columns_to_physical_names() {
for column_mapping_mode in ["name", "id"] {
let logical_schema = StructType::try_new([
column_mapping_test_field("p", "col_p", 1),
column_mapping_test_field("a", "col_a", 2),
column_mapping_test_field("b", "col_b", 3),
])
.unwrap();
let table_configuration = build_test_table_configuration(
logical_schema,
vec!["p".to_string()],
HashMap::from([
(
"delta.columnMapping.mode".to_string(),
column_mapping_mode.to_string(),
),
(
"delta.dataSkippingStatsColumns".to_string(),
"a".to_string(),
),
]),
);
let stats_schema = table_configuration.stats_schema().unwrap();
let min_values = match stats_schema.field("minValues").unwrap().data_type() {
DataType::Struct(fields) => fields,
other => panic!("expected minValues struct, got {other:?}"),
};
assert!(min_values.field("col_a").is_some());
assert!(min_values.field("a").is_none());
assert!(min_values.field("col_p").is_none());
assert!(min_values.field("col_b").is_none());
}
}
// Nested stats column paths (`s.a`) are translated segment by segment to physical
// names (`col_s.col_a`).
#[test]
fn test_table_configuration_stats_schema_translates_nested_stats_columns_to_physical_names() {
for column_mapping_mode in ["name", "id"] {
let nested_schema = StructType::try_new([
column_mapping_test_field("a", "col_a", 3),
column_mapping_test_field("b", "col_b", 4),
])
.unwrap();
let logical_schema = StructType::try_new([
column_mapping_test_field("p", "col_p", 1),
column_mapping_test_field_with_type(
"s",
"col_s",
2,
DataType::Struct(Box::new(nested_schema)),
),
])
.unwrap();
let table_configuration = build_test_table_configuration(
logical_schema,
vec!["p".to_string()],
HashMap::from([
(
"delta.columnMapping.mode".to_string(),
column_mapping_mode.to_string(),
),
(
"delta.dataSkippingStatsColumns".to_string(),
"s.a".to_string(),
),
]),
);
let stats_schema = table_configuration.stats_schema().unwrap();
let min_values = match stats_schema.field("minValues").unwrap().data_type() {
DataType::Struct(fields) => fields,
other => panic!("expected minValues struct, got {other:?}"),
};
let nested_min_values = match min_values.field("col_s").unwrap().data_type() {
DataType::Struct(fields) => fields,
other => panic!("expected nested minValues struct, got {other:?}"),
};
assert!(nested_min_values.field("col_a").is_some());
assert!(nested_min_values.field("a").is_none());
assert!(nested_min_values.field("col_b").is_none());
assert!(min_values.field("s").is_none());
assert!(min_values.field("col_p").is_none());
}
}
// A stats column path that descends through a non-struct field (`a.b` where `a`
// is not a struct) cannot be translated and is dropped; with no stats columns
// left, only numRecords remains.
#[test]
fn test_table_configuration_stats_schema_drops_stats_columns_with_non_struct_intermediate() {
for column_mapping_mode in ["name", "id"] {
let logical_schema = StructType::try_new([
column_mapping_test_field("p", "col_p", 1),
column_mapping_test_field("a", "col_a", 2),
])
.unwrap();
let table_configuration = build_test_table_configuration(
logical_schema,
vec!["p".to_string()],
HashMap::from([
(
"delta.columnMapping.mode".to_string(),
column_mapping_mode.to_string(),
),
(
"delta.dataSkippingStatsColumns".to_string(),
"a.b".to_string(),
),
]),
);
let stats_schema = table_configuration.stats_schema().unwrap();
assert!(stats_schema.field("nullCount").is_none());
assert!(stats_schema.field("minValues").is_none());
assert!(stats_schema.field("maxValues").is_none());
}
}
// Partition columns do not consume the `delta.dataSkippingNumIndexedCols` budget:
// with a limit of 1 and partition column `p` first, `a` is still indexed.
#[test]
fn test_table_configuration_stats_schema_num_indexed_cols_ignores_partition_columns() {
let logical_schema = StructType::try_new([
StructField::nullable("p", DataType::INTEGER),
StructField::nullable("a", DataType::INTEGER),
])
.unwrap();
let table_configuration = build_test_table_configuration(
logical_schema,
vec!["p".to_string()],
HashMap::from([(
"delta.dataSkippingNumIndexedCols".to_string(),
"1".to_string(),
)]),
);
let stats_schema = table_configuration.stats_schema().unwrap();
let min_values = match stats_schema.field("minValues").unwrap().data_type() {
DataType::Struct(fields) => fields,
other => panic!("expected minValues struct, got {other:?}"),
};
assert!(min_values.field("a").is_some());
assert!(min_values.field("p").is_none());
assert_eq!(1, min_values.fields().count());
}
// With `delta.dataSkippingNumIndexedCols` = 1 only the first column gets statistics.
#[test]
fn test_stats_schema_n_cols() {
let properties: TableProperties = [(
"delta.dataSkippingNumIndexedCols".to_string(),
"1".to_string(),
)]
.into();
let logical_schema = StructType::try_new([
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER),
])
.unwrap();
let stats_schema = stats_schema(&logical_schema, &properties);
let expected_fields =
StructType::try_new([StructField::nullable("name", DataType::STRING)]).unwrap();
// The expected nullCount schema is derived from the expected min/max schema.
let null_count = NullCountStatsTransform
.transform_struct(&expected_fields)
.unwrap()
.into_owned();
let expected = StructType::try_new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", null_count),
StructField::nullable("minValues", expected_fields.clone()),
StructField::nullable("maxValues", expected_fields.clone()),
])
.unwrap();
assert_eq!(&expected, &stats_schema);
}
// nullCount covers every column, while min/max omit the BINARY column.
#[test]
fn test_stats_schema_different_fields_in_null_vs_minmax() {
let properties: TableProperties = [("key", "value")].into();
let file_schema = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("is_active", DataType::BOOLEAN),
StructField::nullable("metadata", DataType::BINARY),
])
.unwrap();
let stats_schema = stats_schema(&file_schema, &properties);
let expected_null_count = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("is_active", DataType::LONG),
StructField::nullable("metadata", DataType::LONG),
])
.unwrap();
// BOOLEAN is kept for min/max; BINARY is not.
let expected_min_max = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("is_active", DataType::BOOLEAN),
])
.unwrap();
let expected = StructType::try_new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", expected_null_count),
StructField::nullable("minValues", expected_min_max.clone()),
StructField::nullable("maxValues", expected_min_max),
])
.unwrap();
assert_eq!(&expected, &stats_schema);
}
// Same as the flat case, but with the ineligible BINARY column nested inside a
// struct: it is counted in nullCount and omitted from min/max.
#[test]
fn test_stats_schema_nested_different_fields_in_null_vs_minmax() {
let properties: TableProperties = [("key", "value")].into();
let user_struct = StructType::try_new([
StructField::nullable("name", DataType::STRING), StructField::nullable("is_admin", DataType::BOOLEAN), StructField::nullable("age", DataType::INTEGER), StructField::nullable("profile_pic", DataType::BINARY), ])
.unwrap();
let file_schema = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("user", DataType::Struct(Box::new(user_struct.clone()))),
StructField::nullable("is_deleted", DataType::BOOLEAN), ])
.unwrap();
let stats_schema = stats_schema(&file_schema, &properties);
let expected_null_user = StructType::try_new([
StructField::nullable("name", DataType::LONG),
StructField::nullable("is_admin", DataType::LONG),
StructField::nullable("age", DataType::LONG),
StructField::nullable("profile_pic", DataType::LONG),
])
.unwrap();
let expected_null_count = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("user", DataType::Struct(Box::new(expected_null_user))),
StructField::nullable("is_deleted", DataType::LONG),
])
.unwrap();
// min/max: `profile_pic` (BINARY) is dropped from the nested struct.
let expected_minmax_user = StructType::try_new([
StructField::nullable("name", DataType::STRING),
StructField::nullable("is_admin", DataType::BOOLEAN),
StructField::nullable("age", DataType::INTEGER),
])
.unwrap();
let expected_min_max = StructType::try_new([
StructField::nullable("id", DataType::LONG),
StructField::nullable("user", DataType::Struct(Box::new(expected_minmax_user))),
StructField::nullable("is_deleted", DataType::BOOLEAN),
])
.unwrap();
let expected = StructType::try_new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", expected_null_count),
StructField::nullable("minValues", expected_min_max.clone()),
StructField::nullable("maxValues", expected_min_max),
])
.unwrap();
assert_eq!(&expected, &stats_schema);
}
// When no column is eligible for min/max, the stats schema contains only
// numRecords and nullCount.
#[test]
fn test_stats_schema_only_non_eligible_fields() {
let properties: TableProperties = [("key", "value")].into();
let file_schema = StructType::try_new([
StructField::nullable("metadata", DataType::BINARY),
StructField::nullable(
"tags",
DataType::Array(Box::new(ArrayType::new(DataType::STRING, false))),
),
])
.unwrap();
let stats_schema = stats_schema(&file_schema, &properties);
let expected_null_count = StructType::try_new([
StructField::nullable("metadata", DataType::LONG),
StructField::nullable("tags", DataType::LONG),
])
.unwrap();
let expected = StructType::try_new([
StructField::nullable("numRecords", DataType::LONG),
StructField::nullable("nullCount", expected_null_count),
])
.unwrap();
assert_eq!(&expected, &stats_schema);
}
// Without partition columns, `partitions_schema` returns `None`.
#[test]
fn test_partitions_schema() -> DeltaResultLocal<()> {
let logical_schema = StructType::try_new([
StructField::nullable("name", DataType::STRING),
StructField::nullable("age", DataType::INTEGER),
])
.unwrap();
let result = partitions_schema(&logical_schema, &[])?;
assert_eq!(None, result);
Ok(())
}
#[tokio::test]
async fn test_partition_schema2() {
let schema = StructType::try_new(vec![
StructField::new("id", DataType::LONG, true),
StructField::new("name", DataType::STRING, true),
StructField::new("date", DataType::DATE, true),
])
.unwrap();
let partition_columns = vec!["date".to_string()];
let expected =
StructType::try_new(vec![StructField::new("date", DataType::DATE, true)]).unwrap();
assert_eq!(
partitions_schema(&schema, &partition_columns).unwrap(),
Some(expected)
);
assert_eq!(partitions_schema(&schema, &[]).unwrap(), None);
}
}