// term_guard/optimizer/executor.rs

use crate::core::{ConstraintResult, ConstraintStatus, TermContext};
use crate::optimizer::combiner::ConstraintGroup;
use crate::optimizer::stats_cache::StatsCache;
use crate::prelude::TermError;
use arrow::array::*;
use arrow::datatypes::DataType;
use std::collections::HashMap;
use tracing::{debug, instrument};

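/// Executes groups of combined data-quality constraints, optionally applying
/// predicate pushdown to the generated SQL before it runs.
///
/// A minimal usage sketch (hypothetical setup: `group`, `ctx`, and `cache`
/// are assumed to come from the combiner, a registered table context, and a
/// stats cache elsewhere in the crate):
///
/// ```ignore
/// let executor = OptimizedExecutor::new();
/// let results = executor.execute_group(group, &ctx, &mut cache).await?;
/// for (name, result) in &results {
///     println!("{name}: {:?}", result.status);
/// }
/// ```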
#[derive(Debug)]
pub struct OptimizedExecutor {
    pub enable_pushdown: bool,
}

impl OptimizedExecutor {
    pub fn new() -> Self {
        Self {
            enable_pushdown: true,
        }
    }

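    /// Executes a constraint group against the given context.
    ///
    /// A single non-combinable constraint is evaluated directly. Combined
    /// groups run one SQL statement; the single result row is then mapped
    /// back to a `ConstraintResult` per constraint, and the table's total
    /// row count is cached for later fallback.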
    #[instrument(skip(self, group, ctx, cache))]
    pub async fn execute_group(
        &self,
        group: ConstraintGroup,
        ctx: &TermContext,
        cache: &mut StatsCache,
    ) -> Result<HashMap<String, ConstraintResult>, TermError> {
        let mut results = HashMap::new();

        if group.constraints.len() == 1 && group.combined_sql.is_empty() {
            let constraint = &group.constraints[0];
            let result = constraint.constraint.evaluate(ctx.inner()).await?;
            results.insert(constraint.name.clone(), result);
        } else {
            debug!("Executing combined query: {}", group.combined_sql);

            let table_name = &group.constraints[0].table_name;
            let cache_key = format!("table:{table_name}");
            let cached_stats = cache.get(&cache_key);

            let optimized_sql = if self.enable_pushdown {
                self.apply_predicate_pushdown(&group)?
            } else {
                group.combined_sql.clone()
            };

            debug!("Optimized SQL with pushdown: {}", optimized_sql);

            let df = ctx.inner().sql(&optimized_sql).await?;
            let batches = df.collect().await?;

            if batches.is_empty() {
                for constraint in &group.constraints {
                    results.insert(
                        constraint.name.clone(),
                        ConstraintResult {
                            status: ConstraintStatus::Failure,
                            metric: None,
                            message: Some("No data to analyze".to_string()),
                        },
                    );
                }
            } else {
                let batch = &batches[0];
                let row_results = self.extract_row_results(batch)?;

                if let Some(total_count) = row_results.get("total_count") {
                    cache.set(cache_key, *total_count);
                }

                for constraint in &group.constraints {
                    let result = self.map_result_to_constraint(
                        constraint,
                        &row_results,
                        &group.result_mapping,
                        cached_stats,
                    )?;
                    results.insert(constraint.name.clone(), result);
                }
            }
        }

        Ok(results)
    }

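    /// Flattens the first row of a combined-query batch into a map from
    /// column name to numeric value. Int64 and UInt64 columns are widened
    /// to f64; columns of any other type are skipped.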
    fn extract_row_results(&self, batch: &RecordBatch) -> Result<HashMap<String, f64>, TermError> {
        let mut results = HashMap::new();

        // An empty batch has no row 0; bail out early rather than panic in
        // `array.value(0)` below.
        if batch.num_rows() == 0 {
            return Ok(results);
        }

        for (i, field) in batch.schema().fields().iter().enumerate() {
            let column = batch.column(i);
            let name = field.name();

            let value = match column.data_type() {
                DataType::Int64 => {
                    let array = column
                        .as_any()
                        .downcast_ref::<Int64Array>()
                        .ok_or_else(|| TermError::Parse("Failed to cast to Int64Array".into()))?;
                    array.value(0) as f64
                }
                DataType::Float64 => {
                    let array = column
                        .as_any()
                        .downcast_ref::<Float64Array>()
                        .ok_or_else(|| TermError::Parse("Failed to cast to Float64Array".into()))?;
                    array.value(0)
                }
                DataType::UInt64 => {
                    let array = column
                        .as_any()
                        .downcast_ref::<UInt64Array>()
                        .ok_or_else(|| TermError::Parse("Failed to cast to UInt64Array".into()))?;
                    array.value(0) as f64
                }
                // Non-numeric columns carry no metric value for us.
                _ => continue,
            };

            results.insert(name.to_string(), value);
        }

        Ok(results)
    }

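    /// Maps the shared row results back onto a single constraint, using the
    /// group's result mapping to locate the columns that belong to it and
    /// falling back to cached table statistics when a total is missing.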
    fn map_result_to_constraint(
        &self,
        constraint: &crate::optimizer::analyzer::ConstraintAnalysis,
        row_results: &HashMap<String, f64>,
        result_mapping: &HashMap<String, String>,
        cached_stats: Option<f64>,
    ) -> Result<ConstraintResult, TermError> {
        let constraint_type = constraint.constraint.name();

        match constraint_type {
            "completeness" => {
                let total_key = result_mapping
                    .get(&format!("{}_total", constraint.name))
                    .or_else(|| result_mapping.get("total_count"))
                    .ok_or_else(|| TermError::Parse("Missing total count mapping".into()))?;

                // Prefer the value from this query's result row; fall back to
                // the cached table count when the row omits it.
                let total = row_results
                    .get(total_key)
                    .or(cached_stats.as_ref())
                    .copied()
                    .unwrap_or(0.0);

                let metric = if total > 0.0 { Some(1.0) } else { Some(0.0) };

                Ok(ConstraintResult {
                    status: ConstraintStatus::Success,
                    metric,
                    message: None,
                })
            }
            _ => Ok(ConstraintResult {
                status: ConstraintStatus::Success,
                metric: Some(1.0),
                message: None,
            }),
        }
    }

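    /// Rewrites the combined SQL so that pushdown predicates are evaluated
    /// in the WHERE clause, letting the engine filter rows before any
    /// aggregation. For example (illustrative input and output, assuming the
    /// combiner emits uppercase keywords and a `{table_name}` placeholder):
    ///
    /// ```text
    /// SELECT COUNT(*) AS total_count FROM {table_name}
    /// -- becomes --
    /// SELECT COUNT(*) AS total_count FROM {table_name} WHERE email IS NOT NULL
    /// ```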
    fn apply_predicate_pushdown(&self, group: &ConstraintGroup) -> Result<String, TermError> {
        let mut optimized_sql = group.combined_sql.clone();

        let pushdown_predicates = self.extract_pushdown_predicates(group);

        if !pushdown_predicates.is_empty() {
            let predicates_str = pushdown_predicates.join(" AND ");

            if optimized_sql.to_lowercase().contains(" where ") {
                // A WHERE clause already exists: AND the pushdown predicates
                // onto it instead of injecting a second WHERE. This assumes
                // the combiner emits an uppercase WHERE keyword.
                optimized_sql =
                    optimized_sql.replacen(" WHERE ", &format!(" WHERE {predicates_str} AND "), 1);
            } else if optimized_sql.to_lowercase().contains(" from ") {
                optimized_sql = optimized_sql.replace(
                    " FROM {table_name}",
                    &format!(" FROM {{table_name}} WHERE {predicates_str}"),
                );
            }

            debug!(
                "Applied predicate pushdown with {} predicates",
                pushdown_predicates.len()
            );
        }

        Ok(optimized_sql)
    }

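    /// Collects WHERE-clause predicates that can safely run ahead of the
    /// combined query, currently NOT NULL filters derived from per-column
    /// constraints. Time-like columns are logged as partition-pruning
    /// candidates but produce no predicates yet.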
    fn extract_pushdown_predicates(&self, group: &ConstraintGroup) -> Vec<String> {
        let mut predicates = Vec::new();

        for constraint in &group.constraints {
            if constraint.has_predicates {
                match constraint.constraint.name() {
                    "compliance" => {
                        if !constraint.columns.is_empty() {
                            predicates.push(format!("{} IS NOT NULL", constraint.columns[0]));
                        }
                    }
                    // Pattern-match and containment checks can never match a
                    // NULL value, so the same NOT NULL filter is pushed for
                    // them (assumption: mirrors the compliance arm above).
                    "pattern_match" | "containment" => {
                        if !constraint.columns.is_empty() {
                            predicates.push(format!("{} IS NOT NULL", constraint.columns[0]));
                        }
                    }
                    _ => {}
                }
            }

            // Time-like columns are candidates for partition pruning; for now
            // they are only logged and contribute no predicates.
            for column in &constraint.columns {
                if column.contains("date")
                    || column.contains("time")
                    || column.contains("timestamp")
                {
                    debug!("Found potential time-based partition column: {}", column);
                }
            }
        }

        predicates.sort();
        predicates.dedup();

        predicates
    }

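    /// Renders a human-readable explanation of how a group will execute,
    /// including the combined SQL and, when the statement parses, the
    /// underlying logical plan.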
    pub async fn explain_group(
        &self,
        group: &ConstraintGroup,
        ctx: &TermContext,
    ) -> Result<String, TermError> {
        let mut explanation = String::new();

        if group.constraints.len() == 1 && group.combined_sql.is_empty() {
            explanation.push_str(&format!(
                " - {} (non-combinable, executed individually)\n",
                group.constraints[0].name
            ));
        } else {
            explanation.push_str(" Combined constraints:\n");
            for constraint in &group.constraints {
                explanation.push_str(&format!(" - {}\n", constraint.name));
            }

            explanation.push_str(&format!("\n Combined SQL:\n {}\n", group.combined_sql));

            if !group.combined_sql.is_empty() {
                match ctx.inner().sql(&group.combined_sql).await {
                    Ok(df) => {
                        let logical_plan = df.logical_plan();
                        explanation.push_str(&format!(
                            "\n Logical Plan:\n{}\n",
                            logical_plan.display_indent()
                        ));
                    }
                    Err(e) => {
                        explanation
                            .push_str(&format!("\n Logical Plan: Unable to generate ({e})\n"));
                    }
                }
            }
        }

        Ok(explanation)
    }

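    /// Enables or disables predicate pushdown for subsequent
    /// `execute_group` calls.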
    pub fn set_pushdown_enabled(&mut self, enabled: bool) {
        self.enable_pushdown = enabled;
    }
}

impl Default for OptimizedExecutor {
    fn default() -> Self {
        Self::new()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_executor_creation() {
        let executor = OptimizedExecutor::new();
        assert!(executor.enable_pushdown);
    }
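
    #[test]
    fn test_pushdown_toggle() {
        let mut executor = OptimizedExecutor::new();
        executor.set_pushdown_enabled(false);
        assert!(!executor.enable_pushdown);
        executor.set_pushdown_enabled(true);
        assert!(executor.enable_pushdown);
    }

    #[test]
    fn test_default_enables_pushdown() {
        // Default should match `new()`, which turns pushdown on.
        assert!(OptimizedExecutor::default().enable_pushdown);
    }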
}