// ruvector_dag/mincut/bottleneck.rs

use crate::dag::{OperatorType, QueryDag};
use std::collections::HashMap;
5
/// A single operator in the query DAG flagged as a performance bottleneck.
#[derive(Debug, Clone)]
pub struct Bottleneck {
    // Identifier of the DAG node this bottleneck refers to.
    pub node_id: usize,
    // Criticality score supplied by the caller's analysis; higher = more critical.
    pub score: f64,
    // Estimated impact: the node's estimated cost weighted by its score.
    pub impact_estimate: f64,
    // Human-readable tuning suggestion derived from the operator type.
    pub suggested_action: String,
}
14
/// Aggregate result of a bottleneck analysis over a query DAG.
#[derive(Debug)]
pub struct BottleneckAnalysis {
    // Detected bottlenecks (score > 0.5), sorted by descending score.
    pub bottlenecks: Vec<Bottleneck>,
    // Sum of the estimated costs of all nodes in the DAG.
    pub total_cost: f64,
    // Cost of the most expensive dependency chain (critical path).
    pub critical_path_cost: f64,
    // Fraction of total work off the critical path: 1 - critical/total.
    pub parallelization_potential: f64,
}
23
24impl BottleneckAnalysis {
25 pub fn analyze(dag: &QueryDag, criticality: &HashMap<usize, f64>) -> Self {
26 let mut bottlenecks = Vec::new();
27
28 for (&node_id, &score) in criticality {
29 if score > 0.5 {
30 let node = dag.get_node(node_id).unwrap();
32 let action = Self::suggest_action(&node.op_type);
33
34 bottlenecks.push(Bottleneck {
35 node_id,
36 score,
37 impact_estimate: node.estimated_cost * score,
38 suggested_action: action,
39 });
40 }
41 }
42
43 bottlenecks.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
45
46 let total_cost: f64 = (0..dag.node_count())
48 .filter_map(|id| dag.get_node(id))
49 .map(|n| n.estimated_cost)
50 .sum();
51
52 let critical_path_cost = Self::compute_critical_path_cost(dag);
53 let parallelization_potential = 1.0 - (critical_path_cost / total_cost.max(1.0));
54
55 Self {
56 bottlenecks,
57 total_cost,
58 critical_path_cost,
59 parallelization_potential,
60 }
61 }
62
63 fn suggest_action(op_type: &OperatorType) -> String {
64 match op_type {
65 OperatorType::SeqScan { table } => {
66 format!("Consider adding index on {}", table)
67 }
68 OperatorType::NestedLoopJoin => "Consider using hash join instead".to_string(),
69 OperatorType::Sort { .. } => "Consider adding sorted index".to_string(),
70 OperatorType::HnswScan { .. } => "Consider increasing ef_search parameter".to_string(),
71 _ => "Review operator parameters".to_string(),
72 }
73 }
74
75 fn compute_critical_path_cost(dag: &QueryDag) -> f64 {
76 let mut max_cost: HashMap<usize, f64> = HashMap::new();
78
79 let sorted = match dag.topological_sort() {
81 Ok(s) => s,
82 Err(_) => return 0.0,
83 };
84
85 for node_id in sorted {
86 let node = dag.get_node(node_id).unwrap();
87 let parent_max = dag
88 .parents(node_id)
89 .iter()
90 .filter_map(|&p| max_cost.get(&p))
91 .max_by(|a, b| a.partial_cmp(b).unwrap())
92 .copied()
93 .unwrap_or(0.0);
94
95 max_cost.insert(node_id, parent_max + node.estimated_cost);
96 }
97
98 max_cost
99 .values()
100 .max_by(|a, b| a.partial_cmp(b).unwrap())
101 .copied()
102 .unwrap_or(0.0)
103 }
104}