elif_orm/loading/optimizer/
analyzer.rs1use crate::{
2 error::{OrmError, OrmResult},
3 relationships::RelationshipType,
4};
5use super::plan::{QueryPlan, QueryNode, PlanStatistics};
6use std::collections::{HashMap, HashSet};
7
/// Result of analyzing a [`QueryPlan`]: scores, a projected cost,
/// detected problems, and suggested remediations.
#[derive(Debug, Clone)]
pub struct PlanAnalysis {
    /// Overall complexity score, as produced by `QueryPlan::complexity_score`.
    pub complexity_score: f64,
    /// Heuristic execution-time estimate, in the same unit as
    /// `QueryOptimizer::target_execution_time` (presumably milliseconds —
    /// TODO confirm against the executor).
    pub estimated_execution_time: u64,
    /// Human-readable descriptions of detected bottlenecks.
    pub bottlenecks: Vec<String>,
    /// Human-readable tuning recommendations (sorted, deduplicated).
    pub recommendations: Vec<String>,
    /// Aggregate risk classification derived from the fields above.
    pub risk_level: RiskLevel,
    /// Raw statistics snapshot taken from the plan at analysis time.
    pub statistics: PlanStatistics,
}
24
/// Coarse risk classification for executing a plan as-is.
/// Ordered from safest (`Low`) to most dangerous (`Critical`).
#[derive(Debug, Clone, PartialEq)]
pub enum RiskLevel {
    Low,
    Medium,
    High,
    Critical,
}
33
/// Strategies the optimizer applied (or recommends applying) to a plan.
#[derive(Debug, Clone)]
pub enum OptimizationStrategy {
    /// Mark more query nodes parallel-safe so phases can run concurrently.
    IncreaseParallelism,
    /// Reduce batch sizes to lower per-query memory pressure.
    ReduceBatchSize,
    /// Add query constraints to shrink the processed data volume.
    AddConstraints,
    /// Reorder execution phases (cheapest phases first).
    ReorderPhases,
    /// Split very large queries into smaller independent ones.
    SplitQueries,
    /// Suggested `CREATE INDEX` DDL statements for the involved tables.
    SuggestIndexes(Vec<String>),
}
50
/// Heuristic analyzer and optimizer for eager-loading query plans.
pub struct QueryOptimizer {
    /// Complexity score above which optimizations are attempted.
    max_complexity: f64,
    /// Target execution time budget (presumably milliseconds — TODO confirm).
    target_execution_time: u64,
    /// Whether to produce detailed analysis.
    /// NOTE(review): this flag is not read by any method in this file —
    /// confirm whether it is consumed elsewhere or is dead configuration.
    detailed_analysis: bool,
}
60
61impl QueryOptimizer {
62 pub fn new() -> Self {
64 Self {
65 max_complexity: 100.0,
66 target_execution_time: 5000, detailed_analysis: true,
68 }
69 }
70
71 pub fn with_settings(
73 max_complexity: f64,
74 target_execution_time: u64,
75 detailed_analysis: bool,
76 ) -> Self {
77 Self {
78 max_complexity,
79 target_execution_time,
80 detailed_analysis,
81 }
82 }
83
84 pub fn analyze_plan(&self, plan: &QueryPlan) -> OrmResult<PlanAnalysis> {
86 let statistics = plan.statistics();
87 let complexity_score = plan.complexity_score();
88
89 let estimated_execution_time = self.estimate_execution_time(plan);
91
92 let bottlenecks = self.identify_bottlenecks(plan);
94
95 let recommendations = self.generate_recommendations(plan, &bottlenecks);
97
98 let risk_level = self.assess_risk_level(complexity_score, estimated_execution_time, &bottlenecks);
100
101 Ok(PlanAnalysis {
102 complexity_score,
103 estimated_execution_time,
104 bottlenecks,
105 recommendations,
106 risk_level,
107 statistics,
108 })
109 }
110
111 pub fn optimize_plan(&self, plan: &mut QueryPlan) -> OrmResult<Vec<OptimizationStrategy>> {
113 let mut applied_strategies = Vec::new();
114
115 let analysis = self.analyze_plan(plan)?;
117
118 if analysis.complexity_score > self.max_complexity {
120 if self.can_increase_parallelism(plan) {
122 self.increase_parallelism(plan)?;
123 applied_strategies.push(OptimizationStrategy::IncreaseParallelism);
124 }
125
126 if self.should_split_queries(plan) {
127 applied_strategies.push(OptimizationStrategy::SplitQueries);
128 }
129 }
130
131 if analysis.estimated_execution_time > self.target_execution_time {
132 if self.can_reorder_phases(plan) {
134 self.reorder_phases(plan)?;
135 applied_strategies.push(OptimizationStrategy::ReorderPhases);
136 }
137
138 let index_suggestions = self.suggest_indexes(plan);
140 if !index_suggestions.is_empty() {
141 applied_strategies.push(OptimizationStrategy::SuggestIndexes(index_suggestions));
142 }
143 }
144
145 plan.build_execution_phases()?;
147
148 Ok(applied_strategies)
149 }
150
151 fn estimate_execution_time(&self, plan: &QueryPlan) -> u64 {
153 let mut total_time = 0u64;
154
155 let base_time_per_node = 10; let row_processing_time = plan.total_estimated_rows as u64 / 1000; let depth_penalty = (plan.max_depth as u64) * 50; let phase_time: u64 = plan.execution_phases.iter().map(|phase| {
166 if phase.len() == 1 {
167 base_time_per_node * 2 } else {
169 base_time_per_node }
171 }).sum();
172
173 total_time = base_time_per_node * plan.nodes.len() as u64
174 + row_processing_time
175 + depth_penalty
176 + phase_time;
177
178 total_time
179 }
180
181 fn identify_bottlenecks(&self, plan: &QueryPlan) -> Vec<String> {
183 let mut bottlenecks = Vec::new();
184
185 for (id, node) in &plan.nodes {
187 if node.estimated_rows > 10000 {
188 bottlenecks.push(format!("High row count in node '{}': {} rows", id, node.estimated_rows));
189 }
190 }
191
192 if plan.max_depth > 5 {
194 bottlenecks.push(format!("Deep nesting detected: {} levels", plan.max_depth));
195 }
196
197 for (phase_idx, phase) in plan.execution_phases.iter().enumerate() {
199 if phase.len() == 1 {
200 let node_id = &phase[0];
201 if let Some(node) = plan.nodes.get(node_id) {
202 if !node.parallel_safe {
203 bottlenecks.push(format!("Sequential bottleneck in phase {}: node '{}'", phase_idx, node_id));
204 }
205 }
206 }
207 }
208
209 let avg_phase_size: f64 = plan.execution_phases.iter()
211 .map(|p| p.len())
212 .sum::<usize>() as f64 / plan.execution_phases.len() as f64;
213
214 for (phase_idx, phase) in plan.execution_phases.iter().enumerate() {
215 if phase.len() as f64 > avg_phase_size * 3.0 {
216 bottlenecks.push(format!("Unbalanced phase {}: {} nodes (avg: {:.1})",
217 phase_idx, phase.len(), avg_phase_size));
218 }
219 }
220
221 bottlenecks
222 }
223
224 fn generate_recommendations(&self, plan: &QueryPlan, bottlenecks: &[String]) -> Vec<String> {
226 let mut recommendations = Vec::new();
227
228 if plan.max_depth > 3 {
230 recommendations.push("Consider limiting relationship depth to improve performance".to_string());
231 }
232
233 if plan.total_estimated_rows > 50000 {
234 recommendations.push("Consider adding query constraints to reduce data volume".to_string());
235 }
236
237 let parallel_nodes = plan.nodes.values().filter(|n| n.parallel_safe).count();
238 let total_nodes = plan.nodes.len();
239 if parallel_nodes < total_nodes / 2 {
240 recommendations.push("Consider making more queries parallel-safe to improve throughput".to_string());
241 }
242
243 for bottleneck in bottlenecks {
245 if bottleneck.contains("High row count") {
246 recommendations.push("Consider adding pagination or filtering to reduce row counts".to_string());
247 } else if bottleneck.contains("Deep nesting") {
248 recommendations.push("Consider flattening the relationship structure or using separate queries".to_string());
249 } else if bottleneck.contains("Sequential bottleneck") {
250 recommendations.push("Consider optimizing sequential queries for parallel execution".to_string());
251 }
252 }
253
254 recommendations.push("Ensure proper indexes exist on foreign key columns".to_string());
256 recommendations.push("Consider using connection pooling for better resource utilization".to_string());
257
258 recommendations.sort();
260 recommendations.dedup();
261
262 recommendations
263 }
264
265 fn assess_risk_level(
267 &self,
268 complexity_score: f64,
269 estimated_time: u64,
270 bottlenecks: &[String],
271 ) -> RiskLevel {
272 let bottleneck_count = bottlenecks.len();
273
274 if complexity_score > self.max_complexity * 2.0
275 || estimated_time > self.target_execution_time * 3
276 || bottleneck_count > 5 {
277 RiskLevel::Critical
278 } else if complexity_score > self.max_complexity
279 || estimated_time > self.target_execution_time
280 || bottleneck_count > 2 {
281 RiskLevel::High
282 } else if complexity_score > self.max_complexity * 0.7
283 || estimated_time > (self.target_execution_time as f64 * 0.7) as u64
284 || bottleneck_count > 0 {
285 RiskLevel::Medium
286 } else {
287 RiskLevel::Low
288 }
289 }
290
291 fn can_increase_parallelism(&self, plan: &QueryPlan) -> bool {
293 plan.nodes.values().any(|node| !node.parallel_safe)
294 }
295
296 fn increase_parallelism(&self, plan: &mut QueryPlan) -> OrmResult<()> {
298 for node in plan.nodes.values_mut() {
299 if !node.parallel_safe && node.constraints.is_empty() {
301 node.set_parallel_safe(true);
302 }
303 }
304 Ok(())
305 }
306
307 fn should_split_queries(&self, plan: &QueryPlan) -> bool {
309 plan.nodes.values().any(|node| node.estimated_rows > 50000)
310 }
311
312 fn can_reorder_phases(&self, plan: &QueryPlan) -> bool {
314 plan.execution_phases.len() > 1
315 }
316
317 fn reorder_phases(&self, plan: &mut QueryPlan) -> OrmResult<()> {
319 plan.execution_phases.sort_by(|a, b| {
321 let a_complexity: usize = a.iter()
322 .filter_map(|id| plan.nodes.get(id))
323 .map(|node| node.estimated_rows)
324 .sum();
325 let b_complexity: usize = b.iter()
326 .filter_map(|id| plan.nodes.get(id))
327 .map(|node| node.estimated_rows)
328 .sum();
329
330 a_complexity.cmp(&b_complexity)
331 });
332
333 Ok(())
334 }
335
336 fn suggest_indexes(&self, plan: &QueryPlan) -> Vec<String> {
338 let mut suggestions = Vec::new();
339 let mut suggested_tables = HashSet::new();
340
341 for node in plan.nodes.values() {
342 if !suggested_tables.contains(&node.table) {
343 suggestions.push(format!("CREATE INDEX idx_{}_id ON {} (id)", node.table, node.table));
344
345 if let Some(fk) = &node.foreign_key {
346 suggestions.push(format!("CREATE INDEX idx_{}_{} ON {} ({})",
347 node.table, fk, node.table, fk));
348 }
349
350 suggested_tables.insert(node.table.clone());
351 }
352 }
353
354 suggestions
355 }
356}
357
impl Default for QueryOptimizer {
    /// Equivalent to [`QueryOptimizer::new`]: default thresholds with
    /// detailed analysis enabled.
    fn default() -> Self {
        Self::new()
    }
}