// term_guard/core/suite.rs

1//! Validation suite for organizing and running checks.
2
3use super::{
4    result::{ValidationIssue, ValidationMetrics, ValidationReport},
5    Check, ConstraintStatus, Level, ValidationResult,
6};
7// use crate::optimizer::QueryOptimizer; // TODO: Re-enable once TermContext integration is resolved
8use crate::prelude::*;
9use crate::telemetry::{utils, TermSpan, TermTelemetry};
10use datafusion::prelude::*;
11use std::sync::Arc;
12use std::time::Instant;
13use tracing::{debug, error, info, instrument, warn};
14
15/// A collection of validation checks to be run together.
16///
17/// A `ValidationSuite` groups related checks and provides a way to execute
18/// them against data. It supports telemetry integration for monitoring
19/// validation performance and results.
20///
21/// # Examples
22///
23/// ```rust
24/// use term_guard::core::ValidationSuite;
25/// use term_guard::telemetry::TermTelemetry;
26///
27/// let suite = ValidationSuite::builder("data_quality_suite")
28///     .description("Comprehensive data quality validation")
29///     .build();
30///
31/// // Or with telemetry configuration:
32/// # #[cfg(feature = "telemetry")]
33/// # {
34/// let telemetry = TermTelemetry::disabled();
35/// let suite_with_telemetry = ValidationSuite::builder("data_quality_suite")
36///     .with_telemetry(telemetry)
37///     .build();
38/// # }
39/// ```
40#[derive(Debug, Clone)]
41pub struct ValidationSuite {
42    /// The name of the validation suite
43    name: String,
44    /// Optional description of the suite's purpose
45    description: Option<String>,
46    /// The checks to run as part of this suite
47    checks: Vec<Arc<Check>>,
48    /// Optional telemetry configuration
49    telemetry: Option<Arc<TermTelemetry>>,
50    /// Optional query optimizer for improving performance
51    use_optimizer: bool,
52    /// The name of the table to validate (defaults to "data")
53    table_name: String,
54}
55
56impl ValidationSuite {
57    /// Runs the validation suite sequentially without optimization.
58    async fn run_sequential(
59        &self,
60        ctx: &SessionContext,
61        report: &mut ValidationReport,
62        metrics: &mut ValidationMetrics,
63        has_errors: &mut bool,
64        #[allow(unused_variables)] start_time: &Instant,
65        _suite_span: &mut TermSpan,
66    ) -> Result<()> {
67        for check in &self.checks {
68            debug!(
69                check.name = %check.name(),
70                check.level = ?check.level(),
71                check.constraints = check.constraints().len(),
72                "Running validation check"
73            );
74            #[allow(unused_variables)]
75            let check_start = Instant::now();
76
77            // Create telemetry span for the check
78            let _check_span = if let Some(telemetry) = &self.telemetry {
79                telemetry.start_check_span(check.name(), check.constraints().len())
80            } else {
81                TermSpan::noop()
82            };
83
84            for constraint in check.constraints() {
85                metrics.total_checks += 1;
86
87                // Create telemetry span for the constraint
88                let mut constraint_span = if let Some(telemetry) = &self.telemetry {
89                    // Try to extract column information from constraint metadata if available
90                    let column = None; // TODO: Extract from constraint if possible
91                    telemetry.start_constraint_span(constraint.name(), column)
92                } else {
93                    TermSpan::noop()
94                };
95
96                // Run constraint evaluation with the proper table context
97                let validation_ctx = crate::core::ValidationContext::new(self.table_name.clone());
98                let result = crate::core::validation_context::CURRENT_CONTEXT
99                    .scope(validation_ctx, constraint.evaluate(ctx))
100                    .await;
101
102                match result {
103                    Ok(result) => {
104                        // Record constraint result in telemetry
105                        if let Some(telemetry) = &self.telemetry {
106                            if telemetry.detailed_metrics {
107                                utils::record_constraint_result(&mut constraint_span, &result);
108                            }
109                        }
110
111                        match result.status {
112                            ConstraintStatus::Success => {
113                                metrics.passed_checks += 1;
114                                debug!(
115                                    constraint.name = %constraint.name(),
116                                    check.name = %check.name(),
117                                    constraint.metric = ?result.metric,
118                                    "Constraint passed"
119                                );
120
121                                // Record success in metrics
122                                #[cfg(feature = "telemetry")]
123                                if let Some(telemetry) = &self.telemetry {
124                                    if let Some(metrics_collector) = telemetry.metrics() {
125                                        let attrs = vec![
126                                            opentelemetry::KeyValue::new(
127                                                "check.name",
128                                                check.name().to_string(),
129                                            ),
130                                            opentelemetry::KeyValue::new(
131                                                "check.type",
132                                                constraint.name().to_string(),
133                                            ),
134                                            opentelemetry::KeyValue::new("check.passed", true),
135                                        ];
136                                        metrics_collector.increment_checks_passed(&attrs);
137                                    }
138                                }
139                            }
140                            ConstraintStatus::Failure => {
141                                metrics.failed_checks += 1;
142                                let failure_message = result.message.clone().unwrap_or_else(|| {
143                                    let name = constraint.name();
144                                    format!("Constraint {name} failed")
145                                });
146                                let issue = ValidationIssue {
147                                    check_name: check.name().to_string(),
148                                    constraint_name: constraint.name().to_string(),
149                                    level: check.level(),
150                                    message: failure_message.clone(),
151                                    metric: result.metric,
152                                };
153
154                                if check.level() == Level::Error {
155                                    *has_errors = true;
156                                }
157
158                                warn!(
159                                    constraint.name = %constraint.name(),
160                                    check.name = %check.name(),
161                                    check.level = ?check.level(),
162                                    failure.message = %issue.message,
163                                    constraint.metric = ?result.metric,
164                                    "Constraint failed"
165                                );
166                                report.add_issue(issue);
167
168                                // Record failure in metrics
169                                #[cfg(feature = "telemetry")]
170                                if let Some(telemetry) = &self.telemetry {
171                                    if let Some(metrics_collector) = telemetry.metrics() {
172                                        let attrs = vec![
173                                            opentelemetry::KeyValue::new(
174                                                "check.name",
175                                                check.name().to_string(),
176                                            ),
177                                            opentelemetry::KeyValue::new(
178                                                "check.type",
179                                                constraint.name().to_string(),
180                                            ),
181                                            opentelemetry::KeyValue::new("check.passed", false),
182                                            opentelemetry::KeyValue::new(
183                                                "failure.reason",
184                                                failure_message,
185                                            ),
186                                        ];
187                                        metrics_collector.increment_checks_failed(&attrs);
188                                    }
189                                }
190                            }
191                            ConstraintStatus::Skipped => {
192                                metrics.skipped_checks += 1;
193                                debug!(
194                                    constraint.name = %constraint.name(),
195                                    check.name = %check.name(),
196                                    skip.reason = %result.message.as_deref().unwrap_or("No reason provided"),
197                                    "Constraint skipped"
198                                );
199                            }
200                        }
201
202                        // Record custom metrics
203                        if let Some(metric_value) = result.metric {
204                            let check_name = check.name();
205                            let constraint_name = constraint.name();
206                            let metric_name = format!("{check_name}.{constraint_name}");
207                            metrics
208                                .custom_metrics
209                                .insert(metric_name.clone(), metric_value);
210
211                            // Record to OpenTelemetry metrics
212                            #[cfg(feature = "telemetry")]
213                            if let Some(telemetry) = &self.telemetry {
214                                if let Some(metrics_collector) = telemetry.metrics() {
215                                    let attrs = vec![
216                                        opentelemetry::KeyValue::new("metric.name", metric_name),
217                                        opentelemetry::KeyValue::new(
218                                            "check.name",
219                                            check.name().to_string(),
220                                        ),
221                                        opentelemetry::KeyValue::new(
222                                            "constraint.type",
223                                            constraint.name().to_string(),
224                                        ),
225                                    ];
226                                    metrics_collector.record_custom_metric(metric_value, &attrs);
227                                }
228                            }
229                        }
230                    }
231                    Err(e) => {
232                        // Record error in telemetry
233                        constraint_span.record_error(&e as &dyn std::error::Error);
234
235                        metrics.failed_checks += 1;
236                        let issue = ValidationIssue {
237                            check_name: check.name().to_string(),
238                            constraint_name: constraint.name().to_string(),
239                            level: check.level(),
240                            message: format!("Error evaluating constraint: {e}"),
241                            metric: None,
242                        };
243
244                        if check.level() == Level::Error {
245                            *has_errors = true;
246                        }
247
248                        error!(
249                            constraint.name = %constraint.name(),
250                            check.name = %check.name(),
251                            error = %e,
252                            error.type = "constraint_evaluation",
253                            "Error evaluating constraint"
254                        );
255                        report.add_issue(issue);
256                    }
257                }
258            }
259
260            // Record check duration in metrics
261            #[cfg(feature = "telemetry")]
262            if let Some(telemetry) = &self.telemetry {
263                if let Some(metrics_collector) = telemetry.metrics() {
264                    let check_duration = check_start.elapsed().as_secs_f64();
265                    let attrs = vec![
266                        opentelemetry::KeyValue::new("check.name", check.name().to_string()),
267                        opentelemetry::KeyValue::new(
268                            "check.constraint_count",
269                            check.constraints().len() as i64,
270                        ),
271                    ];
272                    metrics_collector.record_check_duration(check_duration, &attrs);
273                }
274            }
275        }
276
277        Ok(())
278    }
279
280    /// Records final metrics for the validation suite.
281    fn record_final_metrics(
282        &self,
283        metrics: &ValidationMetrics,
284        has_errors: bool,
285        start_time: &Instant,
286        suite_span: &mut TermSpan,
287    ) {
288        // Avoid unused variable warning when telemetry is disabled
289        let _ = start_time;
290
291        // Record suite duration in metrics
292        #[cfg(feature = "telemetry")]
293        if let Some(telemetry) = &self.telemetry {
294            if let Some(metrics_collector) = telemetry.metrics() {
295                let suite_duration = start_time.elapsed().as_secs_f64();
296                let attrs = vec![
297                    opentelemetry::KeyValue::new("suite.name", self.name.clone()),
298                    opentelemetry::KeyValue::new("suite.passed", !has_errors),
299                    opentelemetry::KeyValue::new("checks.total", metrics.total_checks as i64),
300                    opentelemetry::KeyValue::new("checks.passed", metrics.passed_checks as i64),
301                    opentelemetry::KeyValue::new("checks.failed", metrics.failed_checks as i64),
302                ];
303                metrics_collector.record_validation_duration(suite_duration, &attrs);
304
305                // Record validation failure if there were errors
306                if has_errors {
307                    metrics_collector.increment_validation_failures(&attrs);
308                }
309            }
310        }
311
312        // Record final metrics in telemetry span
313        if let Some(telemetry) = &self.telemetry {
314            if telemetry.record_timing {
315                utils::record_validation_metrics(
316                    suite_span,
317                    metrics.passed_checks as u32,
318                    metrics.failed_checks as u32,
319                    metrics.skipped_checks as u32,
320                    metrics.execution_time_ms,
321                );
322            }
323        }
324
325        info!(
326            suite.name = %self.name,
327            metrics.passed = metrics.passed_checks,
328            metrics.failed = metrics.failed_checks,
329            metrics.skipped = metrics.skipped_checks,
330            metrics.total = metrics.total_checks,
331            metrics.duration_ms = metrics.execution_time_ms,
332            metrics.success_rate = %format!("{:.2}%", metrics.success_rate()),
333            suite.result = %if has_errors { "failed" } else { "passed" },
334            "Validation suite completed"
335        );
336    }
337
338    /// Creates a new builder for constructing a validation suite.
339    ///
340    /// # Arguments
341    ///
342    /// * `name` - The name of the validation suite
343    ///
344    /// # Examples
345    ///
346    /// ```rust
347    /// use term_guard::core::ValidationSuite;
348    ///
349    /// let builder = ValidationSuite::builder("my_suite");
350    /// ```
351    pub fn builder(name: impl Into<String>) -> ValidationSuiteBuilder {
352        ValidationSuiteBuilder::new(name)
353    }
354
355    /// Returns the name of the validation suite.
356    pub fn name(&self) -> &str {
357        &self.name
358    }
359
360    /// Returns the description of the validation suite if available.
361    pub fn description(&self) -> Option<&str> {
362        self.description.as_deref()
363    }
364
365    /// Returns the checks in this validation suite.
366    pub fn checks(&self) -> &[Arc<Check>] {
367        &self.checks
368    }
369
370    /// Returns whether telemetry is enabled for this suite.
371    pub fn telemetry_enabled(&self) -> bool {
372        self.telemetry.is_some()
373    }
374
375    /// Returns the telemetry configuration for this suite.
376    pub fn telemetry(&self) -> Option<&Arc<TermTelemetry>> {
377        self.telemetry.as_ref()
378    }
379
380    /// Returns whether the query optimizer is enabled for this suite.
381    pub fn optimizer_enabled(&self) -> bool {
382        self.use_optimizer
383    }
384
385    /// Runs the validation suite against the provided data.
386    ///
387    /// # Arguments
388    ///
389    /// * `ctx` - The DataFusion session context containing the data to validate
390    ///
391    /// # Returns
392    ///
393    /// A `Result` containing the validation result or an error
394    #[instrument(skip(self, ctx), fields(
395        suite.name = %self.name,
396        suite.checks = self.checks.len(),
397        telemetry.enabled = self.telemetry_enabled()
398    ))]
399    pub async fn run(&self, ctx: &SessionContext) -> Result<ValidationResult> {
400        info!(
401            suite.name = %self.name,
402            suite.checks = self.checks.len(),
403            suite.description = ?self.description,
404            "Starting validation suite"
405        );
406        let start_time = Instant::now();
407
408        // Start active validation guard for metrics
409        #[cfg(feature = "telemetry")]
410        let _active_guard = if let Some(telemetry) = &self.telemetry {
411            telemetry.metrics().map(|m| m.start_validation())
412        } else {
413            None
414        };
415
416        // Create telemetry span for the entire suite
417        let mut suite_span = if let Some(telemetry) = &self.telemetry {
418            telemetry.start_suite_span(&self.name, self.checks.len())
419        } else {
420            TermSpan::noop()
421        };
422
423        // Record validation run start in metrics
424        #[cfg(feature = "telemetry")]
425        if let Some(telemetry) = &self.telemetry {
426            if let Some(metrics) = telemetry.metrics() {
427                let attrs = vec![
428                    opentelemetry::KeyValue::new("suite.name", self.name.clone()),
429                    opentelemetry::KeyValue::new("check.count", self.checks.len() as i64),
430                ];
431                metrics.increment_validation_runs(&attrs);
432
433                // Try to get row count from the data table
434                let table_query = format!("SELECT COUNT(*) as row_count FROM {}", self.table_name);
435                if let Ok(df) = ctx.sql(&table_query).await {
436                    if let Ok(batches) = df.collect().await {
437                        if !batches.is_empty() && batches[0].num_rows() > 0 {
438                            if let Some(array) = batches[0]
439                                .column(0)
440                                .as_any()
441                                .downcast_ref::<arrow::array::Int64Array>()
442                            {
443                                let row_count = array.value(0) as u64;
444                                metrics.add_rows_processed(row_count, &attrs);
445                            }
446                        }
447                    }
448                }
449            }
450        }
451
452        let mut report = ValidationReport::new(&self.name);
453        let mut metrics = ValidationMetrics::new();
454        let mut has_errors = false;
455
456        // Use optimizer if enabled
457        if self.use_optimizer {
458            // TODO: Implement optimized execution once TermContext integration is resolved
459            // For now, fall back to sequential execution
460            warn!("Query optimizer is not yet implemented, falling back to sequential execution");
461            self.run_sequential(
462                ctx,
463                &mut report,
464                &mut metrics,
465                &mut has_errors,
466                &start_time,
467                &mut suite_span,
468            )
469            .await?;
470        } else {
471            // Non-optimized execution path
472            self.run_sequential(
473                ctx,
474                &mut report,
475                &mut metrics,
476                &mut has_errors,
477                &start_time,
478                &mut suite_span,
479            )
480            .await?;
481        }
482
483        metrics.execution_time_ms = start_time.elapsed().as_millis() as u64;
484        report.metrics = metrics.clone();
485
486        // Record final metrics and complete
487        self.record_final_metrics(&metrics, has_errors, &start_time, &mut suite_span);
488
489        info!(
490            suite.name = %self.name,
491            metrics.passed = metrics.passed_checks,
492            metrics.failed = metrics.failed_checks,
493            "Validation suite completed (optimized)"
494        );
495
496        if has_errors {
497            Ok(ValidationResult::failure(report))
498        } else {
499            Ok(ValidationResult::success(metrics, report))
500        }
501    }
502}
503
504/// Builder for constructing `ValidationSuite` instances.
505///
506/// # Examples
507///
508/// ```rust
509/// use term_guard::core::{ValidationSuite, Check, Level};
510/// use term_guard::telemetry::TermTelemetry;
511///
512/// let suite = ValidationSuite::builder("quality_checks")
513///     .description("Data quality validation suite")
514///     .check(
515///         Check::builder("completeness")
516///             .level(Level::Error)
517///             .build()
518///     )
519///     .build();
520///
521/// // Or with telemetry:
522/// # #[cfg(feature = "telemetry")]
523/// # {
524/// let telemetry = TermTelemetry::disabled();
525/// let suite_with_telemetry = ValidationSuite::builder("quality_checks")
526///     .with_telemetry(telemetry)
527///     .build();
528/// # }
529/// ```
530#[derive(Debug)]
531pub struct ValidationSuiteBuilder {
532    name: String,
533    description: Option<String>,
534    checks: Vec<Arc<Check>>,
535    telemetry: Option<Arc<TermTelemetry>>,
536    use_optimizer: bool,
537    table_name: String,
538}
539
540impl ValidationSuiteBuilder {
541    /// Creates a new validation suite builder with the given name.
542    pub fn new(name: impl Into<String>) -> Self {
543        Self {
544            name: name.into(),
545            description: None,
546            checks: Vec::new(),
547            telemetry: None,
548            use_optimizer: false,
549            table_name: "data".to_string(),
550        }
551    }
552
553    /// Sets the description for the validation suite.
554    ///
555    /// # Arguments
556    ///
557    /// * `description` - A description of the suite's purpose
558    pub fn description(mut self, description: impl Into<String>) -> Self {
559        self.description = Some(description.into());
560        self
561    }
562
563    /// Sets the table name to validate.
564    ///
565    /// By default, validation runs against a table named "data". Use this method
566    /// to validate a different table, which is especially useful when working with
567    /// database sources.
568    ///
569    /// # Arguments
570    ///
571    /// * `table_name` - The name of the table to validate
572    ///
573    /// # Examples
574    ///
575    /// ```rust
576    /// use term_guard::core::ValidationSuite;
577    ///
578    /// let suite = ValidationSuite::builder("customer_validation")
579    ///     .table_name("customer_transactions")
580    ///     .build();
581    /// ```
582    pub fn table_name(mut self, table_name: impl Into<String>) -> Self {
583        self.table_name = table_name.into();
584        self
585    }
586
587    /// Adds a check to the validation suite.
588    ///
589    /// # Arguments
590    ///
591    /// * `check` - The check to add
592    pub fn check(mut self, check: Check) -> Self {
593        self.checks.push(Arc::new(check));
594        self
595    }
596
597    /// Adds multiple checks to the validation suite.
598    ///
599    /// # Arguments
600    ///
601    /// * `checks` - An iterator of checks to add
602    pub fn checks<I>(mut self, checks: I) -> Self
603    where
604        I: IntoIterator<Item = Check>,
605    {
606        self.checks.extend(checks.into_iter().map(Arc::new));
607        self
608    }
609
610    /// Sets the telemetry configuration for the suite.
611    ///
612    /// # Arguments
613    ///
614    /// * `telemetry` - The telemetry configuration to use
615    pub fn with_telemetry(mut self, telemetry: TermTelemetry) -> Self {
616        self.telemetry = Some(Arc::new(telemetry));
617        self
618    }
619
620    /// Sets whether to use the query optimizer for execution.
621    ///
622    /// When enabled, the suite will attempt to optimize constraint execution
623    /// by batching similar queries together. If optimization fails, it will
624    /// fall back to sequential execution.
625    ///
626    /// # Arguments
627    ///
628    /// * `enabled` - Whether to enable query optimization
629    ///
630    /// # Examples
631    ///
632    /// ```rust
633    /// use term_guard::core::ValidationSuite;
634    ///
635    /// let suite = ValidationSuite::builder("optimized_suite")
636    ///     .with_optimizer(true)
637    ///     .build();
638    /// ```
639    pub fn with_optimizer(mut self, enabled: bool) -> Self {
640        self.use_optimizer = enabled;
641        self
642    }
643
644    /// Builds the `ValidationSuite` instance.
645    ///
646    /// # Returns
647    ///
648    /// The constructed `ValidationSuite`
649    pub fn build(self) -> ValidationSuite {
650        ValidationSuite {
651            name: self.name,
652            description: self.description,
653            checks: self.checks,
654            telemetry: self.telemetry,
655            use_optimizer: self.use_optimizer,
656            table_name: self.table_name,
657        }
658    }
659}
660
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_validation_suite_builder() {
        let suite = ValidationSuite::builder("test_suite")
            .description("Test validation suite")
            .check(Check::builder("test_check").build())
            .build();

        assert_eq!(suite.name(), "test_suite");
        assert_eq!(suite.description(), Some("Test validation suite"));
        assert!(!suite.telemetry_enabled()); // No telemetry configured
        assert_eq!(suite.checks().len(), 1);
    }

    #[test]
    fn test_validation_suite_default_telemetry() {
        let suite = ValidationSuite::builder("test_suite").build();
        assert!(!suite.telemetry_enabled()); // Telemetry is disabled by default (BYOT pattern)
    }

    #[test]
    fn test_validation_suite_defaults() {
        let suite = ValidationSuite::builder("test_suite").build();
        assert_eq!(suite.description(), None);
        assert!(suite.checks().is_empty());
        assert_eq!(suite.table_name, "data");
    }

    #[test]
    fn test_validation_suite_table_name() {
        let suite = ValidationSuite::builder("test_suite")
            .table_name("customer_transactions")
            .build();
        assert_eq!(suite.table_name, "customer_transactions");
    }

    #[test]
    fn test_validation_suite_checks_appended_in_order() {
        let suite = ValidationSuite::builder("test_suite")
            .check(Check::builder("first").build())
            .checks(vec![
                Check::builder("second").build(),
                Check::builder("third").build(),
            ])
            .build();

        let names: Vec<String> = suite
            .checks()
            .iter()
            .map(|check| check.name().to_string())
            .collect();
        assert_eq!(names, vec!["first", "second", "third"]);
    }

    #[cfg(feature = "telemetry")]
    #[test]
    fn test_validation_suite_with_telemetry() {
        let telemetry = TermTelemetry::disabled();
        let suite = ValidationSuite::builder("test_suite")
            .with_telemetry(telemetry)
            .build();
        assert!(suite.telemetry_enabled());
    }

    #[test]
    fn test_validation_suite_with_optimizer() {
        let suite = ValidationSuite::builder("test_suite")
            .with_optimizer(true)
            .build();
        assert!(suite.optimizer_enabled());

        let suite_no_opt = ValidationSuite::builder("test_suite")
            .with_optimizer(false)
            .build();
        assert!(!suite_no_opt.optimizer_enabled());

        // Default should be no optimizer
        let suite_default = ValidationSuite::builder("test_suite").build();
        assert!(!suite_default.optimizer_enabled());
    }
}