Skip to main content

ruvector_dag/mincut/
bottleneck.rs

1//! Bottleneck Detection
2
3use crate::dag::{OperatorType, QueryDag};
4use std::collections::HashMap;
5
/// A single DAG node flagged as a bottleneck by [`BottleneckAnalysis::analyze`].
#[derive(Debug, Clone, PartialEq)]
pub struct Bottleneck {
    /// ID of the flagged node in the analyzed DAG.
    pub node_id: usize,
    /// Criticality score that caused the node to be flagged.
    pub score: f64,
    /// The node's estimated cost weighted by its criticality score.
    pub impact_estimate: f64,
    /// Human-readable tuning hint derived from the node's operator type.
    pub suggested_action: String,
}
14
15/// Analysis of bottlenecks in a DAG
16#[derive(Debug)]
17pub struct BottleneckAnalysis {
18    pub bottlenecks: Vec<Bottleneck>,
19    pub total_cost: f64,
20    pub critical_path_cost: f64,
21    pub parallelization_potential: f64,
22}
23
24impl BottleneckAnalysis {
25    pub fn analyze(dag: &QueryDag, criticality: &HashMap<usize, f64>) -> Self {
26        let mut bottlenecks = Vec::new();
27
28        for (&node_id, &score) in criticality {
29            if score > 0.5 {
30                // Threshold for bottleneck
31                let node = dag.get_node(node_id).unwrap();
32                let action = Self::suggest_action(&node.op_type);
33
34                bottlenecks.push(Bottleneck {
35                    node_id,
36                    score,
37                    impact_estimate: node.estimated_cost * score,
38                    suggested_action: action,
39                });
40            }
41        }
42
43        // Sort by score descending
44        bottlenecks.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
45
46        // Calculate total cost by iterating over all node IDs
47        let total_cost: f64 = (0..dag.node_count())
48            .filter_map(|id| dag.get_node(id))
49            .map(|n| n.estimated_cost)
50            .sum();
51
52        let critical_path_cost = Self::compute_critical_path_cost(dag);
53        let parallelization_potential = 1.0 - (critical_path_cost / total_cost.max(1.0));
54
55        Self {
56            bottlenecks,
57            total_cost,
58            critical_path_cost,
59            parallelization_potential,
60        }
61    }
62
63    fn suggest_action(op_type: &OperatorType) -> String {
64        match op_type {
65            OperatorType::SeqScan { table } => {
66                format!("Consider adding index on {}", table)
67            }
68            OperatorType::NestedLoopJoin => "Consider using hash join instead".to_string(),
69            OperatorType::Sort { .. } => "Consider adding sorted index".to_string(),
70            OperatorType::HnswScan { .. } => "Consider increasing ef_search parameter".to_string(),
71            _ => "Review operator parameters".to_string(),
72        }
73    }
74
75    fn compute_critical_path_cost(dag: &QueryDag) -> f64 {
76        // Longest path by cost
77        let mut max_cost: HashMap<usize, f64> = HashMap::new();
78
79        // Get topological sort, return 0 if there's a cycle
80        let sorted = match dag.topological_sort() {
81            Ok(s) => s,
82            Err(_) => return 0.0,
83        };
84
85        for node_id in sorted {
86            let node = dag.get_node(node_id).unwrap();
87            let parent_max = dag
88                .parents(node_id)
89                .iter()
90                .filter_map(|&p| max_cost.get(&p))
91                .max_by(|a, b| a.partial_cmp(b).unwrap())
92                .copied()
93                .unwrap_or(0.0);
94
95            max_cost.insert(node_id, parent_max + node.estimated_cost);
96        }
97
98        max_cost
99            .values()
100            .max_by(|a, b| a.partial_cmp(b).unwrap())
101            .copied()
102            .unwrap_or(0.0)
103    }
104}