// kaccy_db/query_performance_analyzer.rs

1//! Query Performance Analyzer
2//!
3//! Analyzes query performance and provides actionable optimization suggestions.
4//! Complements the query_plan module with automatic analysis and recommendations.
5//!
6//! # Features
7//!
8//! - Automatic slow query detection from pg_stat_statements
9//! - Query plan analysis with cost breakdown
10//! - Index usage recommendations
11//! - Query rewrite suggestions
12//! - Performance bottleneck identification
13//! - Historical performance tracking
14//! - Configurable analysis thresholds
15//!
16//! # Example
17//!
18//! ```rust
//! use kaccy_db::query_performance_analyzer::{QueryPerformanceAnalyzer, AnalyzerConfig};
//!
22//! let config = AnalyzerConfig {
23//!     slow_query_threshold_ms: 1000, // 1 second
24//!     min_calls: 10,
25//!     analysis_limit: 50,
26//! };
27//!
28//! let analyzer = QueryPerformanceAnalyzer::new(config);
29//! ```
30
31use crate::error::Result;
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34use sqlx::PgPool;
35use tracing::{debug, info};
36
/// Configuration for the query performance analyzer.
///
/// Controls which entries of `pg_stat_statements` are selected for analysis;
/// see [`AnalyzerConfig::default`] for the default thresholds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AnalyzerConfig {
    /// Queries whose *mean* execution time exceeds this (in ms) are considered slow
    pub slow_query_threshold_ms: u64,

    /// Minimum number of calls before analyzing
    ///
    /// Filters out one-off queries whose timing statistics are noisy.
    pub min_calls: i64,

    /// Maximum number of queries to analyze
    ///
    /// Caps the `LIMIT` of the `pg_stat_statements` scan so a report stays bounded.
    pub analysis_limit: i64,
}
49
50impl Default for AnalyzerConfig {
51    fn default() -> Self {
52        Self {
53            slow_query_threshold_ms: 1000, // 1 second
54            min_calls: 10,
55            analysis_limit: 50,
56        }
57    }
58}
59
60/// Optimization suggestion type
61#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum SuggestionType {
63    /// Add a missing index
64    AddIndex,
65
66    /// Rewrite query for better performance
67    RewriteQuery,
68
69    /// Increase work_mem
70    IncreaseWorkMem,
71
72    /// Use materialized view
73    UseMaterializedView,
74
75    /// Partition table
76    PartitionTable,
77
78    /// Update statistics
79    UpdateStatistics,
80
81    /// Remove unused index
82    RemoveUnusedIndex,
83
84    /// Add covering index
85    AddCoveringIndex,
86}
87
88impl SuggestionType {
89    /// Get a human-readable description
90    pub fn description(&self) -> &'static str {
91        match self {
92            Self::AddIndex => "Add missing index",
93            Self::RewriteQuery => "Rewrite query",
94            Self::IncreaseWorkMem => "Increase work_mem",
95            Self::UseMaterializedView => "Use materialized view",
96            Self::PartitionTable => "Partition table",
97            Self::UpdateStatistics => "Update statistics",
98            Self::RemoveUnusedIndex => "Remove unused index",
99            Self::AddCoveringIndex => "Add covering index",
100        }
101    }
102}
103
/// A single actionable optimization suggestion attached to a [`QueryAnalysis`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptimizationSuggestion {
    /// Type of suggestion
    pub suggestion_type: SuggestionType,

    /// Detailed, human-readable description of what to do
    pub description: String,

    /// SQL to apply the suggestion (if applicable)
    ///
    /// `None` when the suggestion cannot be expressed as a single statement
    /// (e.g. "rewrite the query").
    pub sql: Option<String>,

    /// Expected performance improvement (0.0-1.0)
    ///
    /// A rough heuristic estimate, not a measured value.
    pub expected_improvement: f64,

    /// Priority (1-10, higher is more important)
    pub priority: u8,
}
122
/// A performance problem detected while inspecting a query's execution plan.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceIssue {
    /// Type of issue (e.g., "Sequential Scan", "High Cost", "Many Rows")
    ///
    /// Free-form string; `generate_suggestions` matches on these exact values.
    pub issue_type: String,

    /// Human-readable description of the issue
    pub description: String,

    /// Severity (1-10, higher is more severe)
    ///
    /// Subtracted directly from the query's performance score.
    pub severity: u8,

    /// Affected table or operation, when it can be identified from the plan
    pub affected_object: Option<String>,
}
138
/// Analysis result for a single slow query.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueryAnalysis {
    /// Query text (normalized, as stored by `pg_stat_statements`)
    pub query: String,

    /// Average execution time in milliseconds
    pub avg_time_ms: f64,

    /// Total execution time in milliseconds (across all calls)
    pub total_time_ms: f64,

    /// Number of calls
    pub calls: i64,

    /// Average rows returned per call
    pub avg_rows: f64,

    /// Identified performance issues
    pub issues: Vec<PerformanceIssue>,

    /// Optimization suggestions derived from the issues
    pub suggestions: Vec<OptimizationSuggestion>,

    /// Overall performance score (0-100, higher is better)
    ///
    /// Computed by [`QueryAnalysis::calculate_score`] from timing and issues.
    pub performance_score: u8,

    /// When this analysis was performed
    pub analyzed_at: DateTime<Utc>,
}
169
170impl QueryAnalysis {
171    /// Calculate performance score based on issues and timing
172    pub fn calculate_score(&mut self) {
173        let mut score = 100u8;
174
175        // Deduct points for execution time
176        if self.avg_time_ms > 5000.0 {
177            score = score.saturating_sub(30);
178        } else if self.avg_time_ms > 1000.0 {
179            score = score.saturating_sub(20);
180        } else if self.avg_time_ms > 500.0 {
181            score = score.saturating_sub(10);
182        }
183
184        // Deduct points for issues
185        for issue in &self.issues {
186            score = score.saturating_sub(issue.severity);
187        }
188
189        self.performance_score = score;
190    }
191}
192
/// Top-level performance analysis report produced by
/// `QueryPerformanceAnalyzer::analyze_slow_queries`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PerformanceReport {
    /// When this report was generated
    pub generated_at: DateTime<Utc>,

    /// Total queries analyzed
    pub total_queries_analyzed: usize,

    /// Number of analyzed queries that have at least one issue
    pub queries_with_issues: usize,

    /// Individual query analyses, slowest first
    pub analyses: Vec<QueryAnalysis>,

    /// Summary statistics aggregated over `analyses`
    pub summary: ReportSummary,
}
211
/// Aggregate statistics over all analyses in a [`PerformanceReport`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReportSummary {
    /// Total number of suggestions across all analyzed queries
    pub total_suggestions: usize,

    /// Average performance score (100.0 when no queries were analyzed)
    pub avg_performance_score: f64,

    /// Total slow query time (ms), summed over all analyzed queries
    pub total_slow_query_time_ms: f64,

    /// Most common issue type, `None` when no issues were found
    pub most_common_issue: Option<String>,
}
227
/// Query performance analyzer.
///
/// Stateless apart from its configuration; a database pool is supplied per
/// analysis call rather than held by the analyzer.
pub struct QueryPerformanceAnalyzer {
    // Thresholds governing which queries are fetched and analyzed.
    config: AnalyzerConfig,
}
232
233impl QueryPerformanceAnalyzer {
234    /// Create a new query performance analyzer
235    pub fn new(config: AnalyzerConfig) -> Self {
236        Self { config }
237    }
238
239    /// Create an analyzer with default configuration
240    pub fn with_defaults() -> Self {
241        Self::new(AnalyzerConfig::default())
242    }
243
244    /// Analyze slow queries and generate report
245    pub async fn analyze_slow_queries(&self, pool: &PgPool) -> Result<PerformanceReport> {
246        info!("Starting query performance analysis");
247
248        let slow_queries = self.get_slow_queries(pool).await?;
249        let mut analyses = Vec::new();
250
251        for (query, avg_time_ms, total_time_ms, calls, avg_rows) in slow_queries {
252            let mut analysis = QueryAnalysis {
253                query: query.clone(),
254                avg_time_ms,
255                total_time_ms,
256                calls,
257                avg_rows,
258                issues: Vec::new(),
259                suggestions: Vec::new(),
260                performance_score: 100,
261                analyzed_at: Utc::now(),
262            };
263
264            // Analyze query plan
265            self.analyze_query_plan(pool, &query, &mut analysis).await?;
266
267            // Generate suggestions based on issues
268            self.generate_suggestions(&mut analysis);
269
270            // Calculate performance score
271            analysis.calculate_score();
272
273            analyses.push(analysis);
274        }
275
276        let queries_with_issues = analyses.iter().filter(|a| !a.issues.is_empty()).count();
277
278        let summary = self.calculate_summary(&analyses);
279
280        debug!(
281            total_analyzed = analyses.len(),
282            with_issues = queries_with_issues,
283            "Query analysis complete"
284        );
285
286        Ok(PerformanceReport {
287            generated_at: Utc::now(),
288            total_queries_analyzed: analyses.len(),
289            queries_with_issues,
290            analyses,
291            summary,
292        })
293    }
294
295    /// Get slow queries from pg_stat_statements
296    async fn get_slow_queries(&self, pool: &PgPool) -> Result<Vec<(String, f64, f64, i64, f64)>> {
297        let query = r#"
298            SELECT
299                query,
300                mean_exec_time,
301                total_exec_time,
302                calls,
303                COALESCE(rows::float / NULLIF(calls, 0), 0) as avg_rows
304            FROM pg_stat_statements
305            WHERE mean_exec_time > $1
306                AND calls >= $2
307                AND query NOT LIKE '%pg_stat_statements%'
308            ORDER BY mean_exec_time DESC
309            LIMIT $3
310        "#;
311
312        let rows = sqlx::query_as::<_, (String, f64, f64, i64, f64)>(query)
313            .bind(self.config.slow_query_threshold_ms as f64)
314            .bind(self.config.min_calls)
315            .bind(self.config.analysis_limit)
316            .fetch_all(pool)
317            .await?;
318
319        Ok(rows)
320    }
321
322    /// Analyze query execution plan
323    async fn analyze_query_plan(
324        &self,
325        pool: &PgPool,
326        query: &str,
327        analysis: &mut QueryAnalysis,
328    ) -> Result<()> {
329        // Try to get explain plan
330        let explain_query = format!("EXPLAIN (FORMAT JSON) {}", query);
331
332        match sqlx::query_scalar::<_, serde_json::Value>(&explain_query)
333            .fetch_one(pool)
334            .await
335        {
336            Ok(plan) => {
337                self.analyze_plan_json(&plan, analysis);
338            }
339            Err(_) => {
340                debug!("Could not analyze query plan for: {}", query);
341            }
342        }
343
344        Ok(())
345    }
346
347    /// Analyze JSON query plan
348    fn analyze_plan_json(&self, plan: &serde_json::Value, analysis: &mut QueryAnalysis) {
349        if let Some(plans) = plan.get(0).and_then(|p| p.get("Plan")) {
350            self.analyze_plan_node(plans, analysis);
351        }
352    }
353
354    /// Recursively analyze plan nodes
355    #[allow(clippy::only_used_in_recursion)]
356    fn analyze_plan_node(&self, node: &serde_json::Value, analysis: &mut QueryAnalysis) {
357        // Check for sequential scans
358        if let Some(node_type) = node.get("Node Type").and_then(|v| v.as_str()) {
359            if node_type == "Seq Scan" {
360                if let Some(relation) = node.get("Relation Name").and_then(|v| v.as_str()) {
361                    analysis.issues.push(PerformanceIssue {
362                        issue_type: "Sequential Scan".to_string(),
363                        description: format!("Sequential scan on table '{}'", relation),
364                        severity: 7,
365                        affected_object: Some(relation.to_string()),
366                    });
367                }
368            }
369
370            // Check for high cost operations
371            if let Some(total_cost) = node.get("Total Cost").and_then(|v| v.as_f64()) {
372                if total_cost > 10000.0 {
373                    analysis.issues.push(PerformanceIssue {
374                        issue_type: "High Cost".to_string(),
375                        description: format!("Operation has high cost: {:.2}", total_cost),
376                        severity: 8,
377                        affected_object: Some(node_type.to_string()),
378                    });
379                }
380            }
381
382            // Check for many rows
383            if let Some(plan_rows) = node.get("Plan Rows").and_then(|v| v.as_f64()) {
384                if plan_rows > 100000.0 {
385                    analysis.issues.push(PerformanceIssue {
386                        issue_type: "Many Rows".to_string(),
387                        description: format!("Processing many rows: {:.0}", plan_rows),
388                        severity: 6,
389                        affected_object: Some(node_type.to_string()),
390                    });
391                }
392            }
393        }
394
395        // Recursively check child plans
396        if let Some(plans) = node.get("Plans").and_then(|v| v.as_array()) {
397            for child_plan in plans {
398                self.analyze_plan_node(child_plan, analysis);
399            }
400        }
401    }
402
403    /// Generate optimization suggestions based on issues
404    fn generate_suggestions(&self, analysis: &mut QueryAnalysis) {
405        for issue in &analysis.issues {
406            match issue.issue_type.as_str() {
407                "Sequential Scan" => {
408                    if let Some(table) = &issue.affected_object {
409                        analysis.suggestions.push(OptimizationSuggestion {
410                            suggestion_type: SuggestionType::AddIndex,
411                            description: format!(
412                                "Consider adding an index on table '{}' for columns used in WHERE/JOIN clauses",
413                                table
414                            ),
415                            sql: None,
416                            expected_improvement: 0.7,
417                            priority: 8,
418                        });
419                    }
420                }
421                "High Cost" => {
422                    analysis.suggestions.push(OptimizationSuggestion {
423                        suggestion_type: SuggestionType::RewriteQuery,
424                        description: "Consider rewriting the query to reduce complexity"
425                            .to_string(),
426                        sql: None,
427                        expected_improvement: 0.5,
428                        priority: 7,
429                    });
430                }
431                "Many Rows" => {
432                    analysis.suggestions.push(OptimizationSuggestion {
433                        suggestion_type: SuggestionType::AddCoveringIndex,
434                        description:
435                            "Consider adding a covering index to avoid accessing the table"
436                                .to_string(),
437                        sql: None,
438                        expected_improvement: 0.6,
439                        priority: 6,
440                    });
441                }
442                _ => {}
443            }
444        }
445
446        // Add general suggestions for very slow queries
447        if analysis.avg_time_ms > 5000.0 {
448            analysis.suggestions.push(OptimizationSuggestion {
449                suggestion_type: SuggestionType::UpdateStatistics,
450                description: "Run ANALYZE to update table statistics".to_string(),
451                sql: None,
452                expected_improvement: 0.3,
453                priority: 5,
454            });
455        }
456    }
457
458    /// Calculate report summary
459    fn calculate_summary(&self, analyses: &[QueryAnalysis]) -> ReportSummary {
460        let total_suggestions: usize = analyses.iter().map(|a| a.suggestions.len()).sum();
461
462        let avg_performance_score = if !analyses.is_empty() {
463            analyses
464                .iter()
465                .map(|a| a.performance_score as f64)
466                .sum::<f64>()
467                / analyses.len() as f64
468        } else {
469            100.0
470        };
471
472        let total_slow_query_time_ms: f64 = analyses.iter().map(|a| a.total_time_ms).sum();
473
474        // Find most common issue type
475        let mut issue_counts = std::collections::HashMap::new();
476        for analysis in analyses {
477            for issue in &analysis.issues {
478                *issue_counts.entry(issue.issue_type.clone()).or_insert(0) += 1;
479            }
480        }
481
482        let most_common_issue = issue_counts
483            .into_iter()
484            .max_by_key(|(_, count)| *count)
485            .map(|(issue_type, _)| issue_type);
486
487        ReportSummary {
488            total_suggestions,
489            avg_performance_score,
490            total_slow_query_time_ms,
491            most_common_issue,
492        }
493    }
494}
495
#[cfg(test)]
mod tests {
    use super::*;

