use std::borrow::Cow;
use std::sync::Arc;
use arrow_schema::{
DataType as ArrowDataType, Field, Schema as ArrowSchema, SchemaRef as ArrowSchemaRef,
};
use delta_kernel::arrow::array::BooleanArray;
use delta_kernel::arrow::compute::filter_record_batch;
use delta_kernel::arrow::record_batch::RecordBatch;
use delta_kernel::engine::arrow_conversion::TryIntoArrow;
use delta_kernel::engine::arrow_data::ArrowEngineData;
use delta_kernel::expressions::{ColumnName, Scalar, StructData};
use delta_kernel::scan::ScanMetadata;
use delta_kernel::schema::{
ArrayType, DataType, MapType, PrimitiveType, Schema, SchemaRef, StructField, StructType,
};
use delta_kernel::snapshot::Snapshot;
use delta_kernel::table_configuration::TableConfiguration;
use delta_kernel::table_properties::{DataSkippingNumIndexedCols, TableProperties};
use delta_kernel::transforms::SchemaTransform;
use delta_kernel::{DeltaResult, ExpressionEvaluator};
use crate::errors::{DeltaResult as DeltaResultLocal, DeltaTableError};
use crate::kernel::SCAN_ROW_ARROW_SCHEMA;
/// Extension methods for snapshot-like kernel types that expose the schemas
/// used when parsing file statistics and partition values out of the log.
pub(crate) trait SnapshotExt {
    /// Schema of the parsed `stats` struct
    /// (`numRecords` / `nullCount` / `minValues` / `maxValues`).
    fn stats_schema(&self) -> DeltaResult<SchemaRef>;

    /// Schema of the parsed partition values, or `None` for unpartitioned tables.
    fn partitions_schema(&self) -> DeltaResultLocal<Option<SchemaRef>>;

    /// Arrow schema of a scan row where the JSON `stats` field is replaced by a
    /// parsed `stats_parsed` struct field and, for partitioned tables, a
    /// `partitionValues_parsed` struct field is appended.
    fn scan_row_parsed_schema_arrow(&self) -> DeltaResultLocal<ArrowSchemaRef> {
        let mut fields = SCAN_ROW_ARROW_SCHEMA.fields().to_vec();

        // The scan-row schema is a fixed project constant; a missing `stats`
        // field would be a programming error, not a runtime condition.
        let stats_idx = SCAN_ROW_ARROW_SCHEMA
            .index_of("stats")
            .expect("SCAN_ROW_ARROW_SCHEMA must contain a 'stats' field");

        let stats_schema = self.stats_schema()?;
        let stats_schema: ArrowSchema = stats_schema.as_ref().try_into_arrow()?;
        fields[stats_idx] = Arc::new(Field::new(
            "stats_parsed",
            ArrowDataType::Struct(stats_schema.fields().to_owned()),
            true,
        ));

        if let Some(partition_schema) = self.partitions_schema()? {
            let partition_schema: ArrowSchema = partition_schema.as_ref().try_into_arrow()?;
            fields.push(Arc::new(Field::new(
                "partitionValues_parsed",
                ArrowDataType::Struct(partition_schema.fields().to_owned()),
                false,
            )));
        }

        let schema = Arc::new(ArrowSchema::new(fields));
        Ok(schema)
    }
}
impl SnapshotExt for TableConfiguration {
    /// Derive the stats schema from the physical file schema and the
    /// table's data-skipping properties.
    fn stats_schema(&self) -> DeltaResult<SchemaRef> {
        let schema = stats_schema(self.physical_schema().as_ref(), self.table_properties());
        Ok(Arc::new(schema))
    }

