use crate::core::{current_validation_context, Constraint, ConstraintResult, ConstraintStatus};
use crate::error::Result;
use crate::security::SqlSecurity;
use arrow::array::Array;
use async_trait::async_trait;
use datafusion::execution::context::SessionContext;
use serde::{Deserialize, Serialize};
use std::fmt;
use tracing::instrument;
/// A length requirement that the values of a string column are checked against.
///
/// Each variant is translated into a SQL predicate over `LENGTH(column)` by
/// `sql_condition`; all numeric bounds are inclusive, matching the comparison
/// operators used there.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum LengthAssertion {
/// The length must be greater than or equal to the given minimum.
Min(usize),
/// The length must be less than or equal to the given maximum.
Max(usize),
/// The length must lie between the first (minimum) and second (maximum)
/// value, both ends inclusive.
Between(usize, usize),
/// The length must be equal to the given value.
Exactly(usize),
/// The length must be at least 1.
NotEmpty,
}
impl LengthAssertion {
fn sql_condition(&self, column: &str) -> String {
match self {
LengthAssertion::Min(min) => format!("LENGTH({column}) >= {min}"),
LengthAssertion::Max(max) => format!("LENGTH({column}) <= {max}"),
LengthAssertion::Between(min, max) => {
format!("LENGTH({column}) >= {min} AND LENGTH({column}) <= {max}")
}
LengthAssertion::Exactly(len) => format!("LENGTH({column}) = {len}"),
LengthAssertion::NotEmpty => format!("LENGTH({column}) >= 1"),
}
}
fn name(&self) -> &str {
match self {
LengthAssertion::Min(_) => "min_length",
LengthAssertion::Max(_) => "max_length",
LengthAssertion::Between(_, _) => "length_between",
LengthAssertion::Exactly(_) => "exact_length",
LengthAssertion::NotEmpty => "not_empty",
}
}
fn description(&self) -> String {
match self {
LengthAssertion::Min(min) => format!("at least {min} characters"),
LengthAssertion::Max(max) => format!("at most {max} characters"),
LengthAssertion::Between(min, max) => format!("between {min} and {max} characters"),
LengthAssertion::Exactly(len) => format!("exactly {len} characters"),
LengthAssertion::NotEmpty => "not empty".to_string(),
}
}
}
impl fmt::Display for LengthAssertion {
    /// Writes the same human-readable phrase that `description` returns.
    ///
    /// The text is emitted as-is; width, fill and precision flags supplied by
    /// the caller's format string are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let text = self.description();
        f.write_str(&text)
    }
}
/// A constraint that checks the values of one column against a
/// [`LengthAssertion`].
///
/// Instances are built through `new` or one of the per-assertion
/// constructors (`min`, `max`, `between`, `exactly`, `not_empty`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LengthConstraint {
/// Name of the column whose values are checked; it is passed through
/// `SqlSecurity::escape_identifier` before being placed into SQL.
column: String,
/// The length requirement applied to the column.
assertion: LengthAssertion,
}
impl LengthConstraint {
    /// Creates a constraint that checks `column` against `assertion`.
    pub fn new(column: impl Into<String>, assertion: LengthAssertion) -> Self {
        let column = column.into();
        Self { column, assertion }
    }

    /// Creates a constraint requiring a length of at least `min_length`.
    pub fn min(column: impl Into<String>, min_length: usize) -> Self {
        let assertion = LengthAssertion::Min(min_length);
        Self::new(column, assertion)
    }

    /// Creates a constraint requiring a length of at most `max_length`.
    pub fn max(column: impl Into<String>, max_length: usize) -> Self {
        let assertion = LengthAssertion::Max(max_length);
        Self::new(column, assertion)
    }

    /// Creates a constraint requiring a length within
    /// `min_length..=max_length`.
    ///
    /// # Panics
    ///
    /// Panics with `"min_length must be <= max_length"` when the bounds are
    /// inverted.
    pub fn between(column: impl Into<String>, min_length: usize, max_length: usize) -> Self {
        if min_length > max_length {
            panic!("min_length must be <= max_length");
        }
        let assertion = LengthAssertion::Between(min_length, max_length);
        Self::new(column, assertion)
    }

    /// Creates a constraint requiring a length of exactly `length`.
    pub fn exactly(column: impl Into<String>, length: usize) -> Self {
        let assertion = LengthAssertion::Exactly(length);
        Self::new(column, assertion)
    }