    // Defaults must match the documented thresholds (1 s / 10 calls / 50 queries).
    #[test]
    fn test_analyzer_config_default() {
        let config = AnalyzerConfig::default();
        assert_eq!(config.slow_query_threshold_ms, 1000);
        assert_eq!(config.min_calls, 10);
        assert_eq!(config.analysis_limit, 50);
    }

    // Spot-check the static labels returned by SuggestionType::description.
    #[test]
    fn test_suggestion_type_description() {
        assert_eq!(SuggestionType::AddIndex.description(), "Add missing index");
        assert_eq!(SuggestionType::RewriteQuery.description(), "Rewrite query");
    }

    // Score starts at 100, loses 30 for avg > 5000 ms, then the issue severity.
    #[test]
    fn test_query_analysis_score_calculation() {
        let mut analysis = QueryAnalysis {
            query: "SELECT * FROM users".to_string(),
            avg_time_ms: 6000.0,
            total_time_ms: 60000.0,
            calls: 10,
            avg_rows: 100.0,
            issues: vec![PerformanceIssue {
                issue_type: "Sequential Scan".to_string(),
                description: "Seq scan on users".to_string(),
                severity: 7,
                affected_object: Some("users".to_string()),
            }],
            suggestions: Vec::new(),
            performance_score: 100,
            analyzed_at: Utc::now(),
        };

        analysis.calculate_score();

        // Should be 100 - 30 (>5000ms) - 7 (severity) = 63
        assert_eq!(analysis.performance_score, 63);
    }

