use std::sync::LazyLock;
use crate::actions::{MAX_VALUES, MIN_VALUES, NULL_COUNT, NUM_RECORDS};
use crate::engine_data::{GetData, RowVisitor, TypedGetData as _};
use crate::error::Error;
use crate::expressions::{column_name, ColumnName};
use crate::schema::{ColumnNamesAndTypes, DataType, DecimalType, PrimitiveType};
use crate::utils::require;
use crate::DeltaResult;
/// Checks that per-file statistics are present for a fixed set of required columns.
///
/// Each entry pairs a column with the data type used to read that column's
/// min/max statistics when `verify` is called.
pub struct StatsColumnVerifier {
/// `(column, data type)` pairs whose statistics must be present.
required_columns: Vec<(ColumnName, DataType)>,
}
impl StatsColumnVerifier {
pub fn new(required_columns: Vec<(ColumnName, DataType)>) -> Self {
Self { required_columns }
}
pub fn verify(&self, add_files: &[Box<dyn crate::EngineData>]) -> DeltaResult<()> {
if self.required_columns.is_empty() {
return Ok(());
}
for (col, data_type) in &self.required_columns {
self.verify_column(add_files, col, data_type)?;
}
Ok(())
}
fn verify_column(
&self,
add_files: &[Box<dyn crate::EngineData>],
column: &ColumnName,
data_type: &DataType,
) -> DeltaResult<()> {
let column_names = vec![
ColumnName::new(["path"]),
ColumnName::new(["stats", NUM_RECORDS]),
build_stat_path(column, NULL_COUNT),
build_stat_path(column, MIN_VALUES),
build_stat_path(column, MAX_VALUES),
];
let types = column_types_for(data_type)?;
let mut missing_null_count: Vec<String> = Vec::new();
let mut missing_min: Vec<String> = Vec::new();
let mut missing_max: Vec<String> = Vec::new();
for batch in add_files {
let mut visitor = ColumnStatsValidator {
data_type,
types,
missing_null_count: &mut missing_null_count,
missing_min: &mut missing_min,
missing_max: &mut missing_max,
};
batch.visit_rows(&column_names, &mut visitor)?;
}
if !missing_null_count.is_empty() {
return Err(Error::stats_validation(format!(
"Required column '{column}' is missing 'nullCount' statistics for files: [{}]",
missing_null_count.join(", ")
)));
}
if !missing_min.is_empty() {
return Err(Error::stats_validation(format!(
"Required column '{column}' is missing 'minValues' statistics for files: [{}]",
missing_min.join(", ")
)));
}
if !missing_max.is_empty() {
return Err(Error::stats_validation(format!(
"Required column '{column}' is missing 'maxValues' statistics for files: [{}]",
missing_max.join(", ")
)));
}
Ok(())
}
}
/// Builds the column path `stats.<category>.<column parts...>` for `column`.
///
/// `category` is the statistics group to look under; the parts of `column`
/// are appended after it in their original order.
fn build_stat_path(column: &ColumnName, category: &str) -> ColumnName {
    let prefix = ["stats", category];
    let segments: Vec<String> = prefix
        .iter()
        .map(|part| part.to_string())
        .chain(column.iter().map(|part| part.to_string()))
        .collect();
    ColumnName::new(segments)
}
/// Declares a lazily-initialised `ColumnNamesAndTypes` static called `$name`.
///
/// The static describes five columns: a string `path`, two long counters
/// (`nr`, `nc`), and `min` / `max` columns that both use `$data_type`.
macro_rules! define_column_types {
    ($name:ident, $data_type:expr) => {
        static $name: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
            // Keep each name next to its type, then split into the two parallel vectors.
            let (names, types): (Vec<ColumnName>, Vec<DataType>) = vec![
                (column_name!("path"), DataType::STRING),
                (column_name!("nr"), DataType::LONG),
                (column_name!("nc"), DataType::LONG),
                (column_name!("min"), $data_type),
                (column_name!("max"), $data_type),
            ]
            .into_iter()
            .unzip();
            (names, types).into()
        });
    };
}
// One column-types table per non-decimal primitive type accepted by
// `column_types_for`; each table differs only in the type used for min/max.
define_column_types!(COL_TYPES_BOOL, DataType::BOOLEAN);
define_column_types!(COL_TYPES_BYTE, DataType::BYTE);
define_column_types!(COL_TYPES_SHORT, DataType::SHORT);
define_column_types!(COL_TYPES_INT, DataType::INTEGER);
define_column_types!(COL_TYPES_LONG, DataType::LONG);
define_column_types!(COL_TYPES_STRING, DataType::STRING);
define_column_types!(COL_TYPES_BINARY, DataType::BINARY);
define_column_types!(COL_TYPES_FLOAT, DataType::FLOAT);
define_column_types!(COL_TYPES_DOUBLE, DataType::DOUBLE);
define_column_types!(COL_TYPES_DATE, DataType::DATE);
define_column_types!(COL_TYPES_TIMESTAMP, DataType::TIMESTAMP);
define_column_types!(COL_TYPES_TIMESTAMP_NTZ, DataType::TIMESTAMP_NTZ);
/// Column-types table used for decimal columns: a string `path`, two long
/// counters, and `min` / `max` columns typed as `decimal(38, 0)`.
///
/// NOTE(review): `column_types_for` returns this table for every decimal
/// column regardless of its declared precision/scale — confirm the engine's
/// getters tolerate that.
// `unwrap` is applied to a fixed (38, 0) decimal; presumably always valid —
// if it is not, the panic happens on first access of this static.
#[allow(clippy::unwrap_used)]
static COL_TYPES_DECIMAL: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
    let decimal =
        DataType::Primitive(PrimitiveType::Decimal(DecimalType::try_new(38, 0).unwrap()));
    let names = vec![
        column_name!("path"),
        column_name!("nr"),
        column_name!("nc"),
        column_name!("min"),
        column_name!("max"),
    ];
    let types = vec![
        DataType::STRING,
        DataType::LONG,
        DataType::LONG,
        decimal.clone(),
        decimal,
    ];
    (names, types).into()
});
/// Column names and types reported by `NumRecordsValidator`: the file `path`
/// (string) and `stats.numRecords` (long).
static NUM_RECORDS_TYPES: LazyLock<ColumnNamesAndTypes> = LazyLock::new(|| {
    (
        vec![column_name!("path"), column_name!("stats.numRecords")],
        vec![DataType::STRING, DataType::LONG],
    )
        .into()
});
fn column_types_for(dt: &DataType) -> DeltaResult<&'static ColumnNamesAndTypes> {
match dt {
&DataType::BOOLEAN => Ok(&COL_TYPES_BOOL),
&DataType::BYTE => Ok(&COL_TYPES_BYTE),
&DataType::SHORT => Ok(&COL_TYPES_SHORT),
&DataType::INTEGER => Ok(&COL_TYPES_INT),
&DataType::LONG => Ok(&COL_TYPES_LONG),
&DataType::STRING => Ok(&COL_TYPES_STRING),
&DataType::BINARY => Ok(&COL_TYPES_BINARY),
&DataType::FLOAT => Ok(&COL_TYPES_FLOAT),
&DataType::DOUBLE => Ok(&COL_TYPES_DOUBLE),
&DataType::DATE => Ok(&COL_TYPES_DATE),
&DataType::TIMESTAMP => Ok(&COL_TYPES_TIMESTAMP),
&DataType::TIMESTAMP_NTZ => Ok(&COL_TYPES_TIMESTAMP_NTZ),
DataType::Primitive(PrimitiveType::Decimal(_)) => Ok(&COL_TYPES_DECIMAL),
&DataType::VOID
| DataType::Struct(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Variant(_) => Err(Error::internal_error(format!(
"Unsupported data type for stats validation: {dt}"
))),
}
}
fn is_stat_present<'b>(
getter: &'b dyn GetData<'b>,
row_idx: usize,
data_type: &DataType,
) -> DeltaResult<bool> {
let field_name = "stat";
match data_type {
&DataType::BOOLEAN => Ok(getter.get_bool(row_idx, field_name)?.is_some()),
&DataType::BYTE => Ok(getter.get_byte(row_idx, field_name)?.is_some()),
&DataType::SHORT => Ok(getter.get_short(row_idx, field_name)?.is_some()),
&DataType::INTEGER => Ok(getter.get_int(row_idx, field_name)?.is_some()),
&DataType::LONG => Ok(getter.get_long(row_idx, field_name)?.is_some()),
&DataType::FLOAT => Ok(getter.get_float(row_idx, field_name)?.is_some()),
&DataType::DOUBLE => Ok(getter.get_double(row_idx, field_name)?.is_some()),
&DataType::DATE => Ok(getter.get_date(row_idx, field_name)?.is_some()),
&DataType::TIMESTAMP | &DataType::TIMESTAMP_NTZ => {
Ok(getter.get_timestamp(row_idx, field_name)?.is_some())
}
&DataType::STRING => Ok(getter.get_str(row_idx, field_name)?.is_some()),
&DataType::BINARY => Ok(getter.get_binary(row_idx, field_name)?.is_some()),
DataType::Primitive(PrimitiveType::Decimal(_)) => {
Ok(getter.get_decimal(row_idx, field_name)?.is_some())
}
&DataType::VOID
| DataType::Struct(_)
| DataType::Array(_)
| DataType::Map(_)
| DataType::Variant(_) => Err(Error::internal_error(format!(
"Unsupported data type for stats presence check: {data_type}"
))),
}
}
/// Row visitor that records, for one column, the paths of files whose
/// null-count, min or max statistic is absent.
struct ColumnStatsValidator<'a> {
/// Data type used to pick the typed accessor for the min/max getters.
data_type: &'a DataType,
/// Column names and types returned from `selected_column_names_and_types`.
types: &'static ColumnNamesAndTypes,
/// Paths of files whose null-count statistic was null.
missing_null_count: &'a mut Vec<String>,
/// Paths of files whose min statistic was absent (and not excused as all-null).
missing_min: &'a mut Vec<String>,
/// Paths of files whose max statistic was absent (and not excused as all-null).
missing_max: &'a mut Vec<String>,
}
impl RowVisitor for ColumnStatsValidator<'_> {
    /// Returns the column names and types of the table chosen for this column's data type.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        self.types.as_ref()
    }

    /// Inspects each row and records the file path wherever a statistic is absent.
    ///
    /// Getters are expected in the order: path, record count, null count, min, max.
    /// When the record count equals the null count, min and max are not required
    /// for that row and are not even read.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the number of getters is not 5, and
    /// propagates any error raised while reading a value.
    fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
        require!(
            getters.len() == 5,
            Error::internal_error(format!(
                "Expected 5 getters for column stats validation, got {}",
                getters.len()
            ))
        );
        let (path_col, num_records_col, null_count_col) = (getters[0], getters[1], getters[2]);
        let (min_col, max_col) = (getters[3], getters[4]);

        for row_idx in 0..row_count {
            let path: String = path_col.get(row_idx, "path")?;
            let num_records = num_records_col.get_long(row_idx, NUM_RECORDS)?;
            let null_count = null_count_col.get_long(row_idx, NULL_COUNT)?;

            if null_count.is_none() {
                self.missing_null_count.push(path.clone());
            }

            // Every record is null for this column, so min/max are not required.
            let all_null = match (num_records, null_count) {
                (Some(nr), Some(nc)) => nr == nc,
                _ => false,
            };
            if all_null {
                continue;
            }

            if !is_stat_present(min_col, row_idx, self.data_type)? {
                self.missing_min.push(path.clone());
            }
            if !is_stat_present(max_col, row_idx, self.data_type)? {
                self.missing_max.push(path);
            }
        }
        Ok(())
    }
}
pub fn verify_num_records_present(add_files: &[Box<dyn crate::EngineData>]) -> DeltaResult<()> {
let column_names = vec![
ColumnName::new(["path"]),
ColumnName::new(["stats", NUM_RECORDS]),
];
let mut first_missing: Option<String> = None;
for batch in add_files {
let mut visitor = NumRecordsValidator {
first_missing: &mut first_missing,
};
batch.visit_rows(&column_names, &mut visitor)?;
if first_missing.is_some() {
break;
}
}
if let Some(path) = first_missing {
return Err(Error::stats_validation(format!(
"'stats.numRecords' is required for this table (see \
`TableConfiguration::requires_stats_num_records`), but is missing for file '{path}'",
)));
}
Ok(())
}
/// Row visitor that looks for a file whose `stats.numRecords` value is null.
struct NumRecordsValidator<'a> {
/// Set to the path of the first file found with a null record count.
first_missing: &'a mut Option<String>,
}
impl RowVisitor for NumRecordsValidator<'_> {
    /// Returns the `path` / `stats.numRecords` column names and types.
    fn selected_column_names_and_types(&self) -> (&'static [ColumnName], &'static [DataType]) {
        NUM_RECORDS_TYPES.as_ref()
    }

    /// Records the path of the first row whose record count is null, then stops.
    ///
    /// Getters are expected in the order: path, record count.
    ///
    /// # Errors
    ///
    /// Returns an internal error if the number of getters is not 2, and
    /// propagates any error raised while reading a value.
    fn visit<'b>(&mut self, row_count: usize, getters: &[&'b dyn GetData<'b>]) -> DeltaResult<()> {
        require!(
            getters.len() == 2,
            Error::internal_error(format!(
                "Expected 2 getters for numRecords validation, got {}",
                getters.len()
            ))
        );
        let (path_col, num_records_col) = (getters[0], getters[1]);

        for row_idx in 0..row_count {
            if num_records_col.get_long(row_idx, NUM_RECORDS)?.is_some() {
                continue;
            }
            // The path is only read for the offending row.
            *self.first_missing = Some(path_col.get(row_idx, "path")?);
            break;
        }
        Ok(())
    }
}