    /// Creates a constraint requiring a length of at least 1.
    pub fn not_empty(column: impl Into<String>) -> Self {
        Self::new(column, LengthAssertion::NotEmpty)
    }
}
#[async_trait]
impl Constraint for LengthConstraint {
    /// Evaluates the length assertion against the table named by the current
    /// validation context.
    ///
    /// A single aggregate query computes the fraction of rows whose value
    /// either satisfies the assertion or is NULL. The constraint succeeds
    /// only when that fraction is at least `1.0`; the fraction is always
    /// reported as the metric.
    ///
    /// The result is "skipped" when the query yields no batches, a first
    /// batch without rows, or a NULL ratio (which is what the
    /// `NULLIF(COUNT(*), 0)` divisor produces for a table with zero rows).
    ///
    /// # Errors
    ///
    /// Propagates failures from identifier escaping, from planning or
    /// executing the query, and returns a constraint-evaluation error when
    /// the result column is not a `Float64Array`.
    #[instrument(skip(self, ctx), fields(
        column = %self.column,
        assertion = %self.assertion
    ))]
    async fn evaluate(&self, ctx: &SessionContext) -> Result<ConstraintResult> {
        let column_identifier = SqlSecurity::escape_identifier(&self.column)?;
        let condition = self.assertion.sql_condition(&column_identifier);

        let validation_ctx = current_validation_context();
        let table_name = validation_ctx.table_name();

        // NULL values are deliberately counted as passing via the
        // `OR <column> IS NULL` clause.
        let sql = format!(
            "SELECT\n\
             COUNT(CASE WHEN {condition} OR {column_identifier} IS NULL THEN 1 END) * 1.0 / NULLIF(COUNT(*), 0) as ratio\n\
             FROM {table_name}"
        );

        let batches = ctx.sql(&sql).await?.collect().await?;

        // Only the first batch is inspected; the aggregate yields one row.
        let first_batch = match batches.first() {
            Some(batch) if batch.num_rows() > 0 => batch,
            _ => return Ok(ConstraintResult::skipped("No data to validate")),
        };

        let ratio_array = first_batch
            .column(0)
            .as_any()
            .downcast_ref::<arrow::array::Float64Array>()
            .ok_or_else(|| {
                crate::error::TermError::constraint_evaluation(
                    self.name(),
                    "Failed to extract ratio from result",
                )
            })?;

        if ratio_array.is_null(0) {
            return Ok(ConstraintResult::skipped("No data to validate"));
        }

        let ratio = ratio_array.value(0);
        let (status, message) = if ratio >= 1.0 {
            (ConstraintStatus::Success, None)
        } else {
            let text = format!(
                "Length constraint failed: {:.2}% of values are {}",
                ratio * 100.0,
                self.assertion.description()
            );
            (ConstraintStatus::Failure, Some(text))
        };

        Ok(ConstraintResult {
            status,
            metric: Some(ratio),
            message,
        })
    }

    /// Returns the identifier of the underlying assertion kind.
    fn name(&self) -> &str {
        self.assertion.name()
    }

    /// Returns the name of the column this constraint checks.
    fn column(&self) -> Option<&str> {
        Some(self.column.as_str())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::test_helpers::evaluate_constraint_with_context;
    use arrow::array::StringArray;
    use arrow::record_batch::RecordBatch;
    use datafusion::arrow::datatypes::{DataType, Field, Schema};
    use std::sync::Arc;

    /// Name under which the fixture batch is registered and validated.
    const TABLE: &str = "data";

    /// Builds a session whose `data` table has a single nullable Utf8 column
    /// named `text` holding `data`.
    async fn create_test_context(data: Vec<Option<&str>>) -> SessionContext {
        let fields = vec![Field::new("text", DataType::Utf8, true)];
        let schema = Arc::new(Schema::new(fields));
        let values = StringArray::from(data);
        let batch = RecordBatch::try_new(schema, vec![Arc::new(values)]).unwrap();

        let session = SessionContext::new();
        session.register_batch(TABLE, batch).unwrap();
        session
    }

    #[tokio::test]
    async fn test_min_length_constraint() {
        // Every non-null value has >= 5 characters; the NULL counts as passing.
        let rows = vec![
            Some("hello"),
            Some("world"),
            Some("testing"),
            Some("great"),
            None,
        ];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::min("text", 5);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(1.0));
        assert_eq!(constraint.name(), "min_length");
    }

    #[tokio::test]
    async fn test_min_length_constraint_failure() {
        // "hello", "testing" and the NULL pass: 3 of 5 rows.
        let rows = vec![Some("hi"), Some("hello"), Some("a"), Some("testing"), None];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::min("text", 5);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert_eq!(outcome.metric, Some(0.6));
        let message = outcome.message.unwrap();
        assert!(message.contains("at least 5 characters"));
    }

    #[tokio::test]
    async fn test_max_length_constraint() {
        let rows = vec![Some("hi"), Some("hey"), Some("test"), None];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::max("text", 10);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(1.0));
        assert_eq!(constraint.name(), "max_length");
    }

    #[tokio::test]
    async fn test_max_length_constraint_failure() {
        // Only the long sentence exceeds 10 characters: 3 of 4 rows pass.
        let rows = vec![
            Some("short"),
            Some("this is a very long string that exceeds the limit"),
            Some("ok"),
            None,
        ];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::max("text", 10);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert_eq!(outcome.metric, Some(0.75));
        let message = outcome.message.unwrap();
        assert!(message.contains("at most 10 characters"));
    }

    #[tokio::test]
    async fn test_between_length_constraint() {
        // "hi" is too short and the sentence too long: 3 of 5 rows pass.
        let rows = vec![
            Some("hello"),
            Some("testing"),
            Some("hi"),
            Some("this is way too long"),
            None,
        ];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::between("text", 3, 10);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert_eq!(outcome.metric, Some(0.6));
        assert_eq!(constraint.name(), "length_between");
        let message = outcome.message.unwrap();
        assert!(message.contains("between 3 and 10 characters"));
    }

    #[tokio::test]
    async fn test_exactly_length_constraint() {
        // "hello", "world" and the NULL pass: 3 of 5 rows.
        let rows = vec![
            Some("hello"),
            Some("world"),
            Some("test"),
            Some("testing"),
            None,
        ];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::exactly("text", 5);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert_eq!(outcome.metric, Some(0.6));
        assert_eq!(constraint.name(), "exact_length");
        let message = outcome.message.unwrap();
        assert!(message.contains("exactly 5 characters"));
    }

    #[tokio::test]
    async fn test_not_empty_constraint() {
        // Only the empty string fails: 4 of 5 rows pass.
        let rows = vec![Some("hello"), Some("a"), Some(""), Some("testing"), None];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::not_empty("text");

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Failure);
        assert_eq!(outcome.metric, Some(0.8));
        assert_eq!(constraint.name(), "not_empty");
        let message = outcome.message.unwrap();
        assert!(message.contains("not empty"));
    }

    #[tokio::test]
    async fn test_utf8_multibyte_characters() {
        // Passes only if multi-byte values are measured in characters, not bytes.
        let rows = vec![
            Some("hello"),
            Some("你好"),
            Some("🦀🔥"),
            Some("café"),
            None,
        ];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::min("text", 2);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Success);
    }

    #[tokio::test]
    async fn test_all_null_values() {
        // NULLs count as passing, so an all-NULL column fully satisfies the constraint.
        let rows = vec![None, None, None];
        let ctx = create_test_context(rows).await;
        let constraint = LengthConstraint::min("text", 5);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Success);
        assert_eq!(outcome.metric, Some(1.0));
    }

    #[tokio::test]
    async fn test_empty_data() {
        // With zero rows there is nothing to validate, so the result is skipped.
        let ctx = create_test_context(vec![]).await;
        let constraint = LengthConstraint::min("text", 5);

        let outcome = evaluate_constraint_with_context(&constraint, &ctx, TABLE)
            .await
            .unwrap();

        assert_eq!(outcome.status, ConstraintStatus::Skipped);
    }

    #[test]
    fn test_length_assertion_display() {
        let min = LengthAssertion::Min(5);
        assert_eq!(min.to_string(), "at least 5 characters");

        let max = LengthAssertion::Max(10);
        assert_eq!(max.to_string(), "at most 10 characters");

        let between = LengthAssertion::Between(3, 8);
        assert_eq!(between.to_string(), "between 3 and 8 characters");

        let exactly = LengthAssertion::Exactly(6);
        assert_eq!(exactly.to_string(), "exactly 6 characters");

        assert_eq!(LengthAssertion::NotEmpty.to_string(), "not empty");
    }

    #[test]
    #[should_panic(expected = "min_length must be <= max_length")]
    fn test_invalid_between_constraint() {
        // Inverted bounds must be rejected at construction time.
        LengthConstraint::between("test", 10, 5);
    }
}