1use crate::error::Result;
32use chrono::{DateTime, Utc};
33use serde::{Deserialize, Serialize};
34use sqlx::PgPool;
35use tracing::{debug, info};
36
37#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct AnalyzerConfig {
40 pub slow_query_threshold_ms: u64,
42
43 pub min_calls: i64,
45
46 pub analysis_limit: i64,
48}
49
50impl Default for AnalyzerConfig {
51 fn default() -> Self {
52 Self {
53 slow_query_threshold_ms: 1000, min_calls: 10,
55 analysis_limit: 50,
56 }
57 }
58}
59
60#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
62pub enum SuggestionType {
63 AddIndex,
65
66 RewriteQuery,
68
69 IncreaseWorkMem,
71
72 UseMaterializedView,
74
75 PartitionTable,
77
78 UpdateStatistics,
80
81 RemoveUnusedIndex,
83
84 AddCoveringIndex,
86}
87
88impl SuggestionType {
89 pub fn description(&self) -> &'static str {
91 match self {
92 Self::AddIndex => "Add missing index",
93 Self::RewriteQuery => "Rewrite query",
94 Self::IncreaseWorkMem => "Increase work_mem",
95 Self::UseMaterializedView => "Use materialized view",
96 Self::PartitionTable => "Partition table",
97 Self::UpdateStatistics => "Update statistics",
98 Self::RemoveUnusedIndex => "Remove unused index",
99 Self::AddCoveringIndex => "Add covering index",
100 }
101 }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
106pub struct OptimizationSuggestion {
107 pub suggestion_type: SuggestionType,
109
110 pub description: String,
112
113 pub sql: Option<String>,
115
116 pub expected_improvement: f64,
118
119 pub priority: u8,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct PerformanceIssue {
126 pub issue_type: String,
128
129 pub description: String,
131
132 pub severity: u8,
134
135 pub affected_object: Option<String>,
137}
138
139#[derive(Debug, Clone, Serialize, Deserialize)]
141pub struct QueryAnalysis {
142 pub query: String,
144
145 pub avg_time_ms: f64,
147
148 pub total_time_ms: f64,
150
151 pub calls: i64,
153
154 pub avg_rows: f64,
156
157 pub issues: Vec<PerformanceIssue>,
159
160 pub suggestions: Vec<OptimizationSuggestion>,
162
163 pub performance_score: u8,
165
166 pub analyzed_at: DateTime<Utc>,
168}
169
170impl QueryAnalysis {
171 pub fn calculate_score(&mut self) {
173 let mut score = 100u8;
174
175 if self.avg_time_ms > 5000.0 {
177 score = score.saturating_sub(30);
178 } else if self.avg_time_ms > 1000.0 {
179 score = score.saturating_sub(20);
180 } else if self.avg_time_ms > 500.0 {
181 score = score.saturating_sub(10);
182 }
183
184 for issue in &self.issues {
186 score = score.saturating_sub(issue.severity);
187 }
188
189 self.performance_score = score;
190 }
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct PerformanceReport {
196 pub generated_at: DateTime<Utc>,
198
199 pub total_queries_analyzed: usize,
201
202 pub queries_with_issues: usize,
204
205 pub analyses: Vec<QueryAnalysis>,
207
208 pub summary: ReportSummary,
210}
211
212#[derive(Debug, Clone, Serialize, Deserialize)]
214pub struct ReportSummary {
215 pub total_suggestions: usize,
217
218 pub avg_performance_score: f64,
220
221 pub total_slow_query_time_ms: f64,
223
224 pub most_common_issue: Option<String>,
226}
227
228pub struct QueryPerformanceAnalyzer {
230 config: AnalyzerConfig,
231}
232
233impl QueryPerformanceAnalyzer {
234 pub fn new(config: AnalyzerConfig) -> Self {
236 Self { config }
237 }
238
239 pub fn with_defaults() -> Self {
241 Self::new(AnalyzerConfig::default())
242 }
243
244 pub async fn analyze_slow_queries(&self, pool: &PgPool) -> Result<PerformanceReport> {
246 info!("Starting query performance analysis");
247
248 let slow_queries = self.get_slow_queries(pool).await?;
249 let mut analyses = Vec::new();
250
251 for (query, avg_time_ms, total_time_ms, calls, avg_rows) in slow_queries {
252 let mut analysis = QueryAnalysis {
253 query: query.clone(),
254 avg_time_ms,
255 total_time_ms,
256 calls,
257 avg_rows,
258 issues: Vec::new(),
259 suggestions: Vec::new(),
260 performance_score: 100,
261 analyzed_at: Utc::now(),
262 };
263
264 self.analyze_query_plan(pool, &query, &mut analysis).await?;
266
267 self.generate_suggestions(&mut analysis);
269
270 analysis.calculate_score();
272
273 analyses.push(analysis);
274 }
275
276 let queries_with_issues = analyses.iter().filter(|a| !a.issues.is_empty()).count();
277
278 let summary = self.calculate_summary(&analyses);
279
280 debug!(
281 total_analyzed = analyses.len(),
282 with_issues = queries_with_issues,
283 "Query analysis complete"
284 );
285
286 Ok(PerformanceReport {
287 generated_at: Utc::now(),
288 total_queries_analyzed: analyses.len(),
289 queries_with_issues,
290 analyses,
291 summary,
292 })
293 }
294
295 async fn get_slow_queries(&self, pool: &PgPool) -> Result<Vec<(String, f64, f64, i64, f64)>> {
297 let query = r#"
298 SELECT
299 query,
300 mean_exec_time,
301 total_exec_time,
302 calls,
303 COALESCE(rows::float / NULLIF(calls, 0), 0) as avg_rows
304 FROM pg_stat_statements
305 WHERE mean_exec_time > $1
306 AND calls >= $2
307 AND query NOT LIKE '%pg_stat_statements%'
308 ORDER BY mean_exec_time DESC
309 LIMIT $3
310 "#;
311
312 let rows = sqlx::query_as::<_, (String, f64, f64, i64, f64)>(query)
313 .bind(self.config.slow_query_threshold_ms as f64)
314 .bind(self.config.min_calls)
315 .bind(self.config.analysis_limit)
316 .fetch_all(pool)
317 .await?;
318
319 Ok(rows)
320 }
321
322 async fn analyze_query_plan(
324 &self,
325 pool: &PgPool,
326 query: &str,
327 analysis: &mut QueryAnalysis,
328 ) -> Result<()> {
329 let explain_query = format!("EXPLAIN (FORMAT JSON) {}", query);
331
332 match sqlx::query_scalar::<_, serde_json::Value>(&explain_query)
333 .fetch_one(pool)
334 .await
335 {
336 Ok(plan) => {
337 self.analyze_plan_json(&plan, analysis);
338 }
339 Err(_) => {
340 debug!("Could not analyze query plan for: {}", query);
341 }
342 }
343
344 Ok(())
345 }
346
347 fn analyze_plan_json(&self, plan: &serde_json::Value, analysis: &mut QueryAnalysis) {
349 if let Some(plans) = plan.get(0).and_then(|p| p.get("Plan")) {
350 self.analyze_plan_node(plans, analysis);
351 }
352 }
353
354 #[allow(clippy::only_used_in_recursion)]
356 fn analyze_plan_node(&self, node: &serde_json::Value, analysis: &mut QueryAnalysis) {
357 if let Some(node_type) = node.get("Node Type").and_then(|v| v.as_str()) {
359 if node_type == "Seq Scan" {
360 if let Some(relation) = node.get("Relation Name").and_then(|v| v.as_str()) {
361 analysis.issues.push(PerformanceIssue {
362 issue_type: "Sequential Scan".to_string(),
363 description: format!("Sequential scan on table '{}'", relation),
364 severity: 7,
365 affected_object: Some(relation.to_string()),
366 });
367 }
368 }
369
370 if let Some(total_cost) = node.get("Total Cost").and_then(|v| v.as_f64()) {
372 if total_cost > 10000.0 {
373 analysis.issues.push(PerformanceIssue {
374 issue_type: "High Cost".to_string(),
375 description: format!("Operation has high cost: {:.2}", total_cost),
376 severity: 8,
377 affected_object: Some(node_type.to_string()),
378 });
379 }
380 }
381
382 if let Some(plan_rows) = node.get("Plan Rows").and_then(|v| v.as_f64()) {
384 if plan_rows > 100000.0 {
385 analysis.issues.push(PerformanceIssue {
386 issue_type: "Many Rows".to_string(),
387 description: format!("Processing many rows: {:.0}", plan_rows),
388 severity: 6,
389 affected_object: Some(node_type.to_string()),
390 });
391 }
392 }
393 }
394
395 if let Some(plans) = node.get("Plans").and_then(|v| v.as_array()) {
397 for child_plan in plans {
398 self.analyze_plan_node(child_plan, analysis);
399 }
400 }
401 }
402
403 fn generate_suggestions(&self, analysis: &mut QueryAnalysis) {
405 for issue in &analysis.issues {
406 match issue.issue_type.as_str() {
407 "Sequential Scan" => {
408 if let Some(table) = &issue.affected_object {
409 analysis.suggestions.push(OptimizationSuggestion {
410 suggestion_type: SuggestionType::AddIndex,
411 description: format!(
412 "Consider adding an index on table '{}' for columns used in WHERE/JOIN clauses",
413 table
414 ),
415 sql: None,
416 expected_improvement: 0.7,
417 priority: 8,
418 });
419 }
420 }
421 "High Cost" => {
422 analysis.suggestions.push(OptimizationSuggestion {
423 suggestion_type: SuggestionType::RewriteQuery,
424 description: "Consider rewriting the query to reduce complexity"
425 .to_string(),
426 sql: None,
427 expected_improvement: 0.5,
428 priority: 7,
429 });
430 }
431 "Many Rows" => {
432 analysis.suggestions.push(OptimizationSuggestion {
433 suggestion_type: SuggestionType::AddCoveringIndex,
434 description:
435 "Consider adding a covering index to avoid accessing the table"
436 .to_string(),
437 sql: None,
438 expected_improvement: 0.6,
439 priority: 6,
440 });
441 }
442 _ => {}
443 }
444 }
445
446 if analysis.avg_time_ms > 5000.0 {
448 analysis.suggestions.push(OptimizationSuggestion {
449 suggestion_type: SuggestionType::UpdateStatistics,
450 description: "Run ANALYZE to update table statistics".to_string(),
451 sql: None,
452 expected_improvement: 0.3,
453 priority: 5,
454 });
455 }
456 }
457
458 fn calculate_summary(&self, analyses: &[QueryAnalysis]) -> ReportSummary {
460 let total_suggestions: usize = analyses.iter().map(|a| a.suggestions.len()).sum();
461
462 let avg_performance_score = if !analyses.is_empty() {
463 analyses
464 .iter()
465 .map(|a| a.performance_score as f64)
466 .sum::<f64>()
467 / analyses.len() as f64
468 } else {
469 100.0
470 };
471
472 let total_slow_query_time_ms: f64 = analyses.iter().map(|a| a.total_time_ms).sum();
473
474 let mut issue_counts = std::collections::HashMap::new();
476 for analysis in analyses {
477 for issue in &analysis.issues {
478 *issue_counts.entry(issue.issue_type.clone()).or_insert(0) += 1;
479 }
480 }
481
482 let most_common_issue = issue_counts
483 .into_iter()
484 .max_by_key(|(_, count)| *count)
485 .map(|(issue_type, _)| issue_type);
486
487 ReportSummary {
488 total_suggestions,
489 avg_performance_score,
490 total_slow_query_time_ms,
491 most_common_issue,
492 }
493 }
494}
495
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a severity-7 sequential-scan issue on the `users` table.
    fn users_seq_scan(description: &str) -> PerformanceIssue {
        PerformanceIssue {
            issue_type: "Sequential Scan".to_string(),
            description: description.to_string(),
            severity: 7,
            affected_object: Some("users".to_string()),
        }
    }

    /// Builds an analysis with 10 calls, no suggestions and the given values.
    fn analysis_fixture(
        query: &str,
        avg_time_ms: f64,
        total_time_ms: f64,
        avg_rows: f64,
        issues: Vec<PerformanceIssue>,
        performance_score: u8,
    ) -> QueryAnalysis {
        QueryAnalysis {
            query: query.to_string(),
            avg_time_ms,
            total_time_ms,
            calls: 10,
            avg_rows,
            issues,
            suggestions: Vec::new(),
            performance_score,
            analyzed_at: Utc::now(),
        }
    }

    #[test]
    fn test_analyzer_config_default() {
        let AnalyzerConfig {
            slow_query_threshold_ms,
            min_calls,
            analysis_limit,
        } = AnalyzerConfig::default();

        assert_eq!(slow_query_threshold_ms, 1000);
        assert_eq!(min_calls, 10);
        assert_eq!(analysis_limit, 50);
    }

    #[test]
    fn test_suggestion_type_description() {
        let expectations = [
            (SuggestionType::AddIndex, "Add missing index"),
            (SuggestionType::RewriteQuery, "Rewrite query"),
        ];

        for (suggestion_type, label) in expectations {
            assert_eq!(suggestion_type.description(), label);
        }
    }

    #[test]
    fn test_query_analysis_score_calculation() {
        let mut analysis = analysis_fixture(
            "SELECT * FROM users",
            6000.0,
            60000.0,
            100.0,
            vec![users_seq_scan("Seq scan on users")],
            100,
        );

        analysis.calculate_score();

        // 100 - 30 (average above 5000 ms) - 7 (issue severity).
        assert_eq!(analysis.performance_score, 63);
    }

    #[test]
    fn test_performance_issue_serialization() {
        let issue = users_seq_scan("Test description");

        let json = serde_json::to_string(&issue).unwrap();

        for needle in ["Sequential Scan", "users"] {
            assert!(json.contains(needle));
        }
    }

    #[test]
    fn test_optimization_suggestion_serialization() {
        let suggestion = OptimizationSuggestion {
            suggestion_type: SuggestionType::AddIndex,
            description: "Add index on email".to_string(),
            sql: Some("CREATE INDEX idx_email ON users(email)".to_string()),
            expected_improvement: 0.7,
            priority: 8,
        };

        let json = serde_json::to_string(&suggestion).unwrap();

        for needle in ["AddIndex", "expected_improvement"] {
            assert!(json.contains(needle));
        }
    }

    #[test]
    fn test_report_summary_calculation() {
        let analyses = vec![
            analysis_fixture("SELECT 1", 100.0, 1000.0, 1.0, vec![], 90),
            analysis_fixture("SELECT 2", 200.0, 2000.0, 1.0, vec![], 80),
        ];

        let summary = QueryPerformanceAnalyzer::with_defaults().calculate_summary(&analyses);

        assert_eq!(summary.avg_performance_score, 85.0);
        assert_eq!(summary.total_slow_query_time_ms, 3000.0);
    }

    #[test]
    fn test_analyzer_with_defaults() {
        let analyzer = QueryPerformanceAnalyzer::with_defaults();

        assert_eq!(analyzer.config.slow_query_threshold_ms, 1000);
    }

    #[test]
    fn test_query_analysis_with_issues() {
        let mut analysis = analysis_fixture(
            "SELECT * FROM users WHERE email = 'test@example.com'",
            2500.0,
            25000.0,
            1.0,
            vec![users_seq_scan("Sequential scan on users table")],
            100,
        );

        QueryPerformanceAnalyzer::with_defaults().generate_suggestions(&mut analysis);

        assert!(!analysis.suggestions.is_empty());

        let has_add_index = analysis
            .suggestions
            .iter()
            .any(|s| s.suggestion_type == SuggestionType::AddIndex);
        assert!(has_add_index);
    }

    #[test]
    fn test_performance_report_serialization() {
        let summary = ReportSummary {
            total_suggestions: 15,
            avg_performance_score: 75.0,
            total_slow_query_time_ms: 50000.0,
            most_common_issue: Some("Sequential Scan".to_string()),
        };
        let report = PerformanceReport {
            generated_at: Utc::now(),
            total_queries_analyzed: 10,
            queries_with_issues: 5,
            analyses: vec![],
            summary,
        };

        let json = serde_json::to_string(&report).unwrap();

        assert!(json.contains("total_queries_analyzed"));
        assert!(json.contains("\"queries_with_issues\":5"));
    }
}