    /// Derive the partition-values schema from the logical schema and the
    /// configured partition columns.
    fn partitions_schema(&self) -> DeltaResultLocal<Option<SchemaRef>> {
        let maybe_schema = partitions_schema(
            self.logical_schema().as_ref(),
            self.metadata().partition_columns(),
        )?;
        Ok(maybe_schema.map(Arc::new))
    }
}
impl SnapshotExt for Snapshot {
fn stats_schema(&self) -> DeltaResult<SchemaRef> {
self.table_configuration().stats_schema()
}
fn partitions_schema(&self) -> DeltaResultLocal<Option<SchemaRef>> {
self.table_configuration().partitions_schema()
}
}
/// Build the schema for parsed partition values, preserving the order of
/// `partition_columns`. Returns `Ok(None)` when the table is unpartitioned,
/// and an error if a partition column is missing from `schema`.
fn partitions_schema(
    schema: &StructType,
    partition_columns: &[String],
) -> DeltaResultLocal<Option<StructType>> {
    if partition_columns.is_empty() {
        return Ok(None);
    }
    let mut fields = Vec::with_capacity(partition_columns.len());
    for col in partition_columns {
        let field = schema.field(col).ok_or_else(|| {
            DeltaTableError::Generic(format!("Partition column {col} not found in schema"))
        })?;
        fields.push(field.clone());
    }
    Ok(Some(StructType::try_new(fields)?))
}
/// Compute the expected schema of parsed Delta file statistics for
/// `physical_file_schema`, honoring the table's data-skipping properties.
///
/// The result always contains `numRecords`. When the base transform retains at
/// least one column, `nullCount` (every leaf mapped to LONG) is added, and —
/// if any retained leaf is skipping-eligible — `minValues`/`maxValues` too.
pub(crate) fn stats_schema(
    physical_file_schema: &Schema,
    table_properties: &TableProperties,
) -> Schema {
    let mut stat_fields = vec![StructField::nullable("numRecords", DataType::LONG)];

    // First restrict the file schema to the columns stats are collected for.
    let mut base_transform = BaseStatsTransform::new(table_properties);
    if let Some(retained) = base_transform.transform_struct(physical_file_schema) {
        let retained = retained.into_owned();

        // nullCount mirrors the retained shape with LONG leaves.
        if let Some(null_counts) = NullCountStatsTransform.transform_struct(&retained) {
            stat_fields.push(StructField::nullable("nullCount", null_counts.into_owned()));
        }

        // min/max only exist for skipping-eligible leaf types.
        if let Some(min_max) = MinMaxStatsTransform.transform_struct(&retained) {
            let min_max = min_max.into_owned();
            stat_fields.push(StructField::nullable("minValues", min_max.clone()));
            stat_fields.push(StructField::nullable("maxValues", min_max));
        }
    }

    StructType::try_new(stat_fields).expect("Failed to construct StructType for stats_schema")
}
/// Schema transform producing the `nullCount` shape: every leaf of the input
/// schema is mapped to a nullable LONG, since null counts are 64-bit integers
/// regardless of the column's own type.
pub(crate) struct NullCountStatsTransform;

impl<'a> SchemaTransform<'a> for NullCountStatsTransform {
    // Every primitive leaf becomes a LONG counter.
    fn transform_primitive(&mut self, _ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
        Some(Cow::Owned(PrimitiveType::Long))
    }

    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
        use Cow::*;
        // Arrays, maps, and variants get a single null count: the whole field
        // collapses to one nullable LONG instead of recursing into it.
        if matches!(
            &field.data_type,
            DataType::Array(_) | DataType::Map(_) | DataType::Variant(_)
        ) {
            return Some(Cow::Owned(StructField {
                name: field.name.clone(),
                data_type: DataType::LONG,
                nullable: true,
                metadata: Default::default(),
            }));
        }
        match self.transform(&field.data_type)? {
            // Subtree unchanged by the transform: keep borrowing the original.
            Borrowed(_) => Some(Borrowed(field)),
            // Subtree rewritten: rebuild the field as nullable, dropping metadata.
            dt => Some(Owned(StructField {
                name: field.name.clone(),
                data_type: dt.into_owned(),
                nullable: true,
                metadata: Default::default(),
            })),
        }
    }
}
/// Schema transform that selects which columns of the physical file schema
/// participate in statistics, honoring either an explicit column list
/// (`delta.dataSkippingStatsColumns`) or a maximum indexed-column count
/// (`delta.dataSkippingNumIndexedCols`, defaulting to 32 when unset).
struct BaseStatsTransform {
    // Cap on the number of leaf columns to index; `None` when an explicit
    // column list is configured instead.
    n_columns: Option<DataSkippingNumIndexedCols>,
    // Number of leaf columns retained so far.
    added_columns: u64,
    // Explicit list of stats columns, when configured.
    column_names: Option<Vec<ColumnName>>,
    // Field-name path of the current position while walking the schema.
    path: Vec<String>,
}

