invr 0.2.1

Declarative data validation engine using invariants executed on Polars DataFrames.
use crate::infrastructure::polars::kind::PolarsKind;
use crate::invariant::Invariant;
use crate::scope::Scope;
use crate::violation::Violation;
use crate::violation::value_object::metric_value::MetricValue;
use polars::prelude::AnyValue;
use polars::prelude::*;

/// Plans the aggregate expression for the conditional-not-null rule.
///
/// The rule states that the target column must not be NULL on rows where
/// `condition_column` equals `condition_value`. The expression built here
/// counts the rows that break that rule.
///
/// # Parameters read from the invariant
/// - `condition_column`: column whose value activates the constraint
/// - `condition_value`: value that `condition_column` is compared against
///
/// # Returns
/// - `None` when the invariant scope is not `Scope::Column`
/// - `None` when either required parameter cannot be obtained
/// - otherwise `Some(expr)`, where `expr` sums the boolean
///   `(condition_column == condition_value) AND target IS NULL`, i.e. the
///   number of violating rows
pub fn plan(inv: &Invariant<PolarsKind>) -> Option<Expr> {
    // Only column-scoped invariants carry the target column name.
    let target = match inv.scope() {
        Scope::Column { name } => name,
        _ => return None,
    };

    let condition_column = inv.require_param("condition_column").ok()?;
    let condition_value = inv.require_param("condition_value").ok()?;

    // NOTE(review): the parameter is handed to `lit` exactly as `require_param`
    // returns it; if that is always a string, comparing it against a
    // non-string column may not behave as intended — confirm against callers.
    let condition_holds = col(condition_column).eq(lit(condition_value));
    let target_is_missing = col(name_of(target)).is_null();

    Some(condition_holds.and(target_is_missing).sum())
}

/// Identity helper that keeps the target column reference readable at the
/// call site; it forwards the borrowed name unchanged.
#[inline]
fn name_of<T: ?Sized>(name: &T) -> &T {
    name
}

/// Turns the evaluated violation count into an optional [`Violation`].
///
/// # Returns
/// - `None` when `value` cannot be extracted as an `i64`
/// - `None` when the extracted count is zero or negative
/// - otherwise `Some(violation)` built from the invariant's id, severity and
///   scope, carrying the metric `violation_count` (integer)
pub fn map(inv: &Invariant<PolarsKind>, value: AnyValue) -> Option<Violation> {
    let violation_count = value.try_extract::<i64>().ok()?;

    // Nothing to report unless at least one row broke the rule.
    if violation_count <= 0 {
        return None;
    }

    let message = format!("{violation_count} rows violate conditional not null rule");
    let violation = Violation::new(
        inv.id().clone(),
        inv.severity(),
        inv.scope().clone(),
        message,
    )
    .with_metric("violation_count", MetricValue::Int(violation_count));

    Some(violation)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::domain::invariant::value_object::id::InvariantId;
    use std::collections::BTreeMap;

    /// Builds a `ConditionalNotNull` invariant scoped to column `target`,
    /// with `condition_column` / `condition_value` set from the arguments.
    fn make_invariant(
        target: &str,
        condition_col: &str,
        condition_val: &str,
    ) -> Invariant<PolarsKind> {
        let id = InvariantId::new("conditional_not_null_test").unwrap();
        let mut params = BTreeMap::new();
        params.insert("condition_column".to_string(), condition_col.to_string());
        params.insert("condition_value".to_string(), condition_val.to_string());

        Invariant::new(
            id,
            PolarsKind::ConditionalNotNull,
            Scope::Column {
                name: target.to_string(),
            },
        )
        .with_params(params)
    }

    /// Builds a two-column frame: `cond` (condition values) and `a` (target).
    fn df(cond: Vec<&str>, target: Vec<Option<i32>>) -> DataFrame {
        let s1 = Series::new("cond".into(), cond);
        let s2 = Series::new("a".into(), target);
        let height = s1.len();
        DataFrame::new(height, vec![s1.into(), s2.into()]).unwrap()
    }

    /// Evaluates the planned expression for `inv` against `frame` and returns
    /// the single aggregated count it produces.
    fn count_violations(frame: DataFrame, inv: &Invariant<PolarsKind>) -> i64 {
        let result = frame.lazy().select([plan(inv).unwrap()]).collect().unwrap();

        // Bound to a local so the borrowed `AnyValue` is dropped before `result`.
        let count = result.columns()[0]
            .get(0)
            .unwrap()
            .try_extract::<i64>()
            .unwrap();

        count
    }

    #[test]
    fn test_plan_returns_expr() {
        let inv = make_invariant("a", "cond", "X");
        assert!(plan(&inv).is_some());
    }

    #[test]
    fn test_no_violation_when_condition_not_met() {
        let frame = df(vec!["A", "B"], vec![None, None]);
        let inv = make_invariant("a", "cond", "X");

        assert_eq!(count_violations(frame, &inv), 0);
    }

    #[test]
    fn test_violation_when_condition_met_and_null() {
        let frame = df(vec!["X", "X"], vec![None, Some(1)]);
        let inv = make_invariant("a", "cond", "X");

        assert_eq!(count_violations(frame, &inv), 1);
    }

    #[test]
    fn test_counts_only_rows_matching_condition() {
        // Row 2 is NULL but its condition is not met; row 3 meets the
        // condition but is not NULL. Only rows 1 and 4 violate the rule.
        let frame = df(
            vec!["X", "Y", "X", "X"],
            vec![None, None, Some(1), None],
        );
        let inv = make_invariant("a", "cond", "X");

        assert_eq!(count_violations(frame, &inv), 2);
    }

    #[test]
    fn test_wrong_scope_returns_none() {
        let id = InvariantId::new("wrong_scope").unwrap();
        let inv = Invariant::new(id, PolarsKind::ConditionalNotNull, Scope::Dataset);

        assert!(plan(&inv).is_none());
    }

    #[test]
    fn test_map_returns_none_for_zero_count() {
        let inv = make_invariant("a", "cond", "X");

        assert!(map(&inv, AnyValue::Int64(0)).is_none());
    }

    #[test]
    fn test_map_returns_violation_for_positive_count() {
        let inv = make_invariant("a", "cond", "X");

        assert!(map(&inv, AnyValue::Int64(3)).is_some());
        // Unsigned counts must be accepted as well.
        assert!(map(&inv, AnyValue::UInt32(2)).is_some());
    }

    #[test]
    fn test_map_returns_none_for_non_numeric_value() {
        let inv = make_invariant("a", "cond", "X");

        assert!(map(&inv, AnyValue::Null).is_none());
    }
}