    // PerformanceIssue round-trips through serde_json with its field values intact.
    #[test]
    fn test_performance_issue_serialization() {
        let issue = PerformanceIssue {
            issue_type: "Sequential Scan".to_string(),
            description: "Test description".to_string(),
            severity: 7,
            affected_object: Some("users".to_string()),
        };

        let json = serde_json::to_string(&issue).unwrap();
        assert!(json.contains("Sequential Scan"));
        assert!(json.contains("users"));
    }

    // The enum variant serializes by name and numeric fields keep their keys.
    #[test]
    fn test_optimization_suggestion_serialization() {
        let suggestion = OptimizationSuggestion {
            suggestion_type: SuggestionType::AddIndex,
            description: "Add index on email".to_string(),
            sql: Some("CREATE INDEX idx_email ON users(email)".to_string()),
            expected_improvement: 0.7,
            priority: 8,
        };

        let json = serde_json::to_string(&suggestion).unwrap();
        assert!(json.contains("AddIndex"));
        assert!(json.contains("expected_improvement"));
    }

    // Summary should average scores and sum total query time across analyses.
    #[test]
    fn test_report_summary_calculation() {
        let analyses = vec![
            QueryAnalysis {
                query: "SELECT 1".to_string(),
                avg_time_ms: 100.0,
                total_time_ms: 1000.0,
                calls: 10,
                avg_rows: 1.0,
                issues: vec![],
                suggestions: vec![],
                performance_score: 90,
                analyzed_at: Utc::now(),
            },
            QueryAnalysis {
                query: "SELECT 2".to_string(),
                avg_time_ms: 200.0,
                total_time_ms: 2000.0,
                calls: 10,
                avg_rows: 1.0,
                issues: vec![],
                suggestions: vec![],
                performance_score: 80,
                analyzed_at: Utc::now(),
            },
        ];

        let analyzer = QueryPerformanceAnalyzer::with_defaults();
        let summary = analyzer.calculate_summary(&analyses);

        assert_eq!(summary.avg_performance_score, 85.0);
        assert_eq!(summary.total_slow_query_time_ms, 3000.0);
    }

