term_guard/core/suite.rs
1//! Validation suite for organizing and running checks.
2
3use super::{
4 result::{ValidationIssue, ValidationMetrics, ValidationReport},
5 Check, ConstraintStatus, Level, ValidationResult,
6};
7// use crate::optimizer::QueryOptimizer; // TODO: Re-enable once TermContext integration is resolved
8use crate::prelude::*;
9use crate::telemetry::{utils, TermSpan, TermTelemetry};
10use datafusion::prelude::*;
11use std::sync::Arc;
12use std::time::Instant;
13use tracing::{debug, error, info, instrument, warn};
14
15/// A collection of validation checks to be run together.
16///
17/// A `ValidationSuite` groups related checks and provides a way to execute
18/// them against data. It supports telemetry integration for monitoring
19/// validation performance and results.
20///
21/// # Examples
22///
23/// ```rust
24/// use term_guard::core::ValidationSuite;
25/// use term_guard::telemetry::TermTelemetry;
26///
27/// let suite = ValidationSuite::builder("data_quality_suite")
28/// .description("Comprehensive data quality validation")
29/// .build();
30///
31/// // Or with telemetry configuration:
32/// # #[cfg(feature = "telemetry")]
33/// # {
34/// let telemetry = TermTelemetry::disabled();
35/// let suite_with_telemetry = ValidationSuite::builder("data_quality_suite")
36/// .with_telemetry(telemetry)
37/// .build();
38/// # }
39/// ```
40#[derive(Debug, Clone)]
41pub struct ValidationSuite {
42 /// The name of the validation suite
43 name: String,
44 /// Optional description of the suite's purpose
45 description: Option<String>,
46 /// The checks to run as part of this suite
47 checks: Vec<Arc<Check>>,
48 /// Optional telemetry configuration
49 telemetry: Option<Arc<TermTelemetry>>,
50 /// Optional query optimizer for improving performance
51 use_optimizer: bool,
52 /// The name of the table to validate (defaults to "data")
53 table_name: String,
54}
55
56impl ValidationSuite {
57 /// Runs the validation suite sequentially without optimization.
58 async fn run_sequential(
59 &self,
60 ctx: &SessionContext,
61 report: &mut ValidationReport,
62 metrics: &mut ValidationMetrics,
63 has_errors: &mut bool,
64 #[allow(unused_variables)] start_time: &Instant,
65 _suite_span: &mut TermSpan,
66 ) -> Result<()> {
67 for check in &self.checks {
68 debug!(
69 check.name = %check.name(),
70 check.level = ?check.level(),
71 check.constraints = check.constraints().len(),
72 "Running validation check"
73 );
74 #[allow(unused_variables)]
75 let check_start = Instant::now();
76
77 // Create telemetry span for the check
78 let _check_span = if let Some(telemetry) = &self.telemetry {
79 telemetry.start_check_span(check.name(), check.constraints().len())
80 } else {
81 TermSpan::noop()
82 };
83
84 for constraint in check.constraints() {
85 metrics.total_checks += 1;
86
87 // Create telemetry span for the constraint
88 let mut constraint_span = if let Some(telemetry) = &self.telemetry {
89 // Try to extract column information from constraint metadata if available
90 let column = None; // TODO: Extract from constraint if possible
91 telemetry.start_constraint_span(constraint.name(), column)
92 } else {
93 TermSpan::noop()
94 };
95
96 // Run constraint evaluation with the proper table context
97 let validation_ctx = crate::core::ValidationContext::new(self.table_name.clone());
98 let result = crate::core::validation_context::CURRENT_CONTEXT
99 .scope(validation_ctx, constraint.evaluate(ctx))
100 .await;
101
102 match result {
103 Ok(result) => {
104 // Record constraint result in telemetry
105 if let Some(telemetry) = &self.telemetry {
106 if telemetry.detailed_metrics {
107 utils::record_constraint_result(&mut constraint_span, &result);
108 }
109 }
110
111 match result.status {
112 ConstraintStatus::Success => {
113 metrics.passed_checks += 1;
114 debug!(
115 constraint.name = %constraint.name(),
116 check.name = %check.name(),
117 constraint.metric = ?result.metric,
118 "Constraint passed"
119 );
120
121 // Record success in metrics
122 #[cfg(feature = "telemetry")]
123 if let Some(telemetry) = &self.telemetry {
124 if let Some(metrics_collector) = telemetry.metrics() {
125 let attrs = vec![
126 opentelemetry::KeyValue::new(
127 "check.name",
128 check.name().to_string(),
129 ),
130 opentelemetry::KeyValue::new(
131 "check.type",
132 constraint.name().to_string(),
133 ),
134 opentelemetry::KeyValue::new("check.passed", true),
135 ];
136 metrics_collector.increment_checks_passed(&attrs);
137 }
138 }
139 }
140 ConstraintStatus::Failure => {
141 metrics.failed_checks += 1;
142 let failure_message = result.message.clone().unwrap_or_else(|| {
143 let name = constraint.name();
144 format!("Constraint {name} failed")
145 });
146 let issue = ValidationIssue {
147 check_name: check.name().to_string(),
148 constraint_name: constraint.name().to_string(),
149 level: check.level(),
150 message: failure_message.clone(),
151 metric: result.metric,
152 };
153
154 if check.level() == Level::Error {
155 *has_errors = true;
156 }
157
158 warn!(
159 constraint.name = %constraint.name(),
160 check.name = %check.name(),
161 check.level = ?check.level(),
162 failure.message = %issue.message,
163 constraint.metric = ?result.metric,
164 "Constraint failed"
165 );
166 report.add_issue(issue);
167
168 // Record failure in metrics
169 #[cfg(feature = "telemetry")]
170 if let Some(telemetry) = &self.telemetry {
171 if let Some(metrics_collector) = telemetry.metrics() {
172 let attrs = vec![
173 opentelemetry::KeyValue::new(
174 "check.name",
175 check.name().to_string(),
176 ),
177 opentelemetry::KeyValue::new(
178 "check.type",
179 constraint.name().to_string(),
180 ),
181 opentelemetry::KeyValue::new("check.passed", false),
182 opentelemetry::KeyValue::new(
183 "failure.reason",
184 failure_message,
185 ),
186 ];
187 metrics_collector.increment_checks_failed(&attrs);
188 }
189 }
190 }
191 ConstraintStatus::Skipped => {
192 metrics.skipped_checks += 1;
193 debug!(
194 constraint.name = %constraint.name(),
195 check.name = %check.name(),
196 skip.reason = %result.message.as_deref().unwrap_or("No reason provided"),
197 "Constraint skipped"
198 );
199 }
200 }
201
202 // Record custom metrics
203 if let Some(metric_value) = result.metric {
204 let check_name = check.name();
205 let constraint_name = constraint.name();
206 let metric_name = format!("{check_name}.{constraint_name}");
207 metrics
208 .custom_metrics
209 .insert(metric_name.clone(), metric_value);
210
211 // Record to OpenTelemetry metrics
212 #[cfg(feature = "telemetry")]
213 if let Some(telemetry) = &self.telemetry {
214 if let Some(metrics_collector) = telemetry.metrics() {
215 let attrs = vec![
216 opentelemetry::KeyValue::new("metric.name", metric_name),
217 opentelemetry::KeyValue::new(
218 "check.name",
219 check.name().to_string(),
220 ),
221 opentelemetry::KeyValue::new(
222 "constraint.type",
223 constraint.name().to_string(),
224 ),
225 ];
226 metrics_collector.record_custom_metric(metric_value, &attrs);
227 }
228 }
229 }
230 }
231 Err(e) => {
232 // Record error in telemetry
233 constraint_span.record_error(&e as &dyn std::error::Error);
234
235 metrics.failed_checks += 1;
236 let issue = ValidationIssue {
237 check_name: check.name().to_string(),
238 constraint_name: constraint.name().to_string(),
239 level: check.level(),
240 message: format!("Error evaluating constraint: {e}"),
241 metric: None,
242 };
243
244 if check.level() == Level::Error {
245 *has_errors = true;
246 }
247
248 error!(
249 constraint.name = %constraint.name(),
250 check.name = %check.name(),
251 error = %e,
252 error.type = "constraint_evaluation",
253 "Error evaluating constraint"
254 );
255 report.add_issue(issue);
256 }
257 }
258 }
259
260 // Record check duration in metrics
261 #[cfg(feature = "telemetry")]
262 if let Some(telemetry) = &self.telemetry {
263 if let Some(metrics_collector) = telemetry.metrics() {
264 let check_duration = check_start.elapsed().as_secs_f64();
265 let attrs = vec![
266 opentelemetry::KeyValue::new("check.name", check.name().to_string()),
267 opentelemetry::KeyValue::new(
268 "check.constraint_count",
269 check.constraints().len() as i64,
270 ),
271 ];
272 metrics_collector.record_check_duration(check_duration, &attrs);
273 }
274 }
275 }
276
277 Ok(())
278 }
279
280 /// Records final metrics for the validation suite.
281 fn record_final_metrics(
282 &self,
283 metrics: &ValidationMetrics,
284 has_errors: bool,
285 start_time: &Instant,
286 suite_span: &mut TermSpan,
287 ) {
288 // Avoid unused variable warning when telemetry is disabled
289 let _ = start_time;
290
291 // Record suite duration in metrics
292 #[cfg(feature = "telemetry")]
293 if let Some(telemetry) = &self.telemetry {
294 if let Some(metrics_collector) = telemetry.metrics() {
295 let suite_duration = start_time.elapsed().as_secs_f64();
296 let attrs = vec![
297 opentelemetry::KeyValue::new("suite.name", self.name.clone()),
298 opentelemetry::KeyValue::new("suite.passed", !has_errors),
299 opentelemetry::KeyValue::new("checks.total", metrics.total_checks as i64),
300 opentelemetry::KeyValue::new("checks.passed", metrics.passed_checks as i64),
301 opentelemetry::KeyValue::new("checks.failed", metrics.failed_checks as i64),
302 ];
303 metrics_collector.record_validation_duration(suite_duration, &attrs);
304
305 // Record validation failure if there were errors
306 if has_errors {
307 metrics_collector.increment_validation_failures(&attrs);
308 }
309 }
310 }
311
312 // Record final metrics in telemetry span
313 if let Some(telemetry) = &self.telemetry {
314 if telemetry.record_timing {
315 utils::record_validation_metrics(
316 suite_span,
317 metrics.passed_checks as u32,
318 metrics.failed_checks as u32,
319 metrics.skipped_checks as u32,
320 metrics.execution_time_ms,
321 );
322 }
323 }
324
325 info!(
326 suite.name = %self.name,
327 metrics.passed = metrics.passed_checks,
328 metrics.failed = metrics.failed_checks,
329 metrics.skipped = metrics.skipped_checks,
330 metrics.total = metrics.total_checks,
331 metrics.duration_ms = metrics.execution_time_ms,
332 metrics.success_rate = %format!("{:.2}%", metrics.success_rate()),
333 suite.result = %if has_errors { "failed" } else { "passed" },
334 "Validation suite completed"
335 );
336 }
337
338 /// Creates a new builder for constructing a validation suite.
339 ///
340 /// # Arguments
341 ///
342 /// * `name` - The name of the validation suite
343 ///
344 /// # Examples
345 ///
346 /// ```rust
347 /// use term_guard::core::ValidationSuite;
348 ///
349 /// let builder = ValidationSuite::builder("my_suite");
350 /// ```
351 pub fn builder(name: impl Into<String>) -> ValidationSuiteBuilder {
352 ValidationSuiteBuilder::new(name)
353 }
354
355 /// Returns the name of the validation suite.
356 pub fn name(&self) -> &str {
357 &self.name
358 }
359
360 /// Returns the description of the validation suite if available.
361 pub fn description(&self) -> Option<&str> {
362 self.description.as_deref()
363 }
364
365 /// Returns the checks in this validation suite.
366 pub fn checks(&self) -> &[Arc<Check>] {
367 &self.checks
368 }
369
370 /// Returns whether telemetry is enabled for this suite.
371 pub fn telemetry_enabled(&self) -> bool {
372 self.telemetry.is_some()
373 }
374
375 /// Returns the telemetry configuration for this suite.
376 pub fn telemetry(&self) -> Option<&Arc<TermTelemetry>> {
377 self.telemetry.as_ref()
378 }
379
380 /// Returns whether the query optimizer is enabled for this suite.
381 pub fn optimizer_enabled(&self) -> bool {
382 self.use_optimizer
383 }
384
385 /// Runs the validation suite against the provided data.
386 ///
387 /// # Arguments
388 ///
389 /// * `ctx` - The DataFusion session context containing the data to validate
390 ///
391 /// # Returns
392 ///
393 /// A `Result` containing the validation result or an error
394 #[instrument(skip(self, ctx), fields(
395 suite.name = %self.name,
396 suite.checks = self.checks.len(),
397 telemetry.enabled = self.telemetry_enabled()
398 ))]
399 pub async fn run(&self, ctx: &SessionContext) -> Result<ValidationResult> {
400 info!(
401 suite.name = %self.name,
402 suite.checks = self.checks.len(),
403 suite.description = ?self.description,
404 "Starting validation suite"
405 );
406 let start_time = Instant::now();
407
408 // Start active validation guard for metrics
409 #[cfg(feature = "telemetry")]
410 let _active_guard = if let Some(telemetry) = &self.telemetry {
411 telemetry.metrics().map(|m| m.start_validation())
412 } else {
413 None
414 };
415
416 // Create telemetry span for the entire suite
417 let mut suite_span = if let Some(telemetry) = &self.telemetry {
418 telemetry.start_suite_span(&self.name, self.checks.len())
419 } else {
420 TermSpan::noop()
421 };
422
423 // Record validation run start in metrics
424 #[cfg(feature = "telemetry")]
425 if let Some(telemetry) = &self.telemetry {
426 if let Some(metrics) = telemetry.metrics() {
427 let attrs = vec![
428 opentelemetry::KeyValue::new("suite.name", self.name.clone()),
429 opentelemetry::KeyValue::new("check.count", self.checks.len() as i64),
430 ];
431 metrics.increment_validation_runs(&attrs);
432
433 // Try to get row count from the data table
434 let table_query = format!("SELECT COUNT(*) as row_count FROM {}", self.table_name);
435 if let Ok(df) = ctx.sql(&table_query).await {
436 if let Ok(batches) = df.collect().await {
437 if !batches.is_empty() && batches[0].num_rows() > 0 {
438 if let Some(array) = batches[0]
439 .column(0)
440 .as_any()
441 .downcast_ref::<arrow::array::Int64Array>()
442 {
443 let row_count = array.value(0) as u64;
444 metrics.add_rows_processed(row_count, &attrs);
445 }
446 }
447 }
448 }
449 }
450 }
451
452 let mut report = ValidationReport::new(&self.name);
453 let mut metrics = ValidationMetrics::new();
454 let mut has_errors = false;
455
456 // Use optimizer if enabled
457 if self.use_optimizer {
458 // TODO: Implement optimized execution once TermContext integration is resolved
459 // For now, fall back to sequential execution
460 warn!("Query optimizer is not yet implemented, falling back to sequential execution");
461 self.run_sequential(
462 ctx,
463 &mut report,
464 &mut metrics,
465 &mut has_errors,
466 &start_time,
467 &mut suite_span,
468 )
469 .await?;
470 } else {
471 // Non-optimized execution path
472 self.run_sequential(
473 ctx,
474 &mut report,
475 &mut metrics,
476 &mut has_errors,
477 &start_time,
478 &mut suite_span,
479 )
480 .await?;
481 }
482
483 metrics.execution_time_ms = start_time.elapsed().as_millis() as u64;
484 report.metrics = metrics.clone();
485
486 // Record final metrics and complete
487 self.record_final_metrics(&metrics, has_errors, &start_time, &mut suite_span);
488
489 info!(
490 suite.name = %self.name,
491 metrics.passed = metrics.passed_checks,
492 metrics.failed = metrics.failed_checks,
493 "Validation suite completed (optimized)"
494 );
495
496 if has_errors {
497 Ok(ValidationResult::failure(report))
498 } else {
499 Ok(ValidationResult::success(metrics, report))
500 }
501 }
502}
503
504/// Builder for constructing `ValidationSuite` instances.
505///
506/// # Examples
507///
508/// ```rust
509/// use term_guard::core::{ValidationSuite, Check, Level};
510/// use term_guard::telemetry::TermTelemetry;
511///
512/// let suite = ValidationSuite::builder("quality_checks")
513/// .description("Data quality validation suite")
514/// .check(
515/// Check::builder("completeness")
516/// .level(Level::Error)
517/// .build()
518/// )
519/// .build();
520///
521/// // Or with telemetry:
522/// # #[cfg(feature = "telemetry")]
523/// # {
524/// let telemetry = TermTelemetry::disabled();
525/// let suite_with_telemetry = ValidationSuite::builder("quality_checks")
526/// .with_telemetry(telemetry)
527/// .build();
528/// # }
529/// ```
530#[derive(Debug)]
531pub struct ValidationSuiteBuilder {
532 name: String,
533 description: Option<String>,
534 checks: Vec<Arc<Check>>,
535 telemetry: Option<Arc<TermTelemetry>>,
536 use_optimizer: bool,
537 table_name: String,
538}
539
540impl ValidationSuiteBuilder {
541 /// Creates a new validation suite builder with the given name.
542 pub fn new(name: impl Into<String>) -> Self {
543 Self {
544 name: name.into(),
545 description: None,
546 checks: Vec::new(),
547 telemetry: None,
548 use_optimizer: false,
549 table_name: "data".to_string(),
550 }
551 }
552
553 /// Sets the description for the validation suite.
554 ///
555 /// # Arguments
556 ///
557 /// * `description` - A description of the suite's purpose
558 pub fn description(mut self, description: impl Into<String>) -> Self {
559 self.description = Some(description.into());
560 self
561 }
562
563 /// Sets the table name to validate.
564 ///
565 /// By default, validation runs against a table named "data". Use this method
566 /// to validate a different table, which is especially useful when working with
567 /// database sources.
568 ///
569 /// # Arguments
570 ///
571 /// * `table_name` - The name of the table to validate
572 ///
573 /// # Examples
574 ///
575 /// ```rust
576 /// use term_guard::core::ValidationSuite;
577 ///
578 /// let suite = ValidationSuite::builder("customer_validation")
579 /// .table_name("customer_transactions")
580 /// .build();
581 /// ```
582 pub fn table_name(mut self, table_name: impl Into<String>) -> Self {
583 self.table_name = table_name.into();
584 self
585 }
586
587 /// Adds a check to the validation suite.
588 ///
589 /// # Arguments
590 ///
591 /// * `check` - The check to add
592 pub fn check(mut self, check: Check) -> Self {
593 self.checks.push(Arc::new(check));
594 self
595 }
596
597 /// Adds multiple checks to the validation suite.
598 ///
599 /// # Arguments
600 ///
601 /// * `checks` - An iterator of checks to add
602 pub fn checks<I>(mut self, checks: I) -> Self
603 where
604 I: IntoIterator<Item = Check>,
605 {
606 self.checks.extend(checks.into_iter().map(Arc::new));
607 self
608 }
609
610 /// Sets the telemetry configuration for the suite.
611 ///
612 /// # Arguments
613 ///
614 /// * `telemetry` - The telemetry configuration to use
615 pub fn with_telemetry(mut self, telemetry: TermTelemetry) -> Self {
616 self.telemetry = Some(Arc::new(telemetry));
617 self
618 }
619
620 /// Sets whether to use the query optimizer for execution.
621 ///
622 /// When enabled, the suite will attempt to optimize constraint execution
623 /// by batching similar queries together. If optimization fails, it will
624 /// fall back to sequential execution.
625 ///
626 /// # Arguments
627 ///
628 /// * `enabled` - Whether to enable query optimization
629 ///
630 /// # Examples
631 ///
632 /// ```rust
633 /// use term_guard::core::ValidationSuite;
634 ///
635 /// let suite = ValidationSuite::builder("optimized_suite")
636 /// .with_optimizer(true)
637 /// .build();
638 /// ```
639 pub fn with_optimizer(mut self, enabled: bool) -> Self {
640 self.use_optimizer = enabled;
641 self
642 }
643
644 /// Builds the `ValidationSuite` instance.
645 ///
646 /// # Returns
647 ///
648 /// The constructed `ValidationSuite`
649 pub fn build(self) -> ValidationSuite {
650 ValidationSuite {
651 name: self.name,
652 description: self.description,
653 checks: self.checks,
654 telemetry: self.telemetry,
655 use_optimizer: self.use_optimizer,
656 table_name: self.table_name,
657 }
658 }
659}
660
#[cfg(test)]
mod tests {
    use super::*;

