use polars::prelude::*;
use crate::infrastructure::polars::kind::PolarsKind;
use crate::invariant::Invariant;
use crate::violation::Violation;
mod column;
mod count;
mod custom;
mod date;
mod domain;
mod null;
mod numeric;
mod relational;
mod stat;
mod string;
mod unique;
/// Outcome of running checks: the violations that were found, or a boxed error.
pub type CheckResult = Result<Vec<Violation>, Box<dyn std::error::Error>>;
pub fn run_all(df: &DataFrame, invariants: &[Invariant<PolarsKind>]) -> CheckResult {
let mut violations = Vec::with_capacity(invariants.len());
for inv in invariants {
match inv.kind() {
PolarsKind::ColumnExists => {
if let Some(v) = column::column_exists::run_direct(df, inv) {
violations.push(v);
}
}
PolarsKind::ColumnMissing => {
if let Some(v) = column::column_missing::run_direct(df, inv) {
violations.push(v);
}
}
PolarsKind::DTypeIs => {
if let Some(v) = column::dtype_is::run_direct(df, inv) {
violations.push(v);
}
}
PolarsKind::SchemaEquals => {
if let Some(v) = column::schema_equals::run_direct(df, inv) {
violations.push(v);
}
}
_ => {}
}
}
let row_count = df.height().to_string();
let mut projections = Vec::with_capacity(invariants.len());
let mut planned_invariants: Vec<Invariant<PolarsKind>> = Vec::with_capacity(invariants.len());
for inv in invariants {
match inv.kind() {
PolarsKind::ColumnExists
| PolarsKind::ColumnMissing
| PolarsKind::DTypeIs
| PolarsKind::SchemaEquals => {
}
kind => {
let enriched;
let effective = match kind {
PolarsKind::Unique
| PolarsKind::CompositeUnique
| PolarsKind::DuplicateRatioMax
if !inv.has_param("row_count_cache") =>
{
enriched = inv.clone().with_param_value("row_count_cache", &row_count);
&enriched
}
_ => inv,
};
if let Some(expr) = plan_expr(effective) {
projections.push(expr.alias(effective.id().as_str()));
planned_invariants.push(effective.clone());
}
}
}
}
if projections.is_empty() {
return Ok(violations);
}
let result = df.clone().lazy().select(projections).collect()?;
for inv in &planned_invariants {
let col = result.column(inv.id().as_str())?;
let value = col.get(0)?;
if let Some(v) = map_violation(inv, value) {
violations.push(v);
}
}
Ok(violations)
}
/// Builds the expression used to evaluate `inv`, dispatching on its kind.
///
/// `ColumnExists`, `ColumnMissing`, `DTypeIs` and `SchemaEquals` always yield
/// `None` here. The three row-count kinds share `count::plan_row_count()`.
/// Every other kind returns whatever the `plan` function of its own module
/// returns for `inv`.
fn plan_expr(inv: &Invariant<PolarsKind>) -> Option<Expr> {
    match inv.kind() {
        // Kinds with no planned expression.
        PolarsKind::ColumnExists
        | PolarsKind::ColumnMissing
        | PolarsKind::DTypeIs
        | PolarsKind::SchemaEquals => None,

        // Null checks.
        PolarsKind::NotNull => null::not_null::plan(inv),
        PolarsKind::NullRatioMax => null::null_ratio_max::plan(inv),

        // Uniqueness checks.
        PolarsKind::Unique => unique::n_unique::plan(inv),
        PolarsKind::CompositeUnique => unique::composite_unique::plan(inv),
        PolarsKind::DuplicateRatioMax => unique::duplicate_ratio_max::plan(inv),

        // All row-count kinds use the same expression.
        PolarsKind::RowCountMin | PolarsKind::RowCountMax | PolarsKind::RowCountBetween => {
            Some(count::plan_row_count())
        }

        // Numeric checks.
        PolarsKind::ValueMin => numeric::value_min::plan(inv),
        PolarsKind::ValueMax => numeric::value_max::plan(inv),
        PolarsKind::ValueBetween => numeric::value_between::plan(inv),
        PolarsKind::MeanBetween => numeric::mean_between::plan(inv),
        PolarsKind::StdDevMax => numeric::stddev_max::plan(inv),
        PolarsKind::SumBetween => numeric::sum_between::plan(inv),

        // Checks from the `date` module.
        PolarsKind::DateBetween => date::date_between::plan(inv),
        PolarsKind::NoFutureDates => date::no_future_dates::plan(inv),
        PolarsKind::MonotonicIncreasing => date::monotonic_increasing::plan(inv),
        PolarsKind::NoGapsInSequence => date::no_gaps_in_sequence::plan(inv),

        // String checks.
        PolarsKind::RegexMatch => string::regex_match::plan(inv),
        PolarsKind::StringLengthMin => string::string_length_min::plan(inv),
        PolarsKind::StringLengthMax => string::string_length_max::plan(inv),
        PolarsKind::StringLengthBetween => string::string_length_between::plan(inv),

        // Value-domain checks.
        PolarsKind::AllowedValues => domain::allowed_values::plan(inv),
        PolarsKind::ForbiddenValues => domain::forbidden_values::plan(inv),

        // Checks from the `stat` module.
        PolarsKind::OutlierRatioMax => stat::outlier_ratio_max::plan(inv),
        PolarsKind::PercentileBetween => stat::percentile_between::plan(inv),

        // Checks from the `relational` module.
        PolarsKind::ForeignKey => relational::foreign_key::plan(inv),
        PolarsKind::ColumnEquals => relational::column_equals::plan(inv),
        PolarsKind::ConditionalNotNull => relational::conditional_not_null::plan(inv),

        // User-supplied expression.
        PolarsKind::CustomExpr => custom::plan(inv),
    }
}
/// Turns the evaluated `value` for `inv` into an optional violation,
/// dispatching on the invariant kind.
///
/// `ColumnExists`, `ColumnMissing`, `DTypeIs` and `SchemaEquals` always yield
/// `None` here. Every other kind forwards `inv` and `value` to the mapping
/// function of its module and returns that result unchanged.
fn map_violation(inv: &Invariant<PolarsKind>, value: AnyValue) -> Option<Violation> {
    match inv.kind() {
        // Kinds with no mapping from an evaluated value.
        PolarsKind::ColumnExists
        | PolarsKind::ColumnMissing
        | PolarsKind::DTypeIs
        | PolarsKind::SchemaEquals => None,

        // Null checks.
        PolarsKind::NotNull => null::not_null::map(inv, value),
        PolarsKind::NullRatioMax => null::null_ratio_max::map(inv, value),

        // Uniqueness checks.
        PolarsKind::Unique => unique::n_unique::map(inv, value),
        PolarsKind::CompositeUnique => unique::composite_unique::map(inv, value),
        PolarsKind::DuplicateRatioMax => unique::duplicate_ratio_max::map(inv, value),

        // All row-count kinds share one mapper.
        PolarsKind::RowCountMin | PolarsKind::RowCountMax | PolarsKind::RowCountBetween => {
            count::map_row_count(inv, value)
        }

        // Value-range kinds share a mapper and differ only in the label passed.
        PolarsKind::ValueMin => numeric::map_count_violation(inv, value, "value_min"),
        PolarsKind::ValueMax => numeric::map_count_violation(inv, value, "value_max"),
        PolarsKind::ValueBetween => numeric::map_count_violation(inv, value, "value_between"),

        // Remaining numeric kinds.
        PolarsKind::MeanBetween => numeric::mean_between::map(inv, value),
        PolarsKind::StdDevMax => numeric::stddev_max::map(inv, value),
        PolarsKind::SumBetween => numeric::sum_between::map(inv, value),

        // Every kind planned by the `date` module uses `date::map`.
        PolarsKind::DateBetween
        | PolarsKind::NoFutureDates
        | PolarsKind::MonotonicIncreasing
        | PolarsKind::NoGapsInSequence => date::map(inv, value),

        // Regex has its own mapper; the length kinds share `string::map`.
        PolarsKind::RegexMatch => string::regex_match::map(inv, value),
        PolarsKind::StringLengthMin
        | PolarsKind::StringLengthMax
        | PolarsKind::StringLengthBetween => string::map(inv, value),

        // Value-domain checks.
        PolarsKind::AllowedValues => domain::allowed_values::map(inv, value),
        PolarsKind::ForbiddenValues => domain::forbidden_values::map(inv, value),

        // Checks from the `stat` module.
        PolarsKind::OutlierRatioMax => stat::outlier_ratio_max::map(inv, value),
        PolarsKind::PercentileBetween => stat::percentile_between::map(inv, value),

        // Checks from the `relational` module.
        PolarsKind::ForeignKey => relational::foreign_key::map(inv, value),
        PolarsKind::ColumnEquals => relational::column_equals::map(inv, value),
        PolarsKind::ConditionalNotNull => relational::conditional_not_null::map(inv, value),

        // User-supplied expression.
        PolarsKind::CustomExpr => custom::map(inv, value),
    }
}