term_guard/optimizer/
mod.rs

1//! Query optimization for validation constraints.
2//!
3//! This module provides optimization strategies to improve the performance of
4//! validation queries by:
5//! - Combining multiple checks into single table scans
6//! - Implementing predicate pushdown for partitioned data
7//! - Caching statistics across validation runs
8//! - Providing query plan explanations for debugging
9
10use crate::core::{Check, Constraint, ConstraintResult, TermContext};
11use crate::prelude::TermError;
12use std::collections::HashMap;
13use std::sync::Arc;
14use tracing::instrument;
15
16pub mod analyzer;
17pub mod combiner;
18pub mod executor;
19pub mod stats_cache;
20
21pub use analyzer::QueryAnalyzer;
22pub use combiner::QueryCombiner;
23pub use executor::OptimizedExecutor;
24pub use stats_cache::StatsCache;
25
26/// Query optimizer for validation constraints.
27#[derive(Debug)]
28pub struct QueryOptimizer {
29    analyzer: QueryAnalyzer,
30    combiner: QueryCombiner,
31    executor: OptimizedExecutor,
32    stats_cache: StatsCache,
33}
34
35impl QueryOptimizer {
36    /// Creates a new query optimizer.
37    pub fn new() -> Self {
38        Self {
39            analyzer: QueryAnalyzer::new(),
40            combiner: QueryCombiner::new(),
41            executor: OptimizedExecutor::new(),
42            stats_cache: StatsCache::new(),
43        }
44    }
45
46    /// Optimizes and executes a set of checks.
47    ///
48    /// This method:
49    /// 1. Analyzes all constraints to identify optimization opportunities
50    /// 2. Groups constraints by table and compatible operations
51    /// 3. Combines queries to minimize table scans
52    /// 4. Executes optimized queries and maps results back to constraints
53    ///
54    /// # Arguments
55    ///
56    /// * `checks` - The validation checks to optimize and execute
57    /// * `ctx` - The Term context containing the DataFusion session
58    ///
59    /// # Returns
60    ///
61    /// A map of constraint names to their results.
62    #[instrument(skip(self, checks, ctx))]
63    pub async fn optimize_and_execute(
64        &mut self,
65        checks: &[Check],
66        ctx: &TermContext,
67    ) -> Result<HashMap<String, Vec<ConstraintResult>>, TermError> {
68        // Extract all constraints from checks
69        let constraints = self.extract_constraints(checks);
70
71        // Analyze constraints to identify optimization opportunities
72        let analysis = self.analyzer.analyze(&constraints)?;
73
74        // Group constraints by optimization strategy
75        let groups = self.combiner.group_constraints(analysis)?;
76
77        // Execute optimized queries
78        let mut results = HashMap::new();
79
80        for group in groups {
81            let group_results = self
82                .executor
83                .execute_group(group, ctx, &mut self.stats_cache)
84                .await?;
85            results.extend(group_results);
86        }
87
88        // Map results back to checks
89        Ok(self.map_results_to_checks(checks, results))
90    }
91
92    /// Extracts all constraints from checks.
93    fn extract_constraints(&self, checks: &[Check]) -> Vec<(String, Arc<dyn Constraint>)> {
94        let mut constraints = Vec::new();
95
96        for check in checks {
97            for constraint in check.constraints() {
98                let check_name = check.name();
99                let constraint_name = constraint.name();
100                let name = format!("{check_name}.{constraint_name}");
101                constraints.push((name, constraint.clone()));
102            }
103        }
104
105        constraints
106    }
107
108    /// Maps constraint results back to their respective checks.
109    fn map_results_to_checks(
110        &self,
111        checks: &[Check],
112        constraint_results: HashMap<String, ConstraintResult>,
113    ) -> HashMap<String, Vec<ConstraintResult>> {
114        let mut check_results = HashMap::new();
115
116        for check in checks {
117            let mut results = Vec::new();
118
119            for constraint in check.constraints() {
120                let check_name = check.name();
121                let constraint_name = constraint.name();
122                let name = format!("{check_name}.{constraint_name}");
123                if let Some(result) = constraint_results.get(&name) {
124                    results.push(result.clone());
125                }
126            }
127
128            check_results.insert(check.name().to_string(), results);
129        }
130
131        check_results
132    }
133
134    /// Explains the query optimization plan for debugging.
135    ///
136    /// This provides detailed information about:
137    /// - Which constraints were grouped together
138    /// - What optimizations were applied
139    /// - The resulting query plans
140    pub async fn explain_plan(
141        &mut self,
142        checks: &[Check],
143        ctx: &TermContext,
144    ) -> Result<String, TermError> {
145        let constraints = self.extract_constraints(checks);
146        let analysis = self.analyzer.analyze(&constraints)?;
147        let groups = self.combiner.group_constraints(analysis.clone())?;
148
149        let mut explanation = String::new();
150        explanation.push_str("Query Optimization Plan\n");
151        explanation.push_str("======================\n\n");
152
153        // Summary statistics
154        explanation.push_str(&format!("Total Checks: {}\n", checks.len()));
155        explanation.push_str(&format!("Total Constraints: {}\n", constraints.len()));
156        explanation.push_str(&format!("Optimized Groups: {}\n", groups.len()));
157
158        let combinable = analysis.iter().filter(|a| a.is_combinable).count();
159
160        explanation.push_str(&format!("Combinable Constraints: {combinable}\n"));
161        explanation.push_str(&format!(
162            "Optimization Ratio: {:.1}%\n\n",
163            if constraints.is_empty() {
164                0.0
165            } else {
166                (groups.len() as f64 / constraints.len() as f64) * 100.0
167            }
168        ));
169
170        // Detailed group information
171        for (i, group) in groups.iter().enumerate() {
172            explanation.push_str(&format!(
173                "Group {}: {} constraints\n",
174                i + 1,
175                group.constraints.len()
176            ));
177            explanation.push_str(&format!("  Table: {}\n", group.constraints[0].table_name));
178
179            // Show optimization benefits
180            if group.constraints.len() > 1 {
181                explanation.push_str(&format!(
182                    "  Benefit: {} table scans reduced to 1\n",
183                    group.constraints.len()
184                ));
185            }
186
187            // Show predicate pushdown info if applicable
188            let has_predicates = group.constraints.iter().any(|c| c.has_predicates);
189            if has_predicates && self.executor.enable_pushdown {
190                explanation.push_str("  Predicate Pushdown: Enabled\n");
191            }
192
193            explanation.push_str(&self.executor.explain_group(group, ctx).await?);
194            explanation.push('\n');
195        }
196
197        // Cache statistics
198        let cache_stats = self.stats_cache.stats();
199        explanation.push_str("Cache Statistics\n");
200        explanation.push_str("----------------\n");
201        explanation.push_str(&format!("  Total Entries: {}\n", cache_stats.total_entries));
202        explanation.push_str(&format!(
203            "  Active Entries: {}\n",
204            cache_stats.active_entries
205        ));
206        explanation.push_str(&format!(
207            "  Expired Entries: {}\n\n",
208            cache_stats.expired_entries
209        ));
210
211        Ok(explanation)
212    }
213}
214
215impl Default for QueryOptimizer {
216    fn default() -> Self {
217        Self::new()
218    }
219}
220
221#[cfg(test)]
222mod tests;
223
#[cfg(test)]
mod basic_tests {
    use super::*;

    /// Construction must succeed and yield a value whose derived `Debug`
    /// output names the type. This replaces a pointer self-comparison that
    /// could never fail.
    #[test]
    fn test_optimizer_creation() {
        let optimizer = QueryOptimizer::new();
        let rendered = format!("{optimizer:?}");
        assert!(rendered.starts_with("QueryOptimizer"));
    }

    /// `Default` delegates to `new`, so it must also construct without
    /// panicking and produce the same type.
    #[test]
    fn test_optimizer_default() {
        let optimizer = QueryOptimizer::default();
        let rendered = format!("{optimizer:?}");
        assert!(rendered.starts_with("QueryOptimizer"));
    }
}
234}