    /// The builder carries name, description and checks through to the suite.
    #[test]
    fn test_validation_suite_builder() {
        let suite = ValidationSuite::builder("test_suite")
            .description("Test validation suite")
            .check(Check::builder("test_check").build())
            .build();

        assert_eq!(suite.name(), "test_suite");
        assert_eq!(suite.description(), Some("Test validation suite"));
        assert!(!suite.telemetry_enabled()); // No telemetry configured
        assert_eq!(suite.checks().len(), 1);
    }

    /// A suite built without `with_telemetry` has no telemetry attached.
    #[test]
    fn test_validation_suite_default_telemetry() {
        let suite = ValidationSuite::builder("test_suite").build();
        assert!(!suite.telemetry_enabled()); // Telemetry is disabled by default (BYOT pattern)
    }

    /// Attaching any telemetry configuration makes `telemetry_enabled` true.
    #[cfg(feature = "telemetry")]
    #[test]
    fn test_validation_suite_with_telemetry() {
        let telemetry = TermTelemetry::disabled();
        let suite = ValidationSuite::builder("test_suite")
            .with_telemetry(telemetry)
            .build();
        assert!(suite.telemetry_enabled());
    }

    /// The optimizer flag round-trips through the builder and defaults to off.
    #[test]
    fn test_validation_suite_with_optimizer() {
        let suite = ValidationSuite::builder("test_suite")
            .with_optimizer(true)
            .build();
        assert!(suite.optimizer_enabled());

        let suite_no_opt = ValidationSuite::builder("test_suite")
            .with_optimizer(false)
            .build();
        assert!(!suite_no_opt.optimizer_enabled());

        // Default should be no optimizer
        let suite_default = ValidationSuite::builder("test_suite").build();
        assert!(!suite_default.optimizer_enabled());
    }

    /// The table name defaults to "data" and can be overridden.
    #[test]
    fn test_validation_suite_table_name() {
        let suite_default = ValidationSuite::builder("test_suite").build();
        assert_eq!(suite_default.table_name, "data");

        let suite_custom = ValidationSuite::builder("test_suite")
            .table_name("customer_transactions")
            .build();
        assert_eq!(suite_custom.table_name, "customer_transactions");
    }

    /// `checks` appends to, rather than replaces, previously added checks.
    #[test]
    fn test_validation_suite_multiple_checks() {
        let suite = ValidationSuite::builder("test_suite")
            .check(Check::builder("first").build())
            .checks(vec![
                Check::builder("second").build(),
                Check::builder("third").build(),
            ])
            .build();

        assert_eq!(suite.checks().len(), 3);
    }

    /// A suite without a description reports `None`.
    #[test]
    fn test_validation_suite_without_description() {
        let suite = ValidationSuite::builder("test_suite").build();
        assert_eq!(suite.description(), None);
        assert!(suite.checks().is_empty());
    }
}