// term_guard/optimizer/executor.rs

1//! Optimized query execution.
2
3use crate::core::{ConstraintResult, ConstraintStatus, TermContext};
4use crate::optimizer::combiner::ConstraintGroup;
5use crate::optimizer::stats_cache::StatsCache;
6use crate::prelude::TermError;
7use arrow::array::*;
8use arrow::datatypes::DataType;
9use std::collections::HashMap;
10use tracing::{debug, instrument};
11
/// Executes optimized query groups.
///
/// Runs a `ConstraintGroup` either as one combined SQL query or as an
/// individual constraint evaluation, optionally rewriting the combined SQL
/// with pushed-down predicates first.
#[derive(Debug)]
pub struct OptimizedExecutor {
    /// Whether to enable predicate pushdown when building the combined SQL.
    /// `new()` sets this to `true`.
    pub enable_pushdown: bool,
}
18
19impl OptimizedExecutor {
20    /// Creates a new optimized executor.
21    pub fn new() -> Self {
22        Self {
23            enable_pushdown: true,
24        }
25    }
26
27    /// Executes a group of constraints.
28    #[instrument(skip(self, group, ctx, cache))]
29    pub async fn execute_group(
30        &self,
31        group: ConstraintGroup,
32        ctx: &TermContext,
33        cache: &mut StatsCache,
34    ) -> Result<HashMap<String, ConstraintResult>, TermError> {
35        let mut results = HashMap::new();
36
37        if group.constraints.len() == 1 && group.combined_sql.is_empty() {
38            // Single non-combinable constraint - execute normally
39            let constraint = &group.constraints[0];
40            let result = constraint.constraint.evaluate(ctx.inner()).await?;
41            results.insert(constraint.name.clone(), result);
42        } else {
43            // Combined query execution with potential predicate pushdown
44            debug!("Executing combined query: {}", group.combined_sql);
45
46            // Check cache for common statistics
47            let table_name = &group.constraints[0].table_name;
48            let cache_key = format!("table:{table_name}");
49            let cached_stats = cache.get(&cache_key);
50
51            // Apply predicate pushdown if enabled
52            let optimized_sql = if self.enable_pushdown {
53                self.apply_predicate_pushdown(&group)?
54            } else {
55                group.combined_sql.clone()
56            };
57
58            debug!("Optimized SQL with pushdown: {}", optimized_sql);
59
60            // Execute the optimized query
61            let df = ctx.inner().sql(&optimized_sql).await?;
62            let batches = df.collect().await?;
63
64            if batches.is_empty() {
65                // Handle empty results
66                for constraint in &group.constraints {
67                    results.insert(
68                        constraint.name.clone(),
69                        ConstraintResult {
70                            status: ConstraintStatus::Failure,
71                            metric: None,
72                            message: Some("No data to analyze".to_string()),
73                        },
74                    );
75                }
76            } else {
77                // Extract results and map back to constraints
78                let batch = &batches[0];
79                let row_results = self.extract_row_results(batch)?;
80
81                // Update cache with total count if available
82                if let Some(total_count) = row_results.get("total_count") {
83                    cache.set(cache_key, *total_count);
84                }
85
86                // Map results to each constraint
87                for constraint in &group.constraints {
88                    let result = self.map_result_to_constraint(
89                        constraint,
90                        &row_results,
91                        &group.result_mapping,
92                        cached_stats,
93                    )?;
94                    results.insert(constraint.name.clone(), result);
95                }
96            }
97        }
98
99        Ok(results)
100    }
101
102    /// Extracts row results from a record batch.
103    fn extract_row_results(&self, batch: &RecordBatch) -> Result<HashMap<String, f64>, TermError> {
104        let mut results = HashMap::new();
105
106        for (i, field) in batch.schema().fields().iter().enumerate() {
107            let column = batch.column(i);
108            let name = field.name();
109
110            // Extract numeric value from the first row
111            let value = match column.data_type() {
112                DataType::Int64 => {
113                    let array = column
114                        .as_any()
115                        .downcast_ref::<Int64Array>()
116                        .ok_or_else(|| TermError::Parse("Failed to cast to Int64Array".into()))?;
117                    array.value(0) as f64
118                }
119                DataType::Float64 => {
120                    let array = column
121                        .as_any()
122                        .downcast_ref::<Float64Array>()
123                        .ok_or_else(|| TermError::Parse("Failed to cast to Float64Array".into()))?;
124                    array.value(0)
125                }
126                DataType::UInt64 => {
127                    let array = column
128                        .as_any()
129                        .downcast_ref::<UInt64Array>()
130                        .ok_or_else(|| TermError::Parse("Failed to cast to UInt64Array".into()))?;
131                    array.value(0) as f64
132                }
133                _ => continue, // Skip non-numeric columns
134            };
135
136            results.insert(name.to_string(), value);
137        }
138
139        Ok(results)
140    }
141
142    /// Maps query results to a constraint result.
143    fn map_result_to_constraint(
144        &self,
145        constraint: &crate::optimizer::analyzer::ConstraintAnalysis,
146        row_results: &HashMap<String, f64>,
147        result_mapping: &HashMap<String, String>,
148        cached_stats: Option<f64>,
149    ) -> Result<ConstraintResult, TermError> {
150        // For now, implement basic mapping logic
151        // In a real implementation, this would be more sophisticated
152
153        let constraint_type = constraint.constraint.name();
154
155        match constraint_type {
156            "completeness" => {
157                let total_key = result_mapping
158                    .get(&format!("{}_total", constraint.name))
159                    .or_else(|| result_mapping.get("total_count"))
160                    .ok_or_else(|| TermError::Parse("Missing total count mapping".into()))?;
161
162                let total = row_results
163                    .get(total_key)
164                    .or(cached_stats.as_ref())
165                    .copied()
166                    .unwrap_or(0.0);
167
168                // For completeness, we'd need the non-null count too
169                // This is simplified for the example
170                let metric = if total > 0.0 { Some(1.0) } else { Some(0.0) };
171
172                Ok(ConstraintResult {
173                    status: ConstraintStatus::Success,
174                    metric,
175                    message: None,
176                })
177            }
178            _ => {
179                // For other constraint types, delegate to the constraint itself
180                // This is a fallback for constraints we haven't optimized yet
181                Ok(ConstraintResult {
182                    status: ConstraintStatus::Success,
183                    metric: Some(1.0),
184                    message: None,
185                })
186            }
187        }
188    }
189
190    /// Applies predicate pushdown optimization to the query.
191    ///
192    /// This method analyzes the query to identify predicates that can be pushed down
193    /// to the storage layer for more efficient execution, especially beneficial for
194    /// partitioned data where entire partitions can be skipped.
195    fn apply_predicate_pushdown(&self, group: &ConstraintGroup) -> Result<String, TermError> {
196        let mut optimized_sql = group.combined_sql.clone();
197
198        // Extract predicates that can be pushed down
199        let pushdown_predicates = self.extract_pushdown_predicates(group);
200
201        if !pushdown_predicates.is_empty() {
202            // For now, we'll implement a simple pushdown strategy
203            // In a real implementation, this would work with DataFusion's optimizer
204
205            // Check if the query already has a WHERE clause
206            if optimized_sql.to_lowercase().contains(" where ") {
207                // Append predicates to existing WHERE clause
208                let predicates_str = pushdown_predicates.join(" AND ");
209                optimized_sql = optimized_sql.replace(
210                    " FROM {table_name}",
211                    &format!(" FROM {{table_name}} WHERE {predicates_str}"),
212                );
213            } else if optimized_sql.to_lowercase().contains(" from ") {
214                // Add WHERE clause after FROM
215                let predicates_str = pushdown_predicates.join(" AND ");
216                optimized_sql = optimized_sql.replace(
217                    " FROM {table_name}",
218                    &format!(" FROM {{table_name}} WHERE {predicates_str}"),
219                );
220            }
221
222            debug!(
223                "Applied predicate pushdown with {} predicates",
224                pushdown_predicates.len()
225            );
226        }
227
228        Ok(optimized_sql)
229    }
230
231    /// Extracts predicates that can be pushed down to the storage layer.
232    fn extract_pushdown_predicates(&self, group: &ConstraintGroup) -> Vec<String> {
233        let mut predicates = Vec::new();
234
235        // Analyze constraints to find pushable predicates
236        for constraint in &group.constraints {
237            // For constraints with predicates, extract partition-friendly conditions
238            if constraint.has_predicates {
239                match constraint.constraint.name() {
240                    "compliance" => {
241                        // Compliance constraints often have conditions that can be pushed
242                        // In a real implementation, we'd parse the constraint configuration
243                        // For now, add a placeholder
244                        if !constraint.columns.is_empty() {
245                            // Example: push down non-null checks for completeness-like constraints
246                            predicates.push(format!("{} IS NOT NULL", constraint.columns[0]));
247                        }
248                    }
249                    "pattern_match" => {
250                        // Pattern matching might have LIKE predicates
251                        if !constraint.columns.is_empty() {
252                            // Placeholder for pattern predicates
253                            // In real implementation, extract from constraint config
254                        }
255                    }
256                    "containment" => {
257                        // Containment might have IN or BETWEEN predicates
258                        if !constraint.columns.is_empty() {
259                            // Placeholder for containment predicates
260                        }
261                    }
262                    _ => {}
263                }
264            }
265
266            // Special handling for time-based partitions
267            // If we detect date/time columns, we could push down time range predicates
268            for column in &constraint.columns {
269                if column.contains("date")
270                    || column.contains("time")
271                    || column.contains("timestamp")
272                {
273                    // In a real implementation, we'd extract time ranges from the constraint
274                    // For now, this is a placeholder to demonstrate the concept
275                    debug!("Found potential time-based partition column: {}", column);
276                }
277            }
278        }
279
280        // Remove duplicate predicates
281        predicates.sort();
282        predicates.dedup();
283
284        predicates
285    }
286
287    /// Explains the execution plan for a group.
288    pub async fn explain_group(
289        &self,
290        group: &ConstraintGroup,
291        ctx: &TermContext,
292    ) -> Result<String, TermError> {
293        let mut explanation = String::new();
294
295        if group.constraints.len() == 1 && group.combined_sql.is_empty() {
296            explanation.push_str(&format!(
297                "  - {} (non-combinable, executed individually)\n",
298                group.constraints[0].name
299            ));
300        } else {
301            explanation.push_str("  Combined constraints:\n");
302            for constraint in &group.constraints {
303                explanation.push_str(&format!("    - {}\n", constraint.name));
304            }
305
306            explanation.push_str(&format!("\n  Combined SQL:\n    {}\n", group.combined_sql));
307
308            // Get the logical plan
309            if !group.combined_sql.is_empty() {
310                match ctx.inner().sql(&group.combined_sql).await {
311                    Ok(df) => {
312                        let logical_plan = df.logical_plan();
313                        explanation.push_str(&format!(
314                            "\n  Logical Plan:\n{}\n",
315                            logical_plan.display_indent()
316                        ));
317                    }
318                    Err(e) => {
319                        // If we can't parse the SQL, just show it without the logical plan
320                        explanation
321                            .push_str(&format!("\n  Logical Plan: Unable to generate ({e})\n"));
322                    }
323                }
324            }
325        }
326
327        Ok(explanation)
328    }
329
330    /// Enables or disables predicate pushdown.
331    pub fn set_pushdown_enabled(&mut self, enabled: bool) {
332        self.enable_pushdown = enabled;
333    }
334}
335
336impl Default for OptimizedExecutor {
337    fn default() -> Self {
338        Self::new()
339    }
340}
341
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built executor has predicate pushdown switched on.
    #[test]
    fn test_executor_creation() {
        let executor = OptimizedExecutor::default();
        assert!(executor.enable_pushdown);
    }
}