    // with_defaults must delegate to AnalyzerConfig::default.
    #[test]
    fn test_analyzer_with_defaults() {
        let analyzer = QueryPerformanceAnalyzer::with_defaults();
        assert_eq!(analyzer.config.slow_query_threshold_ms, 1000);
    }

    // A Sequential Scan issue should yield at least an AddIndex suggestion.
    #[test]
    fn test_query_analysis_with_issues() {
        let mut analysis = QueryAnalysis {
            query: "SELECT * FROM users WHERE email = 'test@example.com'".to_string(),
            avg_time_ms: 2500.0,
            total_time_ms: 25000.0,
            calls: 10,
            avg_rows: 1.0,
            issues: vec![PerformanceIssue {
                issue_type: "Sequential Scan".to_string(),
                description: "Sequential scan on users table".to_string(),
                severity: 7,
                affected_object: Some("users".to_string()),
            }],
            suggestions: Vec::new(),
            performance_score: 100,
            analyzed_at: Utc::now(),
        };

        let analyzer = QueryPerformanceAnalyzer::with_defaults();
        analyzer.generate_suggestions(&mut analysis);

        assert!(!analysis.suggestions.is_empty());
        assert!(analysis
            .suggestions
            .iter()
            .any(|s| s.suggestion_type == SuggestionType::AddIndex));
    }

    // Full report serializes with snake_case field names and nested summary.
    #[test]
    fn test_performance_report_serialization() {
        let report = PerformanceReport {
            generated_at: Utc::now(),
            total_queries_analyzed: 10,
            queries_with_issues: 5,
            analyses: vec![],
            summary: ReportSummary {
                total_suggestions: 15,
                avg_performance_score: 75.0,
                total_slow_query_time_ms: 50000.0,
                most_common_issue: Some("Sequential Scan".to_string()),
            },
        };

        let json = serde_json::to_string(&report).unwrap();
        assert!(json.contains("total_queries_analyzed"));
        assert!(json.contains("\"queries_with_issues\":5"));
    }
}