elif_orm/loading/optimizer/
analyzer.rs1use super::plan::{PlanStatistics, QueryPlan};
2use crate::error::OrmResult;
3use std::collections::HashSet;
4
/// The result of analyzing a [`QueryPlan`]: derived metrics plus
/// human-readable findings, produced by `QueryOptimizer::analyze_plan`.
#[derive(Debug, Clone)]
pub struct PlanAnalysis {
    /// Overall plan complexity, taken from `QueryPlan::complexity_score`.
    pub complexity_score: f64,
    /// Heuristic execution-time estimate in the same unit as
    /// `QueryOptimizer::target_execution_time` (the 5000 default suggests
    /// milliseconds — TODO confirm against the executor).
    pub estimated_execution_time: u64,
    /// Human-readable descriptions of detected bottlenecks.
    pub bottlenecks: Vec<String>,
    /// Human-readable tuning suggestions, sorted and deduplicated.
    pub recommendations: Vec<String>,
    /// Coarse risk classification derived from the metrics above.
    pub risk_level: RiskLevel,
    /// Raw statistics snapshot taken from the plan at analysis time.
    pub statistics: PlanStatistics,
}
21
/// Coarse risk classification for a query plan.
///
/// Variants are declared in ascending severity, so the derived
/// `PartialOrd`/`Ord` ordering is meaningful:
/// `Low < Medium < High < Critical`. The enum is fieldless, so `Copy`,
/// `Eq`, and `Hash` are free and make the type easier to use in
/// comparisons and as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum RiskLevel {
    /// Plan is comfortably within all budgets.
    Low,
    /// Plan approaches a budget or has at least one bottleneck.
    Medium,
    /// Plan exceeds a budget or has several bottlenecks.
    High,
    /// Plan grossly exceeds a budget or has many bottlenecks.
    Critical,
}
30
/// Strategies the optimizer may apply or suggest when a plan exceeds its
/// complexity or execution-time budget.
#[derive(Debug, Clone)]
pub enum OptimizationStrategy {
    /// Mark more plan nodes as parallel-safe so phases can run concurrently.
    IncreaseParallelism,
    /// Process data in smaller batches.
    /// NOTE(review): declared but never produced by `optimize_plan` in this
    /// module — presumably emitted elsewhere; verify before removing.
    ReduceBatchSize,
    /// Add query constraints to shrink the data volume.
    /// NOTE(review): also not produced by `optimize_plan` in this module.
    AddConstraints,
    /// Reorder execution phases (cheapest first).
    ReorderPhases,
    /// Suggestion to split very large queries; the split itself is not
    /// performed here.
    SplitQueries,
    /// Suggested `CREATE INDEX` statements for the plan's tables.
    SuggestIndexes(Vec<String>),
}
47
/// Analyzes and rewrites [`QueryPlan`]s against configurable complexity and
/// execution-time budgets.
pub struct QueryOptimizer {
    /// Plans whose complexity score exceeds this value trigger optimization.
    max_complexity: f64,
    /// Execution-time budget, in the same unit as `estimate_execution_time`'s
    /// result (the 5000 default suggests milliseconds — TODO confirm).
    target_execution_time: u64,
}
55
56impl QueryOptimizer {
57 pub fn new() -> Self {
59 Self {
60 max_complexity: 100.0,
61 target_execution_time: 5000, }
63 }
64
65 pub fn with_settings(max_complexity: f64, target_execution_time: u64) -> Self {
67 Self {
68 max_complexity,
69 target_execution_time,
70 }
71 }
72
73 pub fn analyze_plan(&self, plan: &QueryPlan) -> OrmResult<PlanAnalysis> {
75 let statistics = plan.statistics();
76 let complexity_score = plan.complexity_score();
77
78 let estimated_execution_time = self.estimate_execution_time(plan);
80
81 let bottlenecks = self.identify_bottlenecks(plan);
83
84 let recommendations = self.generate_recommendations(plan, &bottlenecks);
86
87 let risk_level =
89 self.assess_risk_level(complexity_score, estimated_execution_time, &bottlenecks);
90
91 Ok(PlanAnalysis {
92 complexity_score,
93 estimated_execution_time,
94 bottlenecks,
95 recommendations,
96 risk_level,
97 statistics,
98 })
99 }
100
101 pub fn optimize_plan(&self, plan: &mut QueryPlan) -> OrmResult<Vec<OptimizationStrategy>> {
103 let mut applied_strategies = Vec::new();
104
105 let analysis = self.analyze_plan(plan)?;
107
108 if analysis.complexity_score > self.max_complexity {
110 if self.can_increase_parallelism(plan) {
112 self.increase_parallelism(plan)?;
113 applied_strategies.push(OptimizationStrategy::IncreaseParallelism);
114 }
115
116 if self.should_split_queries(plan) {
117 applied_strategies.push(OptimizationStrategy::SplitQueries);
118 }
119 }
120
121 if analysis.estimated_execution_time > self.target_execution_time {
122 if self.can_reorder_phases(plan) {
124 self.reorder_phases(plan)?;
125 applied_strategies.push(OptimizationStrategy::ReorderPhases);
126 }
127
128 let index_suggestions = self.suggest_indexes(plan);
130 if !index_suggestions.is_empty() {
131 applied_strategies.push(OptimizationStrategy::SuggestIndexes(index_suggestions));
132 }
133 }
134
135 plan.build_execution_phases()?;
137
138 Ok(applied_strategies)
139 }
140
141 fn estimate_execution_time(&self, plan: &QueryPlan) -> u64 {
143 let base_time_per_node = 10; let row_processing_time = plan.total_estimated_rows as u64 / 1000; let depth_penalty = (plan.max_depth as u64) * 50; let phase_time: u64 = plan
154 .execution_phases
155 .iter()
156 .map(|phase| {
157 if phase.len() == 1 {
158 base_time_per_node * 2 } else {
160 base_time_per_node }
162 })
163 .sum();
164
165 base_time_per_node * plan.nodes.len() as u64
166 + row_processing_time
167 + depth_penalty
168 + phase_time
169 }
170
171 fn identify_bottlenecks(&self, plan: &QueryPlan) -> Vec<String> {
173 let mut bottlenecks = Vec::new();
174
175 for (id, node) in &plan.nodes {
177 if node.estimated_rows > 10000 {
178 bottlenecks.push(format!(
179 "High row count in node '{}': {} rows",
180 id, node.estimated_rows
181 ));
182 }
183 }
184
185 if plan.max_depth > 5 {
187 bottlenecks.push(format!("Deep nesting detected: {} levels", plan.max_depth));
188 }
189
190 for (phase_idx, phase) in plan.execution_phases.iter().enumerate() {
192 if phase.len() == 1 {
193 let node_id = &phase[0];
194 if let Some(node) = plan.nodes.get(node_id) {
195 if !node.parallel_safe {
196 bottlenecks.push(format!(
197 "Sequential bottleneck in phase {}: node '{}'",
198 phase_idx, node_id
199 ));
200 }
201 }
202 }
203 }
204
205 let avg_phase_size: f64 = plan.execution_phases.iter().map(|p| p.len()).sum::<usize>()
207 as f64
208 / plan.execution_phases.len() as f64;
209
210 for (phase_idx, phase) in plan.execution_phases.iter().enumerate() {
211 if phase.len() as f64 > avg_phase_size * 3.0 {
212 bottlenecks.push(format!(
213 "Unbalanced phase {}: {} nodes (avg: {:.1})",
214 phase_idx,
215 phase.len(),
216 avg_phase_size
217 ));
218 }
219 }
220
221 bottlenecks
222 }
223
224 fn generate_recommendations(&self, plan: &QueryPlan, bottlenecks: &[String]) -> Vec<String> {
226 let mut recommendations = Vec::new();
227
228 if plan.max_depth > 3 {
230 recommendations
231 .push("Consider limiting relationship depth to improve performance".to_string());
232 }
233
234 if plan.total_estimated_rows > 50000 {
235 recommendations
236 .push("Consider adding query constraints to reduce data volume".to_string());
237 }
238
239 let parallel_nodes = plan.nodes.values().filter(|n| n.parallel_safe).count();
240 let total_nodes = plan.nodes.len();
241 if parallel_nodes < total_nodes / 2 {
242 recommendations.push(
243 "Consider making more queries parallel-safe to improve throughput".to_string(),
244 );
245 }
246
247 for bottleneck in bottlenecks {
249 if bottleneck.contains("High row count") {
250 recommendations.push(
251 "Consider adding pagination or filtering to reduce row counts".to_string(),
252 );
253 } else if bottleneck.contains("Deep nesting") {
254 recommendations.push(
255 "Consider flattening the relationship structure or using separate queries"
256 .to_string(),
257 );
258 } else if bottleneck.contains("Sequential bottleneck") {
259 recommendations.push(
260 "Consider optimizing sequential queries for parallel execution".to_string(),
261 );
262 }
263 }
264
265 recommendations.push("Ensure proper indexes exist on foreign key columns".to_string());
267 recommendations
268 .push("Consider using connection pooling for better resource utilization".to_string());
269
270 recommendations.sort();
272 recommendations.dedup();
273
274 recommendations
275 }
276
277 fn assess_risk_level(
279 &self,
280 complexity_score: f64,
281 estimated_time: u64,
282 bottlenecks: &[String],
283 ) -> RiskLevel {
284 let bottleneck_count = bottlenecks.len();
285
286 if complexity_score > self.max_complexity * 2.0
287 || estimated_time > self.target_execution_time * 3
288 || bottleneck_count > 5
289 {
290 RiskLevel::Critical
291 } else if complexity_score > self.max_complexity
292 || estimated_time > self.target_execution_time
293 || bottleneck_count > 2
294 {
295 RiskLevel::High
296 } else if complexity_score > self.max_complexity * 0.7
297 || estimated_time > (self.target_execution_time as f64 * 0.7) as u64
298 || bottleneck_count > 0
299 {
300 RiskLevel::Medium
301 } else {
302 RiskLevel::Low
303 }
304 }
305
306 fn can_increase_parallelism(&self, plan: &QueryPlan) -> bool {
308 plan.nodes.values().any(|node| !node.parallel_safe)
309 }
310
311 fn increase_parallelism(&self, plan: &mut QueryPlan) -> OrmResult<()> {
313 for node in plan.nodes.values_mut() {
314 if !node.parallel_safe && node.constraints.is_empty() {
316 node.set_parallel_safe(true);
317 }
318 }
319 Ok(())
320 }
321
322 fn should_split_queries(&self, plan: &QueryPlan) -> bool {
324 plan.nodes.values().any(|node| node.estimated_rows > 50000)
325 }
326
327 fn can_reorder_phases(&self, plan: &QueryPlan) -> bool {
329 plan.execution_phases.len() > 1
330 }
331
332 fn reorder_phases(&self, plan: &mut QueryPlan) -> OrmResult<()> {
334 plan.execution_phases.sort_by(|a, b| {
336 let a_complexity: usize = a
337 .iter()
338 .filter_map(|id| plan.nodes.get(id))
339 .map(|node| node.estimated_rows)
340 .sum();
341 let b_complexity: usize = b
342 .iter()
343 .filter_map(|id| plan.nodes.get(id))
344 .map(|node| node.estimated_rows)
345 .sum();
346
347 a_complexity.cmp(&b_complexity)
348 });
349
350 Ok(())
351 }
352
353 fn suggest_indexes(&self, plan: &QueryPlan) -> Vec<String> {
355 let mut suggestions = Vec::new();
356 let mut suggested_tables = HashSet::new();
357
358 for node in plan.nodes.values() {
359 if !suggested_tables.contains(&node.table) {
360 suggestions.push(format!(
361 "CREATE INDEX idx_{}_id ON {} (id)",
362 node.table, node.table
363 ));
364
365 if let Some(fk) = &node.foreign_key {
366 suggestions.push(format!(
367 "CREATE INDEX idx_{}_{} ON {} ({})",
368 node.table, fk, node.table, fk
369 ));
370 }
371
372 suggested_tables.insert(node.table.clone());
373 }
374 }
375
376 suggestions
377 }
378}
379
380impl Default for QueryOptimizer {
381 fn default() -> Self {
382 Self::new()
383 }
384}