use super::{
result::{ValidationIssue, ValidationMetrics, ValidationReport},
Check, ConstraintStatus, Level, ValidationResult,
};
use crate::prelude::*;
use crate::telemetry::{utils, TermSpan, TermTelemetry};
use datafusion::prelude::*;
use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, error, info, instrument, warn};
/// A named collection of validation checks that is executed against a
/// DataFusion `SessionContext`, producing a `ValidationResult`.
#[derive(Debug, Clone)]
pub struct ValidationSuite {
// Suite identifier used in logs, span names, and metric attributes.
name: String,
// Optional human-readable description; only surfaced in the start log.
description: Option<String>,
// Checks to run; `Arc` keeps `Clone` on the suite cheap.
checks: Vec<Arc<Check>>,
// Optional telemetry backend; `None` disables spans and metrics entirely.
telemetry: Option<Arc<TermTelemetry>>,
// Optimizer flag; the optimizer is not implemented yet, so enabling it
// only logs a warning and falls back to sequential execution.
use_optimizer: bool,
// Name of the registered table the constraints evaluate against
// (builder default: "data").
table_name: String,
}
impl ValidationSuite {
/// Runs every check's constraints one-by-one against `ctx`.
///
/// Outcomes are tallied into `metrics`, failures and evaluation errors are
/// appended to `report` as `ValidationIssue`s, and `has_errors` is set to
/// `true` whenever a failing/erroring constraint belongs to a check at
/// `Level::Error`. Spans are no-ops without a telemetry backend; OTel
/// counters additionally require the `telemetry` cargo feature.
async fn run_sequential(
&self,
ctx: &SessionContext,
report: &mut ValidationReport,
metrics: &mut ValidationMetrics,
has_errors: &mut bool,
// Not read in this method body; kept so both execution paths can share a
// signature. The allow silences the resulting warning.
#[allow(unused_variables)] start_time: &Instant,
_suite_span: &mut TermSpan,
) -> Result<()> {
for check in &self.checks {
debug!(
check.name = %check.name(),
check.level = ?check.level(),
check.constraints = check.constraints().len(),
"Running validation check"
);
// Only read by the `telemetry`-feature-gated duration metric at the
// bottom of the loop, hence the allow.
#[allow(unused_variables)]
let check_start = Instant::now();
// No-op span when telemetry is absent, so the rest of the loop body
// does not need to branch on it.
let _check_span = if let Some(telemetry) = &self.telemetry {
telemetry.start_check_span(check.name(), check.constraints().len())
} else {
TermSpan::noop()
};
for constraint in check.constraints() {
metrics.total_checks += 1;
let mut constraint_span = if let Some(telemetry) = &self.telemetry {
// No column information is available at this level.
let column = None; telemetry.start_constraint_span(constraint.name(), column)
} else {
TermSpan::noop()
};
// Evaluate inside a task-local validation context so downstream code
// can resolve which table is being validated.
let validation_ctx = crate::core::ValidationContext::new(self.table_name.clone());
let result = crate::core::validation_context::CURRENT_CONTEXT
.scope(validation_ctx, constraint.evaluate(ctx))
.await;
match result {
Ok(result) => {
// Attach detailed result attributes to the constraint span when
// the backend asks for them.
if let Some(telemetry) = &self.telemetry {
if telemetry.detailed_metrics {
utils::record_constraint_result(&mut constraint_span, &result);
}
}
match result.status {
ConstraintStatus::Success => {
metrics.passed_checks += 1;
debug!(
constraint.name = %constraint.name(),
check.name = %check.name(),
constraint.metric = ?result.metric,
"Constraint passed"
);
// Pass counter (telemetry feature only).
#[cfg(feature = "telemetry")]
if let Some(telemetry) = &self.telemetry {
if let Some(metrics_collector) = telemetry.metrics() {
let attrs = vec![
opentelemetry::KeyValue::new(
"check.name",
check.name().to_string(),
),
opentelemetry::KeyValue::new(
"check.type",
constraint.name().to_string(),
),
opentelemetry::KeyValue::new("check.passed", true),
];
metrics_collector.increment_checks_passed(&attrs);
}
}
}
ConstraintStatus::Failure => {
metrics.failed_checks += 1;
// Fall back to a generic message when the constraint did not
// provide one.
let failure_message = result.message.clone().unwrap_or_else(|| {
let name = constraint.name();
format!("Constraint {name} failed")
});
let issue = ValidationIssue {
check_name: check.name().to_string(),
constraint_name: constraint.name().to_string(),
level: check.level(),
message: failure_message.clone(),
metric: result.metric,
};
// Only Error-level checks make the whole suite fail.
if check.level() == Level::Error {
*has_errors = true;
}
warn!(
constraint.name = %constraint.name(),
check.name = %check.name(),
check.level = ?check.level(),
failure.message = %issue.message,
constraint.metric = ?result.metric,
"Constraint failed"
);
report.add_issue(issue);
// Failure counter with the failure reason attached
// (telemetry feature only).
#[cfg(feature = "telemetry")]
if let Some(telemetry) = &self.telemetry {
if let Some(metrics_collector) = telemetry.metrics() {
let attrs = vec![
opentelemetry::KeyValue::new(
"check.name",
check.name().to_string(),
),
opentelemetry::KeyValue::new(
"check.type",
constraint.name().to_string(),
),
opentelemetry::KeyValue::new("check.passed", false),
opentelemetry::KeyValue::new(
"failure.reason",
failure_message,
),
];
metrics_collector.increment_checks_failed(&attrs);
}
}
}
ConstraintStatus::Skipped => {
// Skipped constraints are counted but never affect the
// pass/fail outcome.
metrics.skipped_checks += 1;
debug!(
constraint.name = %constraint.name(),
check.name = %check.name(),
skip.reason = %result.message.as_deref().unwrap_or("No reason provided"),
"Constraint skipped"
);
}
}
// Export any numeric metric the constraint produced, keyed as
// "<check>.<constraint>".
if let Some(metric_value) = result.metric {
let check_name = check.name();
let constraint_name = constraint.name();
let metric_name = format!("{check_name}.{constraint_name}");
metrics
.custom_metrics
.insert(metric_name.clone(), metric_value);
#[cfg(feature = "telemetry")]
if let Some(telemetry) = &self.telemetry {
if let Some(metrics_collector) = telemetry.metrics() {
let attrs = vec![
opentelemetry::KeyValue::new("metric.name", metric_name),
opentelemetry::KeyValue::new(
"check.name",
check.name().to_string(),
),
opentelemetry::KeyValue::new(
"constraint.type",
constraint.name().to_string(),
),
];
metrics_collector.record_custom_metric(metric_value, &attrs);
}
}
}
}
Err(e) => {
// Evaluation error (not a constraint failure): record it on the
// span and count it as a failed check.
constraint_span.record_error(&e as &dyn std::error::Error);
metrics.failed_checks += 1;
let issue = ValidationIssue {
check_name: check.name().to_string(),
constraint_name: constraint.name().to_string(),
level: check.level(),
message: format!("Error evaluating constraint: {e}"),
metric: None,
};
if check.level() == Level::Error {
*has_errors = true;
}
error!(
constraint.name = %constraint.name(),
check.name = %check.name(),
error = %e,
error.type = "constraint_evaluation",
"Error evaluating constraint"
);
report.add_issue(issue);
}
}
}
// Per-check wall-clock duration metric (telemetry feature only).
#[cfg(feature = "telemetry")]
if let Some(telemetry) = &self.telemetry {
if let Some(metrics_collector) = telemetry.metrics() {
let check_duration = check_start.elapsed().as_secs_f64();
let attrs = vec![
opentelemetry::KeyValue::new("check.name", check.name().to_string()),
opentelemetry::KeyValue::new(
"check.constraint_count",
check.constraints().len() as i64,
),
];
metrics_collector.record_check_duration(check_duration, &attrs);
}
}
}
Ok(())
}
/// Emits end-of-run metrics and the suite's "Validation suite completed"
/// summary log line.
///
/// OTel duration/failure counters require the `telemetry` cargo feature;
/// span-level timing attributes require `record_timing` on the backend.
fn record_final_metrics(
&self,
metrics: &ValidationMetrics,
has_errors: bool,
start_time: &Instant,
suite_span: &mut TermSpan,
) {
// Suppresses the unused-parameter warning when the `telemetry` feature
// (the only reader of `start_time`) is compiled out.
let _ = start_time;
#[cfg(feature = "telemetry")]
if let Some(telemetry) = &self.telemetry {
if let Some(metrics_collector) = telemetry.metrics() {
let suite_duration = start_time.elapsed().as_secs_f64();
let attrs = vec![
opentelemetry::KeyValue::new("suite.name", self.name.clone()),
opentelemetry::KeyValue::new("suite.passed", !has_errors),
opentelemetry::KeyValue::new("checks.total", metrics.total_checks as i64),
opentelemetry::KeyValue::new("checks.passed", metrics.passed_checks as i64),
opentelemetry::KeyValue::new("checks.failed", metrics.failed_checks as i64),
];
metrics_collector.record_validation_duration(suite_duration, &attrs);
// A suite-level failure counter is only bumped on an overall failure.
if has_errors {
metrics_collector.increment_validation_failures(&attrs);
}
}
}
// Span-level counters/timing (independent of the cargo feature).
if let Some(telemetry) = &self.telemetry {
if telemetry.record_timing {
utils::record_validation_metrics(
suite_span,
metrics.passed_checks as u32,
metrics.failed_checks as u32,
metrics.skipped_checks as u32,
metrics.execution_time_ms,
);
}
}
info!(
suite.name = %self.name,
metrics.passed = metrics.passed_checks,
metrics.failed = metrics.failed_checks,
metrics.skipped = metrics.skipped_checks,
metrics.total = metrics.total_checks,
metrics.duration_ms = metrics.execution_time_ms,
metrics.success_rate = %format!("{:.2}%", metrics.success_rate()),
suite.result = %if has_errors { "failed" } else { "passed" },
"Validation suite completed"
);
}
/// Starts building a new suite with the given name.
pub fn builder(name: impl Into<String>) -> ValidationSuiteBuilder {
    ValidationSuiteBuilder::new(name)
}

/// The suite's name.
pub fn name(&self) -> &str {
    self.name.as_str()
}

/// The optional human-readable description.
pub fn description(&self) -> Option<&str> {
    self.description.as_deref()
}

/// The checks registered on this suite.
pub fn checks(&self) -> &[Arc<Check>] {
    self.checks.as_slice()
}

/// Whether a telemetry backend is attached.
pub fn telemetry_enabled(&self) -> bool {
    self.telemetry().is_some()
}

/// The attached telemetry backend, if any.
pub fn telemetry(&self) -> Option<&Arc<TermTelemetry>> {
    self.telemetry.as_ref()
}

/// Whether the optimizer path was requested via the builder.
pub fn optimizer_enabled(&self) -> bool {
    self.use_optimizer
}
#[instrument(skip(self, ctx), fields(
    suite.name = %self.name,
    suite.checks = self.checks.len(),
    telemetry.enabled = self.telemetry_enabled()
))]
/// Executes the full suite against `ctx` and returns the aggregated result.
///
/// Returns `ValidationResult::failure` when any `Level::Error` check failed
/// or errored, otherwise `ValidationResult::success`. Errors are only
/// propagated for infrastructure failures inside `run_sequential`, not for
/// failing constraints.
pub async fn run(&self, ctx: &SessionContext) -> Result<ValidationResult> {
    info!(
        suite.name = %self.name,
        suite.checks = self.checks.len(),
        suite.description = ?self.description,
        "Starting validation suite"
    );
    let start_time = Instant::now();
    // Marks this run as in-flight for the method's duration (telemetry
    // feature only); dropping the guard ends the tracking.
    #[cfg(feature = "telemetry")]
    let _active_guard = if let Some(telemetry) = &self.telemetry {
        telemetry.metrics().map(|m| m.start_validation())
    } else {
        None
    };
    // Root span for the whole suite; a no-op when telemetry is disabled.
    let mut suite_span = if let Some(telemetry) = &self.telemetry {
        telemetry.start_suite_span(&self.name, self.checks.len())
    } else {
        TermSpan::noop()
    };
    // Count the run and, best-effort, the rows in the target table; any
    // failure to count rows is deliberately ignored.
    #[cfg(feature = "telemetry")]
    if let Some(telemetry) = &self.telemetry {
        if let Some(metrics) = telemetry.metrics() {
            let attrs = vec![
                opentelemetry::KeyValue::new("suite.name", self.name.clone()),
                opentelemetry::KeyValue::new("check.count", self.checks.len() as i64),
            ];
            metrics.increment_validation_runs(&attrs);
            let table_query = format!("SELECT COUNT(*) as row_count FROM {}", self.table_name);
            if let Ok(df) = ctx.sql(&table_query).await {
                if let Ok(batches) = df.collect().await {
                    if !batches.is_empty() && batches[0].num_rows() > 0 {
                        if let Some(array) = batches[0]
                            .column(0)
                            .as_any()
                            .downcast_ref::<arrow::array::Int64Array>()
                        {
                            let row_count = array.value(0) as u64;
                            metrics.add_rows_processed(row_count, &attrs);
                        }
                    }
                }
            }
        }
    }
    let mut report = ValidationReport::new(&self.name);
    let mut metrics = ValidationMetrics::new();
    let mut has_errors = false;
    // The query optimizer is not implemented yet, so both settings take the
    // same sequential path; the flag only controls the warning. (Previously
    // this was an if/else with two identical `run_sequential` calls.)
    if self.use_optimizer {
        warn!("Query optimizer is not yet implemented, falling back to sequential execution");
    }
    self.run_sequential(
        ctx,
        &mut report,
        &mut metrics,
        &mut has_errors,
        &start_time,
        &mut suite_span,
    )
    .await?;
    metrics.execution_time_ms = start_time.elapsed().as_millis() as u64;
    report.metrics = metrics.clone();
    // Emits the final metrics plus the "Validation suite completed" summary
    // log; the old extra "completed (optimized)" log here was redundant and
    // misleading (it fired for sequential runs too), so it was removed.
    self.record_final_metrics(&metrics, has_errors, &start_time, &mut suite_span);
    if has_errors {
        Ok(ValidationResult::failure(report))
    } else {
        Ok(ValidationResult::success(metrics, report))
    }
}
}
/// Builder for [`ValidationSuite`]; construct via `ValidationSuite::builder`
/// or `ValidationSuiteBuilder::new`.
#[derive(Debug)]
pub struct ValidationSuiteBuilder {
// Required suite name, fixed at construction.
name: String,
// Optional description; defaults to `None`.
description: Option<String>,
// Accumulated checks; defaults to empty.
checks: Vec<Arc<Check>>,
// Optional telemetry backend; defaults to `None`.
telemetry: Option<Arc<TermTelemetry>>,
// Optimizer request flag; defaults to `false`.
use_optimizer: bool,
// Target table name; defaults to "data".
table_name: String,
}
impl ValidationSuiteBuilder {
    /// Creates a builder with defaults: no description, no checks, no
    /// telemetry, optimizer off, and table name "data".
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            description: None,
            checks: Vec::new(),
            telemetry: None,
            use_optimizer: false,
            table_name: String::from("data"),
        }
    }

    /// Sets the human-readable suite description.
    pub fn description(mut self, description: impl Into<String>) -> Self {
        self.description = Some(description.into());
        self
    }

    /// Overrides the default table name ("data") the checks run against.
    pub fn table_name(mut self, table_name: impl Into<String>) -> Self {
        self.table_name = table_name.into();
        self
    }

    /// Adds a single check to the suite.
    pub fn check(mut self, check: Check) -> Self {
        self.checks.push(Arc::new(check));
        self
    }

    /// Adds every check yielded by `checks` to the suite.
    pub fn checks<I>(mut self, checks: I) -> Self
    where
        I: IntoIterator<Item = Check>,
    {
        for check in checks {
            self.checks.push(Arc::new(check));
        }
        self
    }

    /// Attaches a telemetry backend to the suite.
    pub fn with_telemetry(mut self, telemetry: TermTelemetry) -> Self {
        self.telemetry = Some(Arc::new(telemetry));
        self
    }

    /// Requests (or declines) the optimizer execution path.
    pub fn with_optimizer(mut self, enabled: bool) -> Self {
        self.use_optimizer = enabled;
        self
    }

    /// Consumes the builder and produces the configured suite.
    pub fn build(self) -> ValidationSuite {
        let Self {
            name,
            description,
            checks,
            telemetry,
            use_optimizer,
            table_name,
        } = self;
        ValidationSuite {
            name,
            description,
            checks,
            telemetry,
            use_optimizer,
            table_name,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// The builder should carry name, description, and checks through to the
// built suite, with telemetry disabled by default.
#[test]
fn test_validation_suite_builder() {
let suite = ValidationSuite::builder("test_suite")
.description("Test validation suite")
.check(Check::builder("test_check").build())
.build();
assert_eq!(suite.name(), "test_suite");
assert_eq!(suite.description(), Some("Test validation suite"));
assert!(!suite.telemetry_enabled()); assert_eq!(suite.checks().len(), 1);
}
// A bare builder produces a suite with no telemetry attached.
#[test]
fn test_validation_suite_default_telemetry() {
let suite = ValidationSuite::builder("test_suite").build();
assert!(!suite.telemetry_enabled()); }
// Attaching any backend — even a disabled one — flips telemetry_enabled.
#[cfg(feature = "telemetry")]
#[test]
fn test_validation_suite_with_telemetry() {
let telemetry = TermTelemetry::disabled();
let suite = ValidationSuite::builder("test_suite")
.with_telemetry(telemetry)
.build();
assert!(suite.telemetry_enabled());
}
// with_optimizer(true/false) is reflected by optimizer_enabled(), and the
// default is off.
#[test]
fn test_validation_suite_with_optimizer() {
let suite = ValidationSuite::builder("test_suite")
.with_optimizer(true)
.build();
assert!(suite.optimizer_enabled());
let suite_no_opt = ValidationSuite::builder("test_suite")
.with_optimizer(false)
.build();
assert!(!suite_no_opt.optimizer_enabled());
let suite_default = ValidationSuite::builder("test_suite").build();
assert!(!suite_default.optimizer_enabled());
}
}