1use super::{
4 result::{ValidationIssue, ValidationMetrics, ValidationReport},
5 Check, ConstraintStatus, Level, ValidationResult,
6};
7use crate::prelude::*;
9use crate::telemetry::{utils, TermSpan, TermTelemetry};
10use datafusion::prelude::*;
11use std::sync::Arc;
12use std::time::Instant;
13use tracing::{debug, error, info, instrument, warn};
14
/// A named collection of [`Check`]s that can be executed against a
/// DataFusion `SessionContext`, with optional telemetry reporting.
#[derive(Debug, Clone)]
pub struct ValidationSuite {
    // Suite identifier used in logs, spans, and metric attributes.
    name: String,
    // Optional free-form description (logged at suite start).
    description: Option<String>,
    // Checks to execute; `Arc`-wrapped so cloning the suite is cheap.
    checks: Vec<Arc<Check>>,
    // Telemetry backend; `None` turns spans/metrics into no-ops.
    telemetry: Option<Arc<TermTelemetry>>,
    // Requests the query optimizer (not yet implemented — see `run`).
    use_optimizer: bool,
}
53
54impl ValidationSuite {
    /// Runs every check's constraints one after another against `ctx`.
    ///
    /// Each constraint outcome is folded into `metrics` (total / passed /
    /// failed / skipped counters, plus any custom metric value the
    /// constraint produced) and failures are appended to `report`.
    /// `*has_errors` is set to `true` when a failing or erroring
    /// constraint belongs to a check at [`Level::Error`].
    ///
    /// Tracing spans are created per check and per constraint; they are
    /// no-ops unless `self.telemetry` is configured. OpenTelemetry
    /// counters are additionally gated behind the `telemetry` cargo
    /// feature.
    async fn run_sequential(
        &self,
        ctx: &SessionContext,
        report: &mut ValidationReport,
        metrics: &mut ValidationMetrics,
        has_errors: &mut bool,
        // Only read by `telemetry`-feature code paths; kept in the
        // signature so both feature configurations share one interface.
        #[allow(unused_variables)] start_time: &Instant,
        _suite_span: &mut TermSpan,
    ) -> Result<()> {
        for check in &self.checks {
            debug!(
                check.name = %check.name(),
                check.level = ?check.level(),
                check.constraints = check.constraints().len(),
                "Running validation check"
            );
            // Used only when the `telemetry` feature records check durations.
            #[allow(unused_variables)]
            let check_start = Instant::now();

            // Span covering this whole check; held for the loop iteration.
            let _check_span = if let Some(telemetry) = &self.telemetry {
                telemetry.start_check_span(check.name(), check.constraints().len())
            } else {
                TermSpan::noop()
            };

            for constraint in check.constraints() {
                metrics.total_checks += 1;

                let mut constraint_span = if let Some(telemetry) = &self.telemetry {
                    // No column-level attribution is available at this level.
                    let column = None;
                    telemetry.start_constraint_span(constraint.name(), column)
                } else {
                    TermSpan::noop()
                };

                match constraint.evaluate(ctx).await {
                    Ok(result) => {
                        // Attach the raw result to the span when detailed
                        // metrics were requested.
                        if let Some(telemetry) = &self.telemetry {
                            if telemetry.detailed_metrics {
                                utils::record_constraint_result(&mut constraint_span, &result);
                            }
                        }

                        match result.status {
                            ConstraintStatus::Success => {
                                metrics.passed_checks += 1;
                                debug!(
                                    constraint.name = %constraint.name(),
                                    check.name = %check.name(),
                                    constraint.metric = ?result.metric,
                                    "Constraint passed"
                                );

                                // OTel pass counter (feature-gated).
                                #[cfg(feature = "telemetry")]
                                if let Some(telemetry) = &self.telemetry {
                                    if let Some(metrics_collector) = telemetry.metrics() {
                                        let attrs = vec![
                                            opentelemetry::KeyValue::new(
                                                "check.name",
                                                check.name().to_string(),
                                            ),
                                            opentelemetry::KeyValue::new(
                                                "check.type",
                                                constraint.name().to_string(),
                                            ),
                                            opentelemetry::KeyValue::new("check.passed", true),
                                        ];
                                        metrics_collector.increment_checks_passed(&attrs);
                                    }
                                }
                            }
                            ConstraintStatus::Failure => {
                                metrics.failed_checks += 1;
                                // Fall back to a generic message when the
                                // constraint did not supply one.
                                let failure_message = result.message.clone().unwrap_or_else(|| {
                                    let name = constraint.name();
                                    format!("Constraint {name} failed")
                                });
                                let issue = ValidationIssue {
                                    check_name: check.name().to_string(),
                                    constraint_name: constraint.name().to_string(),
                                    level: check.level(),
                                    message: failure_message.clone(),
                                    metric: result.metric,
                                };

                                // Only `Level::Error` failures fail the suite;
                                // lower levels are reported but non-fatal.
                                if check.level() == Level::Error {
                                    *has_errors = true;
                                }

                                warn!(
                                    constraint.name = %constraint.name(),
                                    check.name = %check.name(),
                                    check.level = ?check.level(),
                                    failure.message = %issue.message,
                                    constraint.metric = ?result.metric,
                                    "Constraint failed"
                                );
                                report.add_issue(issue);

                                // OTel failure counter (feature-gated).
                                #[cfg(feature = "telemetry")]
                                if let Some(telemetry) = &self.telemetry {
                                    if let Some(metrics_collector) = telemetry.metrics() {
                                        let attrs = vec![
                                            opentelemetry::KeyValue::new(
                                                "check.name",
                                                check.name().to_string(),
                                            ),
                                            opentelemetry::KeyValue::new(
                                                "check.type",
                                                constraint.name().to_string(),
                                            ),
                                            opentelemetry::KeyValue::new("check.passed", false),
                                            opentelemetry::KeyValue::new(
                                                "failure.reason",
                                                failure_message,
                                            ),
                                        ];
                                        metrics_collector.increment_checks_failed(&attrs);
                                    }
                                }
                            }
                            ConstraintStatus::Skipped => {
                                metrics.skipped_checks += 1;
                                debug!(
                                    constraint.name = %constraint.name(),
                                    check.name = %check.name(),
                                    skip.reason = %result.message.as_deref().unwrap_or("No reason provided"),
                                    "Constraint skipped"
                                );
                            }
                        }

                        // Store any custom metric under "<check>.<constraint>",
                        // regardless of pass/fail status.
                        if let Some(metric_value) = result.metric {
                            let check_name = check.name();
                            let constraint_name = constraint.name();
                            let metric_name = format!("{check_name}.{constraint_name}");
                            metrics
                                .custom_metrics
                                .insert(metric_name.clone(), metric_value);

                            // Mirror the custom metric to OTel (feature-gated).
                            #[cfg(feature = "telemetry")]
                            if let Some(telemetry) = &self.telemetry {
                                if let Some(metrics_collector) = telemetry.metrics() {
                                    let attrs = vec![
                                        opentelemetry::KeyValue::new("metric.name", metric_name),
                                        opentelemetry::KeyValue::new(
                                            "check.name",
                                            check.name().to_string(),
                                        ),
                                        opentelemetry::KeyValue::new(
                                            "constraint.type",
                                            constraint.name().to_string(),
                                        ),
                                    ];
                                    metrics_collector.record_custom_metric(metric_value, &attrs);
                                }
                            }
                        }
                    }
                    Err(e) => {
                        // An evaluation error is recorded on the span and
                        // counted as a failed check rather than aborting
                        // the suite.
                        constraint_span.record_error(&e as &dyn std::error::Error);

                        metrics.failed_checks += 1;
                        let issue = ValidationIssue {
                            check_name: check.name().to_string(),
                            constraint_name: constraint.name().to_string(),
                            level: check.level(),
                            message: format!("Error evaluating constraint: {e}"),
                            metric: None,
                        };

                        if check.level() == Level::Error {
                            *has_errors = true;
                        }

                        error!(
                            constraint.name = %constraint.name(),
                            check.name = %check.name(),
                            error = %e,
                            error.type = "constraint_evaluation",
                            "Error evaluating constraint"
                        );
                        report.add_issue(issue);
                    }
                }
            }

            // Per-check wall-clock duration (feature-gated).
            #[cfg(feature = "telemetry")]
            if let Some(telemetry) = &self.telemetry {
                if let Some(metrics_collector) = telemetry.metrics() {
                    let check_duration = check_start.elapsed().as_secs_f64();
                    let attrs = vec![
                        opentelemetry::KeyValue::new("check.name", check.name().to_string()),
                        opentelemetry::KeyValue::new(
                            "check.constraint_count",
                            check.constraints().len() as i64,
                        ),
                    ];
                    metrics_collector.record_check_duration(check_duration, &attrs);
                }
            }
        }

        Ok(())
    }
271
    /// Emits the end-of-run telemetry and the final "Validation suite
    /// completed" summary log.
    ///
    /// OTel duration/failure counters require both the `telemetry` cargo
    /// feature and an attached telemetry backend; span-level timing is
    /// recorded only when `telemetry.record_timing` is set.
    fn record_final_metrics(
        &self,
        metrics: &ValidationMetrics,
        has_errors: bool,
        start_time: &Instant,
        suite_span: &mut TermSpan,
    ) {
        // Silence the unused-variable warning when the `telemetry`
        // feature (the only consumer of `start_time`) is compiled out.
        let _ = start_time;

        #[cfg(feature = "telemetry")]
        if let Some(telemetry) = &self.telemetry {
            if let Some(metrics_collector) = telemetry.metrics() {
                let suite_duration = start_time.elapsed().as_secs_f64();
                let attrs = vec![
                    opentelemetry::KeyValue::new("suite.name", self.name.clone()),
                    opentelemetry::KeyValue::new("suite.passed", !has_errors),
                    opentelemetry::KeyValue::new("checks.total", metrics.total_checks as i64),
                    opentelemetry::KeyValue::new("checks.passed", metrics.passed_checks as i64),
                    opentelemetry::KeyValue::new("checks.failed", metrics.failed_checks as i64),
                ];
                metrics_collector.record_validation_duration(suite_duration, &attrs);

                // Only whole-suite failures bump the failure counter.
                if has_errors {
                    metrics_collector.increment_validation_failures(&attrs);
                }
            }
        }

        // Span-level pass/fail/skip counts and timing (independent of the
        // `telemetry` cargo feature).
        if let Some(telemetry) = &self.telemetry {
            if telemetry.record_timing {
                utils::record_validation_metrics(
                    suite_span,
                    metrics.passed_checks as u32,
                    metrics.failed_checks as u32,
                    metrics.skipped_checks as u32,
                    metrics.execution_time_ms,
                );
            }
        }

        info!(
            suite.name = %self.name,
            metrics.passed = metrics.passed_checks,
            metrics.failed = metrics.failed_checks,
            metrics.skipped = metrics.skipped_checks,
            metrics.total = metrics.total_checks,
            metrics.duration_ms = metrics.execution_time_ms,
            metrics.success_rate = %format!("{:.2}%", metrics.success_rate()),
            suite.result = %if has_errors { "failed" } else { "passed" },
            "Validation suite completed"
        );
    }
329
    /// Creates a [`ValidationSuiteBuilder`] for assembling a suite named `name`.
    pub fn builder(name: impl Into<String>) -> ValidationSuiteBuilder {
        ValidationSuiteBuilder::new(name)
    }
346
347 pub fn name(&self) -> &str {
349 &self.name
350 }
351
352 pub fn description(&self) -> Option<&str> {
354 self.description.as_deref()
355 }
356
357 pub fn checks(&self) -> &[Arc<Check>] {
359 &self.checks
360 }
361
362 pub fn telemetry_enabled(&self) -> bool {
364 self.telemetry.is_some()
365 }
366
367 pub fn telemetry(&self) -> Option<&Arc<TermTelemetry>> {
369 self.telemetry.as_ref()
370 }
371
    /// Whether the (not yet implemented) query optimizer was requested.
    pub fn optimizer_enabled(&self) -> bool {
        self.use_optimizer
    }
376
377 #[instrument(skip(self, ctx), fields(
387 suite.name = %self.name,
388 suite.checks = self.checks.len(),
389 telemetry.enabled = self.telemetry_enabled()
390 ))]
391 pub async fn run(&self, ctx: &SessionContext) -> Result<ValidationResult> {
392 info!(
393 suite.name = %self.name,
394 suite.checks = self.checks.len(),
395 suite.description = ?self.description,
396 "Starting validation suite"
397 );
398 let start_time = Instant::now();
399
400 #[cfg(feature = "telemetry")]
402 let _active_guard = if let Some(telemetry) = &self.telemetry {
403 telemetry.metrics().map(|m| m.start_validation())
404 } else {
405 None
406 };
407
408 let mut suite_span = if let Some(telemetry) = &self.telemetry {
410 telemetry.start_suite_span(&self.name, self.checks.len())
411 } else {
412 TermSpan::noop()
413 };
414
415 #[cfg(feature = "telemetry")]
417 if let Some(telemetry) = &self.telemetry {
418 if let Some(metrics) = telemetry.metrics() {
419 let attrs = vec![
420 opentelemetry::KeyValue::new("suite.name", self.name.clone()),
421 opentelemetry::KeyValue::new("check.count", self.checks.len() as i64),
422 ];
423 metrics.increment_validation_runs(&attrs);
424
425 if let Ok(df) = ctx.sql("SELECT COUNT(*) as row_count FROM data").await {
427 if let Ok(batches) = df.collect().await {
428 if !batches.is_empty() && batches[0].num_rows() > 0 {
429 if let Some(array) = batches[0]
430 .column(0)
431 .as_any()
432 .downcast_ref::<arrow::array::Int64Array>()
433 {
434 let row_count = array.value(0) as u64;
435 metrics.add_rows_processed(row_count, &attrs);
436 }
437 }
438 }
439 }
440 }
441 }
442
443 let mut report = ValidationReport::new(&self.name);
444 let mut metrics = ValidationMetrics::new();
445 let mut has_errors = false;
446
447 if self.use_optimizer {
449 warn!("Query optimizer is not yet implemented, falling back to sequential execution");
452 self.run_sequential(
453 ctx,
454 &mut report,
455 &mut metrics,
456 &mut has_errors,
457 &start_time,
458 &mut suite_span,
459 )
460 .await?;
461 } else {
462 self.run_sequential(
464 ctx,
465 &mut report,
466 &mut metrics,
467 &mut has_errors,
468 &start_time,
469 &mut suite_span,
470 )
471 .await?;
472 }
473
474 metrics.execution_time_ms = start_time.elapsed().as_millis() as u64;
475 report.metrics = metrics.clone();
476
477 self.record_final_metrics(&metrics, has_errors, &start_time, &mut suite_span);
479
480 info!(
481 suite.name = %self.name,
482 metrics.passed = metrics.passed_checks,
483 metrics.failed = metrics.failed_checks,
484 "Validation suite completed (optimized)"
485 );
486
487 if has_errors {
488 Ok(ValidationResult::failure(report))
489 } else {
490 Ok(ValidationResult::success(metrics, report))
491 }
492 }
493}
494
/// Builder for [`ValidationSuite`]; construct via [`ValidationSuite::builder`]
/// or [`ValidationSuiteBuilder::new`].
#[derive(Debug)]
pub struct ValidationSuiteBuilder {
    // Mirror of `ValidationSuite::name`.
    name: String,
    // Mirror of `ValidationSuite::description`; defaults to `None`.
    description: Option<String>,
    // Checks accumulated so far, in insertion order.
    checks: Vec<Arc<Check>>,
    // Telemetry backend to attach; defaults to `None` (disabled).
    telemetry: Option<Arc<TermTelemetry>>,
    // Optimizer request flag; defaults to `false`.
    use_optimizer: bool,
}
529
530impl ValidationSuiteBuilder {
531 pub fn new(name: impl Into<String>) -> Self {
533 Self {
534 name: name.into(),
535 description: None,
536 checks: Vec::new(),
537 telemetry: None,
538 use_optimizer: false,
539 }
540 }
541
542 pub fn description(mut self, description: impl Into<String>) -> Self {
548 self.description = Some(description.into());
549 self
550 }
551
552 pub fn check(mut self, check: Check) -> Self {
558 self.checks.push(Arc::new(check));
559 self
560 }
561
562 pub fn checks<I>(mut self, checks: I) -> Self
568 where
569 I: IntoIterator<Item = Check>,
570 {
571 self.checks.extend(checks.into_iter().map(Arc::new));
572 self
573 }
574
575 pub fn with_telemetry(mut self, telemetry: TermTelemetry) -> Self {
581 self.telemetry = Some(Arc::new(telemetry));
582 self
583 }
584
585 pub fn with_optimizer(mut self, enabled: bool) -> Self {
605 self.use_optimizer = enabled;
606 self
607 }
608
609 pub fn build(self) -> ValidationSuite {
615 ValidationSuite {
616 name: self.name,
617 description: self.description,
618 checks: self.checks,
619 telemetry: self.telemetry,
620 use_optimizer: self.use_optimizer,
621 }
622 }
623}
624
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder wires name, description, and checks into the suite,
    /// and telemetry stays off unless attached.
    #[test]
    fn test_validation_suite_builder() {
        let suite = ValidationSuite::builder("test_suite")
            .description("Test validation suite")
            .check(Check::builder("test_check").build())
            .build();

        assert_eq!(suite.name(), "test_suite");
        assert_eq!(suite.description(), Some("Test validation suite"));
        assert!(!suite.telemetry_enabled());
        assert_eq!(suite.checks().len(), 1);
    }

    /// A freshly built suite has no telemetry backend.
    #[test]
    fn test_validation_suite_default_telemetry() {
        let suite = ValidationSuite::builder("test_suite").build();
        assert!(!suite.telemetry_enabled());
    }

    /// Attaching a telemetry handle flips `telemetry_enabled`.
    #[cfg(feature = "telemetry")]
    #[test]
    fn test_validation_suite_with_telemetry() {
        let telemetry = TermTelemetry::disabled();
        let suite = ValidationSuite::builder("test_suite")
            .with_telemetry(telemetry)
            .build();
        assert!(suite.telemetry_enabled());
    }

    /// The optimizer flag round-trips through the builder and defaults off.
    #[test]
    fn test_validation_suite_with_optimizer() {
        for requested in [true, false] {
            let suite = ValidationSuite::builder("test_suite")
                .with_optimizer(requested)
                .build();
            assert_eq!(suite.optimizer_enabled(), requested);
        }

        let suite_default = ValidationSuite::builder("test_suite").build();
        assert!(!suite_default.optimizer_enabled());
    }
}