impl BaseStatsTransform {
    fn new(props: &TableProperties) -> Self {
        // An explicit stats-column list takes precedence over the numeric cap.
        let (n_columns, column_names) = match &props.data_skipping_stats_columns {
            Some(names) => (None, Some(names.clone())),
            None => {
                let cap = props
                    .data_skipping_num_indexed_cols
                    .unwrap_or(DataSkippingNumIndexedCols::NumColumns(32));
                (Some(cap), None)
            }
        };
        Self {
            n_columns,
            added_columns: 0,
            column_names,
            path: Vec::new(),
        }
    }
}
impl<'a> SchemaTransform<'a> for BaseStatsTransform {
    /// Decide whether `field` is retained for statistics collection.
    ///
    /// Structs are always descended into (they may contain selected leaves)
    /// and are dropped afterwards if they end up empty; leaves count against
    /// the indexed-column cap or are matched against the explicit column list.
    fn transform_struct_field(&mut self, field: &'a StructField) -> Option<Cow<'a, StructField>> {
        use Cow::*;
        // Stop once the configured number of indexed columns is reached.
        if let Some(DataSkippingNumIndexedCols::NumColumns(n_cols)) = self.n_columns
            && self.added_columns >= n_cols
        {
            return None;
        }
        self.path.push(field.name.clone());
        let data_type = field.data_type();
        let is_struct = matches!(data_type, DataType::Struct(_));
        // Structs are tentatively included; leaves must match the explicit
        // column list when one is configured (no list means include all).
        let should_include = is_struct
            || self
                .column_names
                .as_ref()
                .map(|ns| should_include_column(&ColumnName::new(&self.path), ns))
                .unwrap_or(true);
        if !should_include {
            self.path.pop();
            return None;
        }
        // Only leaf columns count against the indexed-column cap.
        if !is_struct {
            self.added_columns += 1;
        }
        // BUGFIX: pop the path segment before the fallible transform result is
        // propagated. The previous code used `?` while the segment was still
        // pushed, so an inner `None` leaked a stale path entry and corrupted
        // column-name checks for subsequent sibling fields.
        let transformed = self.transform(&field.data_type);
        self.path.pop();
        let field = match transformed? {
            // Unchanged nullable field: keep borrowing the original.
            Borrowed(_) if field.is_nullable() => Borrowed(field),
            // Otherwise rebuild as nullable with the transformed type.
            data_type => Owned(StructField {
                name: field.name.clone(),
                data_type: data_type.into_owned(),
                nullable: true,
                metadata: Default::default(),
            }),
        };
        // Drop structs whose children were all filtered out.
        if matches!(field.data_type(), DataType::Struct(dt) if dt.num_fields() == 0) {
            None
        } else {
            Some(field)
        }
    }
}
/// Restricts a stats schema to leaves that support min/max data skipping:
/// arrays, maps, and variants are dropped entirely, and primitives are kept
/// only when eligible for skipping.
struct MinMaxStatsTransform;

impl<'a> SchemaTransform<'a> for MinMaxStatsTransform {
    // No min/max statistics exist for nested container types.
    fn transform_array(&mut self, _: &'a ArrayType) -> Option<Cow<'a, ArrayType>> {
        None
    }

    fn transform_map(&mut self, _: &'a MapType) -> Option<Cow<'a, MapType>> {
        None
    }

    fn transform_variant(&mut self, _: &'a StructType) -> Option<Cow<'a, StructType>> {
        None
    }

    // Keep a primitive leaf only when it is a skipping-eligible type.
    fn transform_primitive(&mut self, ptype: &'a PrimitiveType) -> Option<Cow<'a, PrimitiveType>> {
        is_skipping_eligeble_datatype(ptype).then_some(Cow::Borrowed(ptype))
    }
}
/// A column is included when it equals, is an ancestor of, or is a descendant
/// of any configured stats column (prefix match in either direction).
fn should_include_column(column_name: &ColumnName, column_names: &[ColumnName]) -> bool {
    column_names.iter().any(|candidate| {
        candidate.as_ref().starts_with(column_name) || column_name.as_ref().starts_with(candidate)
    })
}
fn is_skipping_eligeble_datatype(data_type: &PrimitiveType) -> bool {
matches!(
data_type,
&PrimitiveType::Byte
| &PrimitiveType::Short
| &PrimitiveType::Integer
| &PrimitiveType::Long
| &PrimitiveType::Float
| &PrimitiveType::Double
| &PrimitiveType::Boolean
| &PrimitiveType::Date
| &PrimitiveType::Timestamp
| &PrimitiveType::TimestampNtz
| &PrimitiveType::String
| PrimitiveType::Decimal(_)
)
}
/// Convert kernel scan metadata into a `RecordBatch`, applying the scan's
/// selection vector as a row filter.
pub(crate) fn rb_from_scan_meta(metadata: ScanMetadata) -> DeltaResult<RecordBatch> {
    let (engine_data, selection) = metadata.scan_files.into_parts();
    let batch: RecordBatch = ArrowEngineData::try_from_engine_data(engine_data)?.into();
    let mask = BooleanArray::from(selection);
    let filtered = filter_record_batch(&batch, &mask)?;
    Ok(filtered)
}
/// Convenience for evaluating a kernel expression directly against an Arrow
/// `RecordBatch` instead of raw engine data.
pub(crate) trait ExpressionEvaluatorExt {
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch>;
}

impl<T: ExpressionEvaluator + ?Sized> ExpressionEvaluatorExt for T {
    fn evaluate_arrow(&self, batch: RecordBatch) -> DeltaResult<RecordBatch> {
        // Wrap the batch as engine data, evaluate, then unwrap back to Arrow.
        let input = ArrowEngineData::new(batch);
        let output = self.evaluate(&input)?;
        Ok(ArrowEngineData::try_from_engine_data(output)?.into())
    }
}
/// Read-only helpers for looking up `StructData` fields and values by name
/// or positional index.
pub trait StructDataExt {
    fn field(&self, name: &str) -> Option<&StructField>;
    fn value(&self, index: usize) -> Option<&Scalar>;
    fn index_of(&self, name: &str) -> Option<usize>;
}

impl StructDataExt for StructData {
    fn field(&self, name: &str) -> Option<&StructField> {
        self.fields().iter().find(|candidate| candidate.name() == name)
    }

    fn value(&self, index: usize) -> Option<&Scalar> {
        self.values().get(index)
    }

    fn index_of(&self, name: &str) -> Option<usize> {
        self.fields()
            .iter()
            .position(|candidate| candidate.name() == name)
    }
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::*;
    use delta_kernel::EvaluationHandler;
    use delta_kernel::arrow::array::Int32Array;
    use delta_kernel::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
    use delta_kernel::arrow::record_batch::RecordBatch;
    use delta_kernel::engine::arrow_conversion::TryIntoKernel;
    use delta_kernel::engine::arrow_expression::ArrowEvaluationHandler;
    use delta_kernel::expressions::*;
    use delta_kernel::schema::{ArrayType, DataType};
    use pretty_assertions::assert_eq;

    /// A simple column projection evaluates cleanly against an Arrow batch.
    #[test]
    fn test_evaluate_arrow() {
        let handler = ArrowEvaluationHandler;
        let schema = Schema::new(vec![Field::new("a", ArrowDataType::Int32, false)]);
        let values = Int32Array::from(vec![1, 2, 3]);
        let batch = RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values)]).unwrap();
        let expression = column_expr!("a");
        let expr = handler
            .new_expression_evaluator(
                Arc::new((&schema).try_into_kernel().unwrap()),
                expression.into(),
                DataType::INTEGER,
            )
            .unwrap();
        let result = expr.evaluate_arrow(batch);
        assert!(result.is_ok());
    }

    /// Inclusion is a prefix match in either direction: ancestors and
    /// descendants of a configured column match; diverging paths do not.
    #[test]
    fn test_should_include_column() {
        let full_name = vec![ColumnName::new(["lvl1", "lvl2", "lvl3", "lvl4"])];
        let parent = ColumnName::new(["lvl1", "lvl2", "lvl3"]);
        assert!(should_include_column(&parent, &full_name));
        assert!(should_include_column(&full_name[0], &full_name));
        assert!(should_include_column(&full_name[0], &[parent]));
        // BUGFIX: `&not_parent` was garbled to `¬_parent` (the byte sequence
        // `&not` had been decoded as the HTML entity U+00AC), which does not
        // compile. Restored the intended reference expressions below.
        let not_parent = ColumnName::new(["lvl1", "lvl2", "lvl3", "lvl5"]);
        assert!(!should_include_column(&not_parent, &full_name));
        let not_parent = ColumnName::new(["lvl1", "lvl3", "lvl4"]);
        assert!(!should_include_column(&not_parent, &full_name));
        let not_parent = ColumnName::new(["lvl1", "lvl2", "lvl4"]);
        assert!(!should_include_column(&not_parent, &full_name));
    }

    /// All-eligible flat schema: nullCount/min/max mirror the file schema.
    #[test]
    fn test_stats_schema_simple() {
        let properties: TableProperties = [("key", "value")].into();
        let file_schema =
            StructType::try_new([StructField::nullable("id", DataType::LONG)]).unwrap();
        let stats_schema = stats_schema(&file_schema, &properties);
        let expected = StructType::try_new([
            StructField::nullable("numRecords", DataType::LONG),
            StructField::nullable("nullCount", file_schema.clone()),
            StructField::nullable("minValues", file_schema.clone()),
            StructField::nullable("maxValues", file_schema),
        ])
        .unwrap();
        assert_eq!(&expected, &stats_schema);
    }

    /// Array fields appear as LONG in nullCount but are dropped from min/max.
    #[test]
    fn test_stats_schema_with_non_eligible_field() {
        let properties: TableProperties = [("key", "value")].into();
        let array_type = DataType::Array(Box::new(ArrayType::new(DataType::STRING, false)));
        let metadata_struct = StructType::try_new([
            StructField::nullable("name", DataType::STRING),
            StructField::nullable("tags", array_type),
            StructField::nullable("score", DataType::DOUBLE),
        ])
        .unwrap();
        let file_schema = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable(
                "metadata",
                DataType::Struct(Box::new(metadata_struct.clone())),
            ),
        ])
        .unwrap();
        let stats_schema = stats_schema(&file_schema, &properties);
        let expected_null_nested = StructType::try_new([
            StructField::nullable("name", DataType::LONG),
            StructField::nullable("tags", DataType::LONG),
            StructField::nullable("score", DataType::LONG),
        ])
        .unwrap();
        let expected_null = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("metadata", DataType::Struct(Box::new(expected_null_nested))),
        ])
        .unwrap();
        let expected_nested = StructType::try_new([
            StructField::nullable("name", DataType::STRING),
            StructField::nullable("score", DataType::DOUBLE),
        ])
        .unwrap();
        let expected_fields = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("metadata", DataType::Struct(Box::new(expected_nested))),
        ])
        .unwrap();
        let expected = StructType::try_new([
            StructField::nullable("numRecords", DataType::LONG),
            StructField::nullable("nullCount", expected_null),
            StructField::nullable("minValues", expected_fields.clone()),
            StructField::nullable("maxValues", expected_fields.clone()),
        ])
        .unwrap();
        assert_eq!(&expected, &stats_schema);
    }

    /// An explicit stats-column list (with a backtick-escaped dotted name)
    /// restricts stats to the listed columns only.
    #[test]
    fn test_stats_schema_col_names() {
        let properties: TableProperties = [(
            "delta.dataSkippingStatsColumns".to_string(),
            "`user.info`.name".to_string(),
        )]
        .into();
        let user_struct = StructType::try_new([
            StructField::nullable("name", DataType::STRING),
            StructField::nullable("age", DataType::INTEGER),
        ])
        .unwrap();
        let file_schema = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("user.info", DataType::Struct(Box::new(user_struct.clone()))),
        ])
        .unwrap();
        let stats_schema = stats_schema(&file_schema, &properties);
        let expected_nested =
            StructType::try_new([StructField::nullable("name", DataType::STRING)]).unwrap();
        let expected_fields = StructType::try_new([StructField::nullable(
            "user.info",
            DataType::Struct(Box::new(expected_nested)),
        )])
        .unwrap();
        let null_count = NullCountStatsTransform
            .transform_struct(&expected_fields)
            .unwrap()
            .into_owned();
        let expected = StructType::try_new([
            StructField::nullable("numRecords", DataType::LONG),
            StructField::nullable("nullCount", null_count),
            StructField::nullable("minValues", expected_fields.clone()),
            StructField::nullable("maxValues", expected_fields.clone()),
        ])
        .unwrap();
        assert_eq!(&expected, &stats_schema);
    }

    /// `dataSkippingNumIndexedCols = 1` keeps only the first leaf column.
    #[test]
    fn test_stats_schema_n_cols() {
        let properties: TableProperties = [(
            "delta.dataSkippingNumIndexedCols".to_string(),
            "1".to_string(),
        )]
        .into();
        let logical_schema = StructType::try_new([
            StructField::nullable("name", DataType::STRING),
            StructField::nullable("age", DataType::INTEGER),
        ])
        .unwrap();
        let stats_schema = stats_schema(&logical_schema, &properties);
        let expected_fields =
            StructType::try_new([StructField::nullable("name", DataType::STRING)]).unwrap();
        let null_count = NullCountStatsTransform
            .transform_struct(&expected_fields)
            .unwrap()
            .into_owned();
        let expected = StructType::try_new([
            StructField::nullable("numRecords", DataType::LONG),
            StructField::nullable("nullCount", null_count),
            StructField::nullable("minValues", expected_fields.clone()),
            StructField::nullable("maxValues", expected_fields.clone()),
        ])
        .unwrap();
        assert_eq!(&expected, &stats_schema);
    }

    /// Binary fields get a nullCount entry but no min/max entry.
    #[test]
    fn test_stats_schema_different_fields_in_null_vs_minmax() {
        let properties: TableProperties = [("key", "value")].into();
        let file_schema = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("is_active", DataType::BOOLEAN),
            StructField::nullable("metadata", DataType::BINARY),
        ])
        .unwrap();
        let stats_schema = stats_schema(&file_schema, &properties);
        let expected_null_count = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("is_active", DataType::LONG),
            StructField::nullable("metadata", DataType::LONG),
        ])
        .unwrap();
        let expected_min_max = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("is_active", DataType::BOOLEAN),
        ])
        .unwrap();
        let expected = StructType::try_new([
            StructField::nullable("numRecords", DataType::LONG),
            StructField::nullable("nullCount", expected_null_count),
            StructField::nullable("minValues", expected_min_max.clone()),
            StructField::nullable("maxValues", expected_min_max),
        ])
        .unwrap();
        assert_eq!(&expected, &stats_schema);
    }

    /// Same as above but with the ineligible field nested inside a struct.
    #[test]
    fn test_stats_schema_nested_different_fields_in_null_vs_minmax() {
        let properties: TableProperties = [("key", "value")].into();
        let user_struct = StructType::try_new([
            StructField::nullable("name", DataType::STRING),
            StructField::nullable("is_admin", DataType::BOOLEAN),
            StructField::nullable("age", DataType::INTEGER),
            StructField::nullable("profile_pic", DataType::BINARY),
        ])
        .unwrap();
        let file_schema = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("user", DataType::Struct(Box::new(user_struct.clone()))),
            StructField::nullable("is_deleted", DataType::BOOLEAN),
        ])
        .unwrap();
        let stats_schema = stats_schema(&file_schema, &properties);
        let expected_null_user = StructType::try_new([
            StructField::nullable("name", DataType::LONG),
            StructField::nullable("is_admin", DataType::LONG),
            StructField::nullable("age", DataType::LONG),
            StructField::nullable("profile_pic", DataType::LONG),
        ])
        .unwrap();
        let expected_null_count = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("user", DataType::Struct(Box::new(expected_null_user))),
            StructField::nullable("is_deleted", DataType::LONG),
        ])
        .unwrap();
        let expected_minmax_user = StructType::try_new([
            StructField::nullable("name", DataType::STRING),
            StructField::nullable("is_admin", DataType::BOOLEAN),
            StructField::nullable("age", DataType::INTEGER),
        ])
        .unwrap();
        let expected_min_max = StructType::try_new([
            StructField::nullable("id", DataType::LONG),
            StructField::nullable("user", DataType::Struct(Box::new(expected_minmax_user))),
            StructField::nullable("is_deleted", DataType::BOOLEAN),
        ])
        .unwrap();
        let expected = StructType::try_new([
            StructField::nullable("numRecords", DataType::LONG),
            StructField::nullable("nullCount", expected_null_count),
            StructField::nullable("minValues", expected_min_max.clone()),
            StructField::nullable("maxValues", expected_min_max),
        ])
        .unwrap();
        assert_eq!(&expected, &stats_schema);
    }

    /// When no field is min/max-eligible, min/max structs are omitted entirely.
    #[test]
    fn test_stats_schema_only_non_eligible_fields() {
        let properties: TableProperties = [("key", "value")].into();
        let file_schema = StructType::try_new([
            StructField::nullable("metadata", DataType::BINARY),
            StructField::nullable(
                "tags",
                DataType::Array(Box::new(ArrayType::new(DataType::STRING, false))),
            ),
        ])
        .unwrap();
        let stats_schema = stats_schema(&file_schema, &properties);
        let expected_null_count = StructType::try_new([
            StructField::nullable("metadata", DataType::LONG),
            StructField::nullable("tags", DataType::LONG),
        ])
        .unwrap();
        let expected = StructType::try_new([
            StructField::nullable("numRecords", DataType::LONG),
            StructField::nullable("nullCount", expected_null_count),
        ])
        .unwrap();
        assert_eq!(&expected, &stats_schema);
    }

    /// Unpartitioned tables yield no partition schema.
    #[test]
    fn test_partitions_schema() -> DeltaResultLocal<()> {
        let logical_schema = StructType::try_new([
            StructField::nullable("name", DataType::STRING),
            StructField::nullable("age", DataType::INTEGER),
        ])
        .unwrap();
        let result = partitions_schema(&logical_schema, &[])?;
        assert_eq!(None, result);
        Ok(())
    }

    /// Partition schema contains exactly the partition columns, in order.
    #[tokio::test]
    async fn test_partition_schema2() {
        let schema = StructType::try_new(vec![
            StructField::new("id", DataType::LONG, true),
            StructField::new("name", DataType::STRING, true),
            StructField::new("date", DataType::DATE, true),
        ])
        .unwrap();
        let partition_columns = vec!["date".to_string()];
        let expected =
            StructType::try_new(vec![StructField::new("date", DataType::DATE, true)]).unwrap();
        assert_eq!(
            partitions_schema(&schema, &partition_columns).unwrap(),
            Some(expected)
        );
        assert_eq!(partitions_schema(&schema, &[]).unwrap(), None);
    }
}