use super::plan::{PlanStatistics, QueryPlan};
use crate::error::OrmResult;
use std::collections::HashSet;

/// The result of analyzing a [`QueryPlan`]: a complexity score, an estimated
/// execution time, detected bottlenecks, tuning recommendations, and an
/// overall risk classification.
#[derive(Debug, Clone)]
pub struct PlanAnalysis {
    pub complexity_score: f64,
    /// Estimated execution time, in the same (unspecified) units as
    /// `QueryOptimizer::target_execution_time`.
    pub estimated_execution_time: u64,
    pub bottlenecks: Vec<String>,
    pub recommendations: Vec<String>,
    pub risk_level: RiskLevel,
    pub statistics: PlanStatistics,
}

/// Coarse risk classification for a plan, from least to most severe.
#[derive(Debug, Clone, PartialEq)]
pub enum RiskLevel {
    Low,
    Medium,
    High,
    Critical,
}

/// Optimization strategies the optimizer can apply to (or suggest for) a plan.
#[derive(Debug, Clone)]
pub enum OptimizationStrategy {
    IncreaseParallelism,
    ReduceBatchSize,
    AddConstraints,
    ReorderPhases,
    SplitQueries,
    /// Carries the suggested `CREATE INDEX` statements.
    SuggestIndexes(Vec<String>),
}

/// Analyzes and rewrites query plans against configurable complexity and
/// execution-time budgets.
pub struct QueryOptimizer {
    max_complexity: f64,
    target_execution_time: u64,
}

impl QueryOptimizer {
    /// Creates an optimizer with the default budgets: a maximum complexity of
    /// 100.0 and a target execution time of 5000.
    pub fn new() -> Self {
        Self {
            max_complexity: 100.0,
            target_execution_time: 5000,
        }
    }

    /// Creates an optimizer with custom budgets.
    pub fn with_settings(max_complexity: f64, target_execution_time: u64) -> Self {
        Self {
            max_complexity,
            target_execution_time,
        }
    }
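
    /// Produces a [`PlanAnalysis`] for `plan` without modifying it.
    ///
    /// A minimal usage sketch. It assumes a `QueryPlan` obtained elsewhere
    /// (its constructor is not part of this module), so the example is not
    /// compiled as a doctest:
    ///
    /// ```ignore
    /// let optimizer = QueryOptimizer::new();
    /// let analysis = optimizer.analyze_plan(&plan)?;
    /// if analysis.risk_level == RiskLevel::Critical {
    ///     eprintln!("plan too risky: {:?}", analysis.bottlenecks);
    /// }
    /// ```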
    pub fn analyze_plan(&self, plan: &QueryPlan) -> OrmResult<PlanAnalysis> {
        let statistics = plan.statistics();
        let complexity_score = plan.complexity_score();
        let estimated_execution_time = self.estimate_execution_time(plan);
        let bottlenecks = self.identify_bottlenecks(plan);
        let recommendations = self.generate_recommendations(plan, &bottlenecks);
        let risk_level =
            self.assess_risk_level(complexity_score, estimated_execution_time, &bottlenecks);
        Ok(PlanAnalysis {
            complexity_score,
            estimated_execution_time,
            bottlenecks,
            recommendations,
            risk_level,
            statistics,
        })
    }
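
    /// Analyzes `plan` and applies (or records) optimization strategies:
    /// parallelism and phase reordering are applied in place, while query
    /// splitting and index creation are only reported back to the caller.
    ///
    /// A hedged sketch of the intended call pattern, again assuming an
    /// externally constructed `QueryPlan`:
    ///
    /// ```ignore
    /// let optimizer = QueryOptimizer::with_settings(50.0, 2000);
    /// for strategy in optimizer.optimize_plan(&mut plan)? {
    ///     if let OptimizationStrategy::SuggestIndexes(ddl) = strategy {
    ///         for stmt in ddl {
    ///             println!("{stmt}");
    ///         }
    ///     }
    /// }
    /// ```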
    pub fn optimize_plan(&self, plan: &mut QueryPlan) -> OrmResult<Vec<OptimizationStrategy>> {
        let mut applied_strategies = Vec::new();
        let analysis = self.analyze_plan(plan)?;

        // Over the complexity budget: try to widen parallel execution, and
        // flag oversized queries for splitting.
        if analysis.complexity_score > self.max_complexity {
            if self.can_increase_parallelism(plan) {
                self.increase_parallelism(plan)?;
                applied_strategies.push(OptimizationStrategy::IncreaseParallelism);
            }
            if self.should_split_queries(plan) {
                applied_strategies.push(OptimizationStrategy::SplitQueries);
            }
        }

        // Over the time budget: run cheaper phases first and suggest indexes.
        if analysis.estimated_execution_time > self.target_execution_time {
            if self.can_reorder_phases(plan) {
                self.reorder_phases(plan)?;
                applied_strategies.push(OptimizationStrategy::ReorderPhases);
            }
            let index_suggestions = self.suggest_indexes(plan);
            if !index_suggestions.is_empty() {
                applied_strategies.push(OptimizationStrategy::SuggestIndexes(index_suggestions));
            }
        }

        // Rebuild the phases so any mutations above are reflected in the plan.
        plan.build_execution_phases()?;
        Ok(applied_strategies)
    }
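
    /// Heuristic cost model: 10 units per node, 1 unit per 1000 estimated
    /// rows, 50 units per level of nesting, plus a per-phase charge that
    /// doubles for single-node (sequential) phases.
    ///
    /// Worked example under those weights: a plan with 4 nodes, 20_000 total
    /// rows, depth 2, and two phases (one single-node, one multi-node) costs
    /// 4 * 10 + 20_000 / 1000 + 2 * 50 + (20 + 10) = 190.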
    fn estimate_execution_time(&self, plan: &QueryPlan) -> u64 {
        let base_time_per_node = 10;
        let row_processing_time = plan.total_estimated_rows as u64 / 1000;
        let depth_penalty = (plan.max_depth as u64) * 50;
        let phase_time: u64 = plan
            .execution_phases
            .iter()
            .map(|phase| {
                if phase.len() == 1 {
                    base_time_per_node * 2
                } else {
                    base_time_per_node
                }
            })
            .sum();
        base_time_per_node * plan.nodes.len() as u64
            + row_processing_time
            + depth_penalty
            + phase_time
    }

    fn identify_bottlenecks(&self, plan: &QueryPlan) -> Vec<String> {
        let mut bottlenecks = Vec::new();

        // Nodes expected to produce very large result sets.
        for (id, node) in &plan.nodes {
            if node.estimated_rows > 10000 {
                bottlenecks.push(format!(
                    "High row count in node '{}': {} rows",
                    id, node.estimated_rows
                ));
            }
        }

        // Deeply nested relationship loading.
        if plan.max_depth > 5 {
            bottlenecks.push(format!("Deep nesting detected: {} levels", plan.max_depth));
        }

        // Single-node phases that cannot run in parallel serialize the plan.
        for (phase_idx, phase) in plan.execution_phases.iter().enumerate() {
            if phase.len() == 1 {
                let node_id = &phase[0];
                if let Some(node) = plan.nodes.get(node_id) {
                    if !node.parallel_safe {
                        bottlenecks.push(format!(
                            "Sequential bottleneck in phase {}: node '{}'",
                            phase_idx, node_id
                        ));
                    }
                }
            }
        }

        // Phases much larger than the average are poorly balanced. Guard the
        // average against an empty phase list to avoid a NaN comparison.
        if !plan.execution_phases.is_empty() {
            let total: usize = plan.execution_phases.iter().map(|p| p.len()).sum();
            let avg_phase_size = total as f64 / plan.execution_phases.len() as f64;
            for (phase_idx, phase) in plan.execution_phases.iter().enumerate() {
                if phase.len() as f64 > avg_phase_size * 3.0 {
                    bottlenecks.push(format!(
                        "Unbalanced phase {}: {} nodes (avg: {:.1})",
                        phase_idx,
                        phase.len(),
                        avg_phase_size
                    ));
                }
            }
        }

        bottlenecks
    }
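
    /// Builds human-readable recommendations from the plan's shape plus the
    /// bottleneck strings above, then appends two always-on suggestions
    /// (foreign-key indexes and connection pooling) and deduplicates.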
    fn generate_recommendations(&self, plan: &QueryPlan, bottlenecks: &[String]) -> Vec<String> {
        let mut recommendations = Vec::new();
        if plan.max_depth > 3 {
            recommendations
                .push("Consider limiting relationship depth to improve performance".to_string());
        }
        if plan.total_estimated_rows > 50000 {
            recommendations
                .push("Consider adding query constraints to reduce data volume".to_string());
        }
        let parallel_nodes = plan.nodes.values().filter(|n| n.parallel_safe).count();
        let total_nodes = plan.nodes.len();
        if parallel_nodes < total_nodes / 2 {
            recommendations.push(
                "Consider making more queries parallel-safe to improve throughput".to_string(),
            );
        }

        // Translate each detected bottleneck into a matching recommendation.
        for bottleneck in bottlenecks {
            if bottleneck.contains("High row count") {
                recommendations.push(
                    "Consider adding pagination or filtering to reduce row counts".to_string(),
                );
            } else if bottleneck.contains("Deep nesting") {
                recommendations.push(
                    "Consider flattening the relationship structure or using separate queries"
                        .to_string(),
                );
            } else if bottleneck.contains("Sequential bottleneck") {
                recommendations.push(
                    "Consider optimizing sequential queries for parallel execution".to_string(),
                );
            }
        }

        recommendations.push("Ensure proper indexes exist on foreign key columns".to_string());
        recommendations
            .push("Consider using connection pooling for better resource utilization".to_string());

        // `dedup` only removes adjacent duplicates, so sort first.
        recommendations.sort();
        recommendations.dedup();
        recommendations
    }
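
    /// Maps the analysis onto a [`RiskLevel`] using fixed multiples of the
    /// configured budgets:
    ///
    /// * `Critical`: complexity over 2x budget, time over 3x target, or more
    ///   than 5 bottlenecks
    /// * `High`: over either budget, or more than 2 bottlenecks
    /// * `Medium`: above 70% of either budget, or any bottleneck at all
    /// * `Low`: everything else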
    fn assess_risk_level(
        &self,
        complexity_score: f64,
        estimated_time: u64,
        bottlenecks: &[String],
    ) -> RiskLevel {
        let bottleneck_count = bottlenecks.len();
        if complexity_score > self.max_complexity * 2.0
            || estimated_time > self.target_execution_time * 3
            || bottleneck_count > 5
        {
            RiskLevel::Critical
        } else if complexity_score > self.max_complexity
            || estimated_time > self.target_execution_time
            || bottleneck_count > 2
        {
            RiskLevel::High
        } else if complexity_score > self.max_complexity * 0.7
            || estimated_time > (self.target_execution_time as f64 * 0.7) as u64
            || bottleneck_count > 0
        {
            RiskLevel::Medium
        } else {
            RiskLevel::Low
        }
    }

    /// True if at least one node is still marked parallel-unsafe.
    fn can_increase_parallelism(&self, plan: &QueryPlan) -> bool {
        plan.nodes.values().any(|node| !node.parallel_safe)
    }

    /// Marks unconstrained, parallel-unsafe nodes as parallel-safe. Nodes
    /// with constraints are left alone, presumably because constraints may
    /// imply ordering dependencies.
    fn increase_parallelism(&self, plan: &mut QueryPlan) -> OrmResult<()> {
        for node in plan.nodes.values_mut() {
            if !node.parallel_safe && node.constraints.is_empty() {
                node.set_parallel_safe(true);
            }
        }
        Ok(())
    }

    /// True if any single node is expected to return more than 50,000 rows.
    fn should_split_queries(&self, plan: &QueryPlan) -> bool {
        plan.nodes.values().any(|node| node.estimated_rows > 50000)
    }

    /// Reordering is only meaningful when there is more than one phase.
    fn can_reorder_phases(&self, plan: &QueryPlan) -> bool {
        plan.execution_phases.len() > 1
    }

    /// Sorts execution phases so that cheaper phases (by summed estimated
    /// rows) run first. The key is computed once per phase rather than on
    /// every comparison, and `plan.nodes` is borrowed separately so the
    /// mutable sort of `plan.execution_phases` borrows a disjoint field.
    fn reorder_phases(&self, plan: &mut QueryPlan) -> OrmResult<()> {
        let nodes = &plan.nodes;
        plan.execution_phases.sort_by_cached_key(|phase| {
            phase
                .iter()
                .filter_map(|id| nodes.get(id))
                .map(|node| node.estimated_rows)
                .sum::<usize>()
        });
        Ok(())
    }
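
    /// Emits `CREATE INDEX` DDL suggestions: one index on `id` per distinct
    /// table, plus one on the foreign-key column when the node has one.
    ///
    /// For a hypothetical node on table `posts` with foreign key `user_id`,
    /// the output would be:
    ///
    /// ```sql
    /// CREATE INDEX idx_posts_id ON posts (id)
    /// CREATE INDEX idx_posts_user_id ON posts (user_id)
    /// ```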
    fn suggest_indexes(&self, plan: &QueryPlan) -> Vec<String> {
        let mut suggestions = Vec::new();
        let mut suggested_tables = HashSet::new();
        for node in plan.nodes.values() {
            if !suggested_tables.contains(&node.table) {
                suggestions.push(format!(
                    "CREATE INDEX idx_{}_id ON {} (id)",
                    node.table, node.table
                ));
                if let Some(fk) = &node.foreign_key {
                    suggestions.push(format!(
                        "CREATE INDEX idx_{}_{} ON {} ({})",
                        node.table, fk, node.table, fk
                    ));
                }
                suggested_tables.insert(node.table.clone());
            }
        }
        suggestions
    }
}

impl Default for QueryOptimizer {
    fn default() -> Self {
        Self::new()
    }
}
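
// A minimal test sketch exercising only the pieces that need no `QueryPlan`
// (its constructor lives in the sibling `plan` module): constructor defaults
// and the risk-level thresholds. The expected values mirror the constants
// above rather than any external specification.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn default_budgets() {
        let optimizer = QueryOptimizer::new();
        assert_eq!(optimizer.max_complexity, 100.0);
        assert_eq!(optimizer.target_execution_time, 5000);
    }

    #[test]
    fn risk_levels_follow_budget_multiples() {
        let optimizer = QueryOptimizer::with_settings(100.0, 5000);
        // Well under both budgets with no bottlenecks: Low.
        assert_eq!(optimizer.assess_risk_level(10.0, 100, &[]), RiskLevel::Low);
        // A single bottleneck is enough to reach Medium.
        assert_eq!(
            optimizer.assess_risk_level(10.0, 100, &["x".to_string()]),
            RiskLevel::Medium
        );
        // Over the complexity budget: High.
        assert_eq!(optimizer.assess_risk_level(150.0, 100, &[]), RiskLevel::High);
        // More than triple the time target: Critical.
        assert_eq!(
            optimizer.assess_risk_level(10.0, 20_000, &[]),
            RiskLevel::Critical
        );
    }
}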