term_guard/core/
suite.rs

1//! Validation suite for organizing and running checks.
2
3use super::{
4    result::{ValidationIssue, ValidationMetrics, ValidationReport},
5    Check, ConstraintStatus, Level, ValidationResult,
6};
7// use crate::optimizer::QueryOptimizer; // TODO: Re-enable once TermContext integration is resolved
8use crate::prelude::*;
9use crate::telemetry::{utils, TermSpan, TermTelemetry};
10use datafusion::prelude::*;
11use std::sync::Arc;
12use std::time::Instant;
13use tracing::{debug, error, info, instrument, warn};
14
/// A collection of validation checks to be run together.
///
/// A `ValidationSuite` groups related checks and provides a way to execute
/// them against data. It supports telemetry integration for monitoring
/// validation performance and results.
///
/// # Examples
///
/// ```rust
/// use term_guard::core::ValidationSuite;
/// use term_guard::telemetry::TermTelemetry;
///
/// let suite = ValidationSuite::builder("data_quality_suite")
///     .description("Comprehensive data quality validation")
///     .build();
///
/// // Or with telemetry configuration:
/// # #[cfg(feature = "telemetry")]
/// # {
/// let telemetry = TermTelemetry::disabled();
/// let suite_with_telemetry = ValidationSuite::builder("data_quality_suite")
///     .with_telemetry(telemetry)
///     .build();
/// # }
/// ```
#[derive(Debug, Clone)]
pub struct ValidationSuite {
    /// The name of the validation suite
    name: String,
    /// Optional description of the suite's purpose
    description: Option<String>,
    /// The checks to run as part of this suite, evaluated in this order
    checks: Vec<Arc<Check>>,
    /// Optional telemetry configuration; when `None`, no telemetry spans or
    /// metrics are produced (no-op spans are used instead)
    telemetry: Option<Arc<TermTelemetry>>,
    /// Whether query optimization was requested. The optimizer is not yet
    /// implemented, so running the suite currently logs a warning and executes
    /// the checks sequentially either way.
    use_optimizer: bool,
}
53
54impl ValidationSuite {
55    /// Runs the validation suite sequentially without optimization.
56    async fn run_sequential(
57        &self,
58        ctx: &SessionContext,
59        report: &mut ValidationReport,
60        metrics: &mut ValidationMetrics,
61        has_errors: &mut bool,
62        #[allow(unused_variables)] start_time: &Instant,
63        _suite_span: &mut TermSpan,
64    ) -> Result<()> {
65        for check in &self.checks {
66            debug!(
67                check.name = %check.name(),
68                check.level = ?check.level(),
69                check.constraints = check.constraints().len(),
70                "Running validation check"
71            );
72            #[allow(unused_variables)]
73            let check_start = Instant::now();
74
75            // Create telemetry span for the check
76            let _check_span = if let Some(telemetry) = &self.telemetry {
77                telemetry.start_check_span(check.name(), check.constraints().len())
78            } else {
79                TermSpan::noop()
80            };
81
82            for constraint in check.constraints() {
83                metrics.total_checks += 1;
84
85                // Create telemetry span for the constraint
86                let mut constraint_span = if let Some(telemetry) = &self.telemetry {
87                    // Try to extract column information from constraint metadata if available
88                    let column = None; // TODO: Extract from constraint if possible
89                    telemetry.start_constraint_span(constraint.name(), column)
90                } else {
91                    TermSpan::noop()
92                };
93
94                match constraint.evaluate(ctx).await {
95                    Ok(result) => {
96                        // Record constraint result in telemetry
97                        if let Some(telemetry) = &self.telemetry {
98                            if telemetry.detailed_metrics {
99                                utils::record_constraint_result(&mut constraint_span, &result);
100                            }
101                        }
102
103                        match result.status {
104                            ConstraintStatus::Success => {
105                                metrics.passed_checks += 1;
106                                debug!(
107                                    constraint.name = %constraint.name(),
108                                    check.name = %check.name(),
109                                    constraint.metric = ?result.metric,
110                                    "Constraint passed"
111                                );
112
113                                // Record success in metrics
114                                #[cfg(feature = "telemetry")]
115                                if let Some(telemetry) = &self.telemetry {
116                                    if let Some(metrics_collector) = telemetry.metrics() {
117                                        let attrs = vec![
118                                            opentelemetry::KeyValue::new(
119                                                "check.name",
120                                                check.name().to_string(),
121                                            ),
122                                            opentelemetry::KeyValue::new(
123                                                "check.type",
124                                                constraint.name().to_string(),
125                                            ),
126                                            opentelemetry::KeyValue::new("check.passed", true),
127                                        ];
128                                        metrics_collector.increment_checks_passed(&attrs);
129                                    }
130                                }
131                            }
132                            ConstraintStatus::Failure => {
133                                metrics.failed_checks += 1;
134                                let failure_message = result.message.clone().unwrap_or_else(|| {
135                                    let name = constraint.name();
136                                    format!("Constraint {name} failed")
137                                });
138                                let issue = ValidationIssue {
139                                    check_name: check.name().to_string(),
140                                    constraint_name: constraint.name().to_string(),
141                                    level: check.level(),
142                                    message: failure_message.clone(),
143                                    metric: result.metric,
144                                };
145
146                                if check.level() == Level::Error {
147                                    *has_errors = true;
148                                }
149
150                                warn!(
151                                    constraint.name = %constraint.name(),
152                                    check.name = %check.name(),
153                                    check.level = ?check.level(),
154                                    failure.message = %issue.message,
155                                    constraint.metric = ?result.metric,
156                                    "Constraint failed"
157                                );
158                                report.add_issue(issue);
159
160                                // Record failure in metrics
161                                #[cfg(feature = "telemetry")]
162                                if let Some(telemetry) = &self.telemetry {
163                                    if let Some(metrics_collector) = telemetry.metrics() {
164                                        let attrs = vec![
165                                            opentelemetry::KeyValue::new(
166                                                "check.name",
167                                                check.name().to_string(),
168                                            ),
169                                            opentelemetry::KeyValue::new(
170                                                "check.type",
171                                                constraint.name().to_string(),
172                                            ),
173                                            opentelemetry::KeyValue::new("check.passed", false),
174                                            opentelemetry::KeyValue::new(
175                                                "failure.reason",
176                                                failure_message,
177                                            ),
178                                        ];
179                                        metrics_collector.increment_checks_failed(&attrs);
180                                    }
181                                }
182                            }
183                            ConstraintStatus::Skipped => {
184                                metrics.skipped_checks += 1;
185                                debug!(
186                                    constraint.name = %constraint.name(),
187                                    check.name = %check.name(),
188                                    skip.reason = %result.message.as_deref().unwrap_or("No reason provided"),
189                                    "Constraint skipped"
190                                );
191                            }
192                        }
193
194                        // Record custom metrics
195                        if let Some(metric_value) = result.metric {
196                            let check_name = check.name();
197                            let constraint_name = constraint.name();
198                            let metric_name = format!("{check_name}.{constraint_name}");
199                            metrics
200                                .custom_metrics
201                                .insert(metric_name.clone(), metric_value);
202
203                            // Record to OpenTelemetry metrics
204                            #[cfg(feature = "telemetry")]
205                            if let Some(telemetry) = &self.telemetry {
206                                if let Some(metrics_collector) = telemetry.metrics() {
207                                    let attrs = vec![
208                                        opentelemetry::KeyValue::new("metric.name", metric_name),
209                                        opentelemetry::KeyValue::new(
210                                            "check.name",
211                                            check.name().to_string(),
212                                        ),
213                                        opentelemetry::KeyValue::new(
214                                            "constraint.type",
215                                            constraint.name().to_string(),
216                                        ),
217                                    ];
218                                    metrics_collector.record_custom_metric(metric_value, &attrs);
219                                }
220                            }
221                        }
222                    }
223                    Err(e) => {
224                        // Record error in telemetry
225                        constraint_span.record_error(&e as &dyn std::error::Error);
226
227                        metrics.failed_checks += 1;
228                        let issue = ValidationIssue {
229                            check_name: check.name().to_string(),
230                            constraint_name: constraint.name().to_string(),
231                            level: check.level(),
232                            message: format!("Error evaluating constraint: {e}"),
233                            metric: None,
234                        };
235
236                        if check.level() == Level::Error {
237                            *has_errors = true;
238                        }
239
240                        error!(
241                            constraint.name = %constraint.name(),
242                            check.name = %check.name(),
243                            error = %e,
244                            error.type = "constraint_evaluation",
245                            "Error evaluating constraint"
246                        );
247                        report.add_issue(issue);
248                    }
249                }
250            }
251
252            // Record check duration in metrics
253            #[cfg(feature = "telemetry")]
254            if let Some(telemetry) = &self.telemetry {
255                if let Some(metrics_collector) = telemetry.metrics() {
256                    let check_duration = check_start.elapsed().as_secs_f64();
257                    let attrs = vec![
258                        opentelemetry::KeyValue::new("check.name", check.name().to_string()),
259                        opentelemetry::KeyValue::new(
260                            "check.constraint_count",
261                            check.constraints().len() as i64,
262                        ),
263                    ];
264                    metrics_collector.record_check_duration(check_duration, &attrs);
265                }
266            }
267        }
268
269        Ok(())
270    }
271
272    /// Records final metrics for the validation suite.
273    fn record_final_metrics(
274        &self,
275        metrics: &ValidationMetrics,
276        has_errors: bool,
277        start_time: &Instant,
278        suite_span: &mut TermSpan,
279    ) {
280        // Avoid unused variable warning when telemetry is disabled
281        let _ = start_time;
282
283        // Record suite duration in metrics
284        #[cfg(feature = "telemetry")]
285        if let Some(telemetry) = &self.telemetry {
286            if let Some(metrics_collector) = telemetry.metrics() {
287                let suite_duration = start_time.elapsed().as_secs_f64();
288                let attrs = vec![
289                    opentelemetry::KeyValue::new("suite.name", self.name.clone()),
290                    opentelemetry::KeyValue::new("suite.passed", !has_errors),
291                    opentelemetry::KeyValue::new("checks.total", metrics.total_checks as i64),
292                    opentelemetry::KeyValue::new("checks.passed", metrics.passed_checks as i64),
293                    opentelemetry::KeyValue::new("checks.failed", metrics.failed_checks as i64),
294                ];
295                metrics_collector.record_validation_duration(suite_duration, &attrs);
296
297                // Record validation failure if there were errors
298                if has_errors {
299                    metrics_collector.increment_validation_failures(&attrs);
300                }
301            }
302        }
303
304        // Record final metrics in telemetry span
305        if let Some(telemetry) = &self.telemetry {
306            if telemetry.record_timing {
307                utils::record_validation_metrics(
308                    suite_span,
309                    metrics.passed_checks as u32,
310                    metrics.failed_checks as u32,
311                    metrics.skipped_checks as u32,
312                    metrics.execution_time_ms,
313                );
314            }
315        }
316
317        info!(
318            suite.name = %self.name,
319            metrics.passed = metrics.passed_checks,
320            metrics.failed = metrics.failed_checks,
321            metrics.skipped = metrics.skipped_checks,
322            metrics.total = metrics.total_checks,
323            metrics.duration_ms = metrics.execution_time_ms,
324            metrics.success_rate = %format!("{:.2}%", metrics.success_rate()),
325            suite.result = %if has_errors { "failed" } else { "passed" },
326            "Validation suite completed"
327        );
328    }
329
330    /// Creates a new builder for constructing a validation suite.
331    ///
332    /// # Arguments
333    ///
334    /// * `name` - The name of the validation suite
335    ///
336    /// # Examples
337    ///
338    /// ```rust
339    /// use term_guard::core::ValidationSuite;
340    ///
341    /// let builder = ValidationSuite::builder("my_suite");
342    /// ```
343    pub fn builder(name: impl Into<String>) -> ValidationSuiteBuilder {
344        ValidationSuiteBuilder::new(name)
345    }
346
347    /// Returns the name of the validation suite.
348    pub fn name(&self) -> &str {
349        &self.name
350    }
351
352    /// Returns the description of the validation suite if available.
353    pub fn description(&self) -> Option<&str> {
354        self.description.as_deref()
355    }
356
357    /// Returns the checks in this validation suite.
358    pub fn checks(&self) -> &[Arc<Check>] {
359        &self.checks
360    }
361
362    /// Returns whether telemetry is enabled for this suite.
363    pub fn telemetry_enabled(&self) -> bool {
364        self.telemetry.is_some()
365    }
366
367    /// Returns the telemetry configuration for this suite.
368    pub fn telemetry(&self) -> Option<&Arc<TermTelemetry>> {
369        self.telemetry.as_ref()
370    }
371
372    /// Returns whether the query optimizer is enabled for this suite.
373    pub fn optimizer_enabled(&self) -> bool {
374        self.use_optimizer
375    }
376
377    /// Runs the validation suite against the provided data.
378    ///
379    /// # Arguments
380    ///
381    /// * `ctx` - The DataFusion session context containing the data to validate
382    ///
383    /// # Returns
384    ///
385    /// A `Result` containing the validation result or an error
386    #[instrument(skip(self, ctx), fields(
387        suite.name = %self.name,
388        suite.checks = self.checks.len(),
389        telemetry.enabled = self.telemetry_enabled()
390    ))]
391    pub async fn run(&self, ctx: &SessionContext) -> Result<ValidationResult> {
392        info!(
393            suite.name = %self.name,
394            suite.checks = self.checks.len(),
395            suite.description = ?self.description,
396            "Starting validation suite"
397        );
398        let start_time = Instant::now();
399
400        // Start active validation guard for metrics
401        #[cfg(feature = "telemetry")]
402        let _active_guard = if let Some(telemetry) = &self.telemetry {
403            telemetry.metrics().map(|m| m.start_validation())
404        } else {
405            None
406        };
407
408        // Create telemetry span for the entire suite
409        let mut suite_span = if let Some(telemetry) = &self.telemetry {
410            telemetry.start_suite_span(&self.name, self.checks.len())
411        } else {
412            TermSpan::noop()
413        };
414
415        // Record validation run start in metrics
416        #[cfg(feature = "telemetry")]
417        if let Some(telemetry) = &self.telemetry {
418            if let Some(metrics) = telemetry.metrics() {
419                let attrs = vec![
420                    opentelemetry::KeyValue::new("suite.name", self.name.clone()),
421                    opentelemetry::KeyValue::new("check.count", self.checks.len() as i64),
422                ];
423                metrics.increment_validation_runs(&attrs);
424
425                // Try to get row count from the data table
426                if let Ok(df) = ctx.sql("SELECT COUNT(*) as row_count FROM data").await {
427                    if let Ok(batches) = df.collect().await {
428                        if !batches.is_empty() && batches[0].num_rows() > 0 {
429                            if let Some(array) = batches[0]
430                                .column(0)
431                                .as_any()
432                                .downcast_ref::<arrow::array::Int64Array>()
433                            {
434                                let row_count = array.value(0) as u64;
435                                metrics.add_rows_processed(row_count, &attrs);
436                            }
437                        }
438                    }
439                }
440            }
441        }
442
443        let mut report = ValidationReport::new(&self.name);
444        let mut metrics = ValidationMetrics::new();
445        let mut has_errors = false;
446
447        // Use optimizer if enabled
448        if self.use_optimizer {
449            // TODO: Implement optimized execution once TermContext integration is resolved
450            // For now, fall back to sequential execution
451            warn!("Query optimizer is not yet implemented, falling back to sequential execution");
452            self.run_sequential(
453                ctx,
454                &mut report,
455                &mut metrics,
456                &mut has_errors,
457                &start_time,
458                &mut suite_span,
459            )
460            .await?;
461        } else {
462            // Non-optimized execution path
463            self.run_sequential(
464                ctx,
465                &mut report,
466                &mut metrics,
467                &mut has_errors,
468                &start_time,
469                &mut suite_span,
470            )
471            .await?;
472        }
473
474        metrics.execution_time_ms = start_time.elapsed().as_millis() as u64;
475        report.metrics = metrics.clone();
476
477        // Record final metrics and complete
478        self.record_final_metrics(&metrics, has_errors, &start_time, &mut suite_span);
479
480        info!(
481            suite.name = %self.name,
482            metrics.passed = metrics.passed_checks,
483            metrics.failed = metrics.failed_checks,
484            "Validation suite completed (optimized)"
485        );
486
487        if has_errors {
488            Ok(ValidationResult::failure(report))
489        } else {
490            Ok(ValidationResult::success(metrics, report))
491        }
492    }
493}
494
/// Builder for constructing `ValidationSuite` instances.
///
/// # Examples
///
/// ```rust
/// use term_guard::core::{ValidationSuite, Check, Level};
/// use term_guard::telemetry::TermTelemetry;
///
/// let suite = ValidationSuite::builder("quality_checks")
///     .description("Data quality validation suite")
///     .check(
///         Check::builder("completeness")
///             .level(Level::Error)
///             .build()
///     )
///     .build();
///
/// // Or with telemetry:
/// # #[cfg(feature = "telemetry")]
/// # {
/// let telemetry = TermTelemetry::disabled();
/// let suite_with_telemetry = ValidationSuite::builder("quality_checks")
///     .with_telemetry(telemetry)
///     .build();
/// # }
/// ```
#[derive(Debug)]
pub struct ValidationSuiteBuilder {
    /// Name given to the suite being built
    name: String,
    /// Optional human-readable description; `None` until `description` is called
    description: Option<String>,
    /// Checks accumulated so far, in the order they were added
    checks: Vec<Arc<Check>>,
    /// Telemetry configuration; `None` (no telemetry) unless `with_telemetry` is called
    telemetry: Option<Arc<TermTelemetry>>,
    /// Whether query optimization is requested; `false` unless `with_optimizer(true)`
    use_optimizer: bool,
}
529
530impl ValidationSuiteBuilder {
531    /// Creates a new validation suite builder with the given name.
532    pub fn new(name: impl Into<String>) -> Self {
533        Self {
534            name: name.into(),
535            description: None,
536            checks: Vec::new(),
537            telemetry: None,
538            use_optimizer: false,
539        }
540    }
541
542    /// Sets the description for the validation suite.
543    ///
544    /// # Arguments
545    ///
546    /// * `description` - A description of the suite's purpose
547    pub fn description(mut self, description: impl Into<String>) -> Self {
548        self.description = Some(description.into());
549        self
550    }
551
552    /// Adds a check to the validation suite.
553    ///
554    /// # Arguments
555    ///
556    /// * `check` - The check to add
557    pub fn check(mut self, check: Check) -> Self {
558        self.checks.push(Arc::new(check));
559        self
560    }
561
562    /// Adds multiple checks to the validation suite.
563    ///
564    /// # Arguments
565    ///
566    /// * `checks` - An iterator of checks to add
567    pub fn checks<I>(mut self, checks: I) -> Self
568    where
569        I: IntoIterator<Item = Check>,
570    {
571        self.checks.extend(checks.into_iter().map(Arc::new));
572        self
573    }
574
575    /// Sets the telemetry configuration for the suite.
576    ///
577    /// # Arguments
578    ///
579    /// * `telemetry` - The telemetry configuration to use
580    pub fn with_telemetry(mut self, telemetry: TermTelemetry) -> Self {
581        self.telemetry = Some(Arc::new(telemetry));
582        self
583    }
584
585    /// Sets whether to use the query optimizer for execution.
586    ///
587    /// When enabled, the suite will attempt to optimize constraint execution
588    /// by batching similar queries together. If optimization fails, it will
589    /// fall back to sequential execution.
590    ///
591    /// # Arguments
592    ///
593    /// * `enabled` - Whether to enable query optimization
594    ///
595    /// # Examples
596    ///
597    /// ```rust
598    /// use term_guard::core::ValidationSuite;
599    ///
600    /// let suite = ValidationSuite::builder("optimized_suite")
601    ///     .with_optimizer(true)
602    ///     .build();
603    /// ```
604    pub fn with_optimizer(mut self, enabled: bool) -> Self {
605        self.use_optimizer = enabled;
606        self
607    }
608
609    /// Builds the `ValidationSuite` instance.
610    ///
611    /// # Returns
612    ///
613    /// The constructed `ValidationSuite`
614    pub fn build(self) -> ValidationSuite {
615        ValidationSuite {
616            name: self.name,
617            description: self.description,
618            checks: self.checks,
619            telemetry: self.telemetry,
620            use_optimizer: self.use_optimizer,
621        }
622    }
623}
624
#[cfg(test)]
mod tests {
    use super::*;

    /// Values passed to the builder must be visible through the suite's accessors.
    #[test]
    fn test_validation_suite_builder() {
        let built = ValidationSuite::builder("test_suite")
            .check(Check::builder("test_check").build())
            .description("Test validation suite")
            .build();

        assert_eq!(built.name(), "test_suite");
        assert_eq!(built.description(), Some("Test validation suite"));
        assert_eq!(built.checks().len(), 1);
        // Nothing was configured for telemetry, so it must be off.
        assert!(!built.telemetry_enabled());
    }

    /// Telemetry is opt-in (bring-your-own-telemetry), so a bare builder leaves it disabled.
    #[test]
    fn test_validation_suite_default_telemetry() {
        let bare = ValidationSuite::builder("test_suite").build();
        assert!(!bare.telemetry_enabled());
    }

    /// Supplying any telemetry configuration, even a disabled one, marks telemetry as enabled.
    #[cfg(feature = "telemetry")]
    #[test]
    fn test_validation_suite_with_telemetry() {
        let instrumented = ValidationSuite::builder("test_suite")
            .with_telemetry(TermTelemetry::disabled())
            .build();
        assert!(instrumented.telemetry_enabled());
    }

    /// The optimizer flag round-trips through the builder and defaults to off.
    #[test]
    fn test_validation_suite_with_optimizer() {
        for requested in [true, false] {
            let configured = ValidationSuite::builder("test_suite")
                .with_optimizer(requested)
                .build();
            assert_eq!(configured.optimizer_enabled(), requested);
        }

        let untouched = ValidationSuite::builder("test_suite").build();
        assert!(!untouched.optimizer_enabled());
    }
}