term_guard/optimizer/executor.rs

//! Optimized query execution.

use crate::core::{ConstraintResult, ConstraintStatus, TermContext};
use crate::optimizer::combiner::ConstraintGroup;
use crate::optimizer::stats_cache::StatsCache;
use crate::prelude::TermError;
use arrow::array::*;
use arrow::datatypes::DataType;
use std::collections::HashMap;
use tracing::{debug, instrument};

/// Executes optimized query groups.
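///
/// A minimal usage sketch; the import path below is assumed from this file's
/// location (`term_guard/optimizer/executor.rs`) and is not verified here:
///
/// ```rust,ignore
/// use term_guard::optimizer::executor::OptimizedExecutor;
///
/// let mut executor = OptimizedExecutor::new();
/// assert!(executor.enable_pushdown);
/// executor.set_pushdown_enabled(false); // opt out of predicate pushdown
/// ```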
#[derive(Debug)]
pub struct OptimizedExecutor {
    /// Whether to enable predicate pushdown.
    pub enable_pushdown: bool,
}

impl OptimizedExecutor {
    /// Creates a new optimized executor.
    pub fn new() -> Self {
        Self {
            enable_pushdown: true,
        }
    }

    /// Executes a group of constraints.
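    ///
    /// A single non-combinable constraint is evaluated directly through its own
    /// `evaluate` path; a combinable group runs as one SQL statement whose
    /// result columns are then mapped back to per-constraint results.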
    #[instrument(skip(self, group, ctx, cache))]
    pub async fn execute_group(
        &self,
        group: ConstraintGroup,
        ctx: &TermContext,
        cache: &mut StatsCache,
    ) -> Result<HashMap<String, ConstraintResult>, TermError> {
        let mut results = HashMap::new();

        if group.constraints.len() == 1 && group.combined_sql.is_empty() {
            // Single non-combinable constraint: execute normally
            let constraint = &group.constraints[0];
            let result = constraint.constraint.evaluate(ctx.inner()).await?;
            results.insert(constraint.name.clone(), result);
        } else {
            // Combined query execution with potential predicate pushdown
            debug!("Executing combined query: {}", group.combined_sql);

            // Check cache for common statistics
            let table_name = &group.constraints[0].table_name;
            let cache_key = format!("table:{table_name}");
            let cached_stats = cache.get(&cache_key);

            // Apply predicate pushdown if enabled
            let optimized_sql = if self.enable_pushdown {
                self.apply_predicate_pushdown(&group)?
            } else {
                group.combined_sql.clone()
            };

            debug!("Optimized SQL with pushdown: {}", optimized_sql);

            // Execute the optimized query
            let df = ctx.inner().sql(&optimized_sql).await?;
            let batches = df.collect().await?;

            if batches.is_empty() {
                // Handle empty results
                for constraint in &group.constraints {
                    results.insert(
                        constraint.name.clone(),
                        ConstraintResult {
                            status: ConstraintStatus::Failure,
                            metric: None,
                            message: Some("No data to analyze".to_string()),
                        },
                    );
                }
            } else {
                // Extract results and map back to constraints
                let batch = &batches[0];
                let row_results = self.extract_row_results(batch)?;

                // Update cache with total count if available
                if let Some(total_count) = row_results.get("total_count") {
                    cache.set(cache_key, *total_count);
                }

                // Map results to each constraint
                for constraint in &group.constraints {
                    let result = self.map_result_to_constraint(
                        constraint,
                        &row_results,
                        &group.result_mapping,
                        cached_stats,
                    )?;
                    results.insert(constraint.name.clone(), result);
                }
            }
        }

        Ok(results)
    }

    /// Extracts row results from a record batch.
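    ///
    /// Only the first row is read, and non-numeric columns are skipped. For an
    /// aggregate row such as `total_count: Int64 = 42, ratio: Float64 = 0.97`
    /// (illustrative values), the returned map is
    /// `{"total_count": 42.0, "ratio": 0.97}`.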
    fn extract_row_results(&self, batch: &RecordBatch) -> Result<HashMap<String, f64>, TermError> {
        let mut results = HashMap::new();

        // Guard against empty batches so the `value(0)` reads below cannot panic.
        if batch.num_rows() == 0 {
            return Ok(results);
        }

        for (i, field) in batch.schema().fields().iter().enumerate() {
            let column = batch.column(i);
            let name = field.name();

            // Extract the numeric value from the first row
            let value = match column.data_type() {
                DataType::Int64 => {
                    let array = column
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .ok_or_else(|| TermError::Parse("Failed to cast to Int64Array".into()))?;
                    array.value(0) as f64
                }
                DataType::Float64 => {
                    let array = column
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .ok_or_else(|| TermError::Parse("Failed to cast to Float64Array".into()))?;
                    array.value(0)
                }
                DataType::UInt64 => {
                    let array = column
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| TermError::Parse("Failed to cast to UInt64Array".into()))?;
                    array.value(0) as f64
                }
                _ => continue, // Skip non-numeric columns
            };

            results.insert(name.to_string(), value);
        }

        Ok(results)
    }

    /// Maps query results to a constraint result.
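    ///
    /// For a completeness constraint named `c1` (a hypothetical name), the
    /// total count is resolved through the `c1_total` mapping, falling back to
    /// the shared `total_count` mapping, and finally to the cached statistic
    /// when the mapped column is absent from the row results.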
    fn map_result_to_constraint(
        &self,
        constraint: &crate::optimizer::analyzer::ConstraintAnalysis,
        row_results: &HashMap<String, f64>,
        result_mapping: &HashMap<String, String>,
        cached_stats: Option<f64>,
    ) -> Result<ConstraintResult, TermError> {
        // For now, implement basic mapping logic; a full implementation would
        // cover every constraint type.

        let constraint_type = constraint.constraint.name();

        match constraint_type {
            "completeness" => {
                let total_key = result_mapping
                    .get(&format!("{}_total", constraint.name))
                    .or_else(|| result_mapping.get("total_count"))
                    .ok_or_else(|| TermError::Parse("Missing total count mapping".into()))?;

                let total = row_results
                    .get(total_key)
                    .or(cached_stats.as_ref())
                    .copied()
                    .unwrap_or(0.0);

                // Completeness also needs the non-null count; this simplified
                // version only checks that any rows were observed.
                let metric = if total > 0.0 { Some(1.0) } else { Some(0.0) };

                Ok(ConstraintResult {
                    status: ConstraintStatus::Success,
                    metric,
                    message: None,
                })
            }
            _ => {
                // Fallback for constraint types without an optimized mapping yet.
                // This currently reports success unconditionally; a full
                // implementation would delegate back to the constraint's own
                // evaluation path.
                Ok(ConstraintResult {
                    status: ConstraintStatus::Success,
                    metric: Some(1.0),
                    message: None,
                })
            }
        }
    }

    /// Applies predicate pushdown optimization to the query.
    ///
    /// This method analyzes the query to identify predicates that can be pushed
    /// down to the storage layer for more efficient execution. Pushdown is
    /// especially beneficial for partitioned data, where entire partitions can
    /// be skipped.
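    ///
    /// For example (illustrative, not from a real run): given the combined query
    /// `SELECT COUNT(*) AS total_count FROM data` and the pushable predicate
    /// `user_id IS NOT NULL`, the rewritten query becomes
    /// `SELECT COUNT(*) AS total_count FROM data WHERE user_id IS NOT NULL`.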
    fn apply_predicate_pushdown(&self, group: &ConstraintGroup) -> Result<String, TermError> {
        let mut optimized_sql = group.combined_sql.clone();

        // Extract predicates that can be pushed down
        let pushdown_predicates = self.extract_pushdown_predicates(group);

        if !pushdown_predicates.is_empty() {
            // For now, this is a simple string-rewriting strategy; a real
            // implementation would work with DataFusion's optimizer instead.
            // The combined SQL is generated with uppercase keywords and a
            // single FROM/WHERE, so the replacements below match on " WHERE "
            // and " FROM data" directly.
            let predicates_str = pushdown_predicates.join(" AND ");

            if optimized_sql.contains(" WHERE ") {
                // Conjoin the pushdown predicates with the existing WHERE clause.
                optimized_sql =
                    optimized_sql.replace(" WHERE ", &format!(" WHERE {predicates_str} AND "));
            } else if optimized_sql.to_lowercase().contains(" from ") {
                // No WHERE clause yet: add one directly after the FROM clause.
                optimized_sql = optimized_sql
                    .replace(" FROM data", &format!(" FROM data WHERE {predicates_str}"));
            }

            debug!(
                "Applied predicate pushdown with {} predicates",
                pushdown_predicates.len()
            );
        }

        Ok(optimized_sql)
    }

    /// Extracts predicates that can be pushed down to the storage layer.
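    ///
    /// For example, a `compliance` constraint over a hypothetical `user_id`
    /// column currently contributes the predicate `user_id IS NOT NULL`.
    /// Duplicates across constraints are sorted and deduplicated before returning.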
    fn extract_pushdown_predicates(&self, group: &ConstraintGroup) -> Vec<String> {
        let mut predicates = Vec::new();

        // Analyze constraints to find pushable predicates
        for constraint in &group.constraints {
            // For constraints with predicates, extract partition-friendly conditions
            if constraint.has_predicates {
                match constraint.constraint.name() {
                    "compliance" => {
                        // Compliance constraints often have conditions that can be pushed.
                        // A real implementation would parse the constraint configuration;
                        // for now, push down a simple non-null check.
                        if !constraint.columns.is_empty() {
                            predicates.push(format!("{} IS NOT NULL", constraint.columns[0]));
                        }
                    }
                    "pattern_match" => {
                        // Pattern matching might have LIKE predicates.
                        if !constraint.columns.is_empty() {
                            // Placeholder: a real implementation would extract the
                            // pattern predicate from the constraint configuration.
                        }
                    }
                    "containment" => {
                        // Containment might have IN or BETWEEN predicates.
                        if !constraint.columns.is_empty() {
                            // Placeholder for containment predicates.
                        }
                    }
                    _ => {}
                }
            }

            // Special handling for time-based partitions: if we detect date/time
            // columns, we could push down time range predicates.
            for column in &constraint.columns {
                // Note: matching "time" also covers "timestamp".
                if column.contains("date") || column.contains("time") {
                    // A real implementation would extract time ranges from the
                    // constraint; for now, this only logs the candidate column.
                    debug!("Found potential time-based partition column: {}", column);
                }
            }
        }

        // Remove duplicate predicates
        predicates.sort();
        predicates.dedup();

        predicates
    }

    /// Explains the execution plan for a group.
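    ///
    /// The returned text lists each constraint in the group and the combined
    /// SQL, and, when the SQL parses, appends DataFusion's indented logical plan.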
    pub async fn explain_group(
        &self,
        group: &ConstraintGroup,
        ctx: &TermContext,
    ) -> Result<String, TermError> {
        let mut explanation = String::new();

        if group.constraints.len() == 1 && group.combined_sql.is_empty() {
            explanation.push_str(&format!(
                "  - {} (non-combinable, executed individually)\n",
                group.constraints[0].name
            ));
        } else {
            explanation.push_str("  Combined constraints:\n");
            for constraint in &group.constraints {
                explanation.push_str(&format!("    - {}\n", constraint.name));
            }

            explanation.push_str(&format!("\n  Combined SQL:\n    {}\n", group.combined_sql));

            // Get the logical plan
            if !group.combined_sql.is_empty() {
                match ctx.inner().sql(&group.combined_sql).await {
                    Ok(df) => {
                        let logical_plan = df.logical_plan();
                        explanation.push_str(&format!(
                            "\n  Logical Plan:\n{}\n",
                            logical_plan.display_indent()
                        ));
                    }
                    Err(e) => {
                        // If the SQL cannot be parsed, show it without the logical plan.
                        explanation
                            .push_str(&format!("\n  Logical Plan: Unable to generate ({e})\n"));
                    }
                }
            }
        }

        Ok(explanation)
    }

    /// Enables or disables predicate pushdown.
    pub fn set_pushdown_enabled(&mut self, enabled: bool) {
        self.enable_pushdown = enabled;
    }
}

impl Default for OptimizedExecutor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_executor_creation() {
        let executor = OptimizedExecutor::new();
        assert!(executor.enable_pushdown);
    }
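
    // A sketch of the pushdown toggle; it exercises only the public API
    // defined in this file.
    #[test]
    fn test_set_pushdown_enabled() {
        let mut executor = OptimizedExecutor::new();
        executor.set_pushdown_enabled(false);
        assert!(!executor.enable_pushdown);
    }

    // A sketch of extract_row_results on a hand-built batch; the schema and
    // values here are illustrative, not taken from a real query.
    #[test]
    fn test_extract_row_results_reads_first_row() {
        use arrow::datatypes::{Field, Schema};
        use std::sync::Arc;

        let schema = Arc::new(Schema::new(vec![
            Field::new("total_count", DataType::Int64, false),
            Field::new("ratio", DataType::Float64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![42_i64])) as ArrayRef,
                Arc::new(Float64Array::from(vec![0.97_f64])) as ArrayRef,
            ],
        )
        .unwrap();

        let executor = OptimizedExecutor::new();
        let results = executor.extract_row_results(&batch).unwrap();
        assert_eq!(results.get("total_count"), Some(&42.0));
        assert_eq!(results.get("ratio"), Some(&0.97));
    }
}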
347}