1use crate::error::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Full diagnostic report for a connection pool, as assembled by
/// [`get_pool_diagnostics`].
pub struct PoolDiagnostics {
    /// Connection counters read from the pool itself.
    pub stats: PoolStats,
    /// Outcome of a live `SELECT 1` connectivity probe.
    pub health: ConnectionHealth,
    /// Problems detected from `stats` and `health`.
    pub issues: Vec<ConnectionIssue>,
    /// Human-readable suggestions derived from the stats, health, and issues.
    pub recommendations: Vec<String>,
}
23
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Point-in-time connection counters for a pool, as produced by [`get_pool_stats`].
pub struct PoolStats {
    /// Connections currently in use: pool size minus idle connections (saturating at 0).
    pub connections_active: u32,
    /// Open connections that are not currently in use.
    pub connections_idle: u32,
    /// The pool's configured maximum number of connections.
    pub connections_max: u32,
    /// `connections_active / connections_max * 100`, or `0.0` when the maximum is 0.
    pub utilization_percent: f64,
    /// Average connection-acquisition latency in milliseconds.
    /// [`get_pool_stats`] does not measure this and always leaves it `None`.
    pub avg_acquisition_time_ms: Option<f64>,
}
38
#[derive(Debug, Clone, Serialize, Deserialize)]
/// Result of a connectivity probe against the database.
pub struct ConnectionHealth {
    /// `true` if the probe query succeeded.
    pub is_healthy: bool,
    /// Human-readable status; [`check_connection_health`] sets `"Healthy"` or
    /// `"Unhealthy: <error>"`.
    pub status: String,
    /// Time of the last successful probe; [`check_connection_health`] sets this to
    /// the probe time on success and `None` on failure.
    pub last_successful_connection: Option<DateTime<Utc>>,
    /// Count of recent failed probes. [`check_connection_health`] keeps no history,
    /// so it reports 0 or 1 based on the single probe it ran.
    pub recent_failures: u32,
}
51
#[derive(Debug, Clone, Serialize, Deserialize)]
/// A single problem found while diagnosing the pool.
pub struct ConnectionIssue {
    /// When the issue was recorded.
    pub timestamp: DateTime<Utc>,
    /// Category of the problem.
    pub issue_type: IssueType,
    /// Human-readable explanation of the problem.
    pub description: String,
    /// How urgent the problem is.
    pub severity: Severity,
}
64
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
/// Category of a [`ConnectionIssue`].
///
/// NOTE(review): `diagnose_issues` in this module only emits `PoolExhausted`,
/// `ConnectionFailed`, and `HighUtilization`; `Timeout` and `SlowQuery` appear to be
/// reserved for other producers — confirm before relying on them.
pub enum IssueType {
    /// No idle connections remain and the active count has reached the configured maximum.
    PoolExhausted,
    /// Timeout-related issue (not emitted by `diagnose_issues`).
    Timeout,
    /// The connectivity probe failed; the issue description carries the probe's status.
    ConnectionFailed,
    /// Slow-query issue (not emitted by `diagnose_issues`).
    SlowQuery,
    /// Utilization crossed the warning threshold (above 90% in `diagnose_issues`).
    HighUtilization,
}
79
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
/// Urgency of a [`ConnectionIssue`].
///
/// Variants are declared from least to most severe. The derived `Ord` follows
/// declaration order, so new variants must be inserted in that sequence.
pub enum Severity {
    /// Lowest severity; not emitted by `diagnose_issues`.
    Info,
    /// Used by `diagnose_issues` for high pool utilization.
    Warning,
    /// Used for pool exhaustion and failed connectivity; any critical issue also
    /// triggers an "investigate immediately" recommendation.
    Critical,
}
90
91pub async fn get_pool_diagnostics(pool: &PgPool) -> Result<PoolDiagnostics> {
93 let stats = get_pool_stats(pool);
94 let health = check_connection_health(pool).await?;
95 let issues = diagnose_issues(&stats, &health);
96 let recommendations = generate_recommendations(&stats, &health, &issues);
97
98 Ok(PoolDiagnostics {
99 stats,
100 health,
101 issues,
102 recommendations,
103 })
104}
105
106pub fn get_pool_stats(pool: &PgPool) -> PoolStats {
108 let size = pool.size();
109 let idle = pool.num_idle() as u32;
110 let max = pool.options().get_max_connections();
111
112 let active = size.saturating_sub(idle);
113 let utilization = if max > 0 {
114 (active as f64 / max as f64) * 100.0
115 } else {
116 0.0
117 };
118
119 PoolStats {
120 connections_active: active,
121 connections_idle: idle,
122 connections_max: max,
123 utilization_percent: utilization,
124 avg_acquisition_time_ms: None, }
126}
127
128pub async fn check_connection_health(pool: &PgPool) -> Result<ConnectionHealth> {
130 let result = sqlx::query_scalar::<_, i32>("SELECT 1")
132 .fetch_one(pool)
133 .await;
134
135 let is_healthy = result.is_ok();
136 let status = if is_healthy {
137 "Healthy".to_string()
138 } else {
139 format!("Unhealthy: {}", result.unwrap_err())
140 };
141
142 Ok(ConnectionHealth {
143 is_healthy,
144 status,
145 last_successful_connection: if is_healthy { Some(Utc::now()) } else { None },
146 recent_failures: if is_healthy { 0 } else { 1 },
147 })
148}
149
150fn diagnose_issues(stats: &PoolStats, health: &ConnectionHealth) -> Vec<ConnectionIssue> {
152 let mut issues = Vec::new();
153 let now = Utc::now();
154
155 if stats.utilization_percent > 90.0 {
157 issues.push(ConnectionIssue {
158 timestamp: now,
159 issue_type: IssueType::HighUtilization,
160 description: format!(
161 "Pool utilization is {:.1}% - consider increasing max connections",
162 stats.utilization_percent
163 ),
164 severity: Severity::Warning,
165 });
166 }
167
168 if stats.connections_idle == 0 && stats.connections_active >= stats.connections_max {
170 issues.push(ConnectionIssue {
171 timestamp: now,
172 issue_type: IssueType::PoolExhausted,
173 description: "Connection pool is exhausted - all connections are in use".to_string(),
174 severity: Severity::Critical,
175 });
176 }
177
178 if !health.is_healthy {
180 issues.push(ConnectionIssue {
181 timestamp: now,
182 issue_type: IssueType::ConnectionFailed,
183 description: health.status.clone(),
184 severity: Severity::Critical,
185 });
186 }
187
188 issues
189}
190
191fn generate_recommendations(
193 stats: &PoolStats,
194 health: &ConnectionHealth,
195 issues: &[ConnectionIssue],
196) -> Vec<String> {
197 let mut recommendations = Vec::new();
198
199 if stats.utilization_percent > 80.0 {
201 recommendations.push(format!(
202 "Consider increasing max_connections from {} to {} for better headroom",
203 stats.connections_max,
204 stats.connections_max * 2
205 ));
206 }
207
208 if stats.utilization_percent < 20.0 && stats.connections_max > 10 {
210 recommendations.push(format!(
211 "Pool utilization is low ({:.1}%) - consider reducing max_connections to save resources",
212 stats.utilization_percent
213 ));
214 }
215
216 let has_critical = issues.iter().any(|i| i.severity == Severity::Critical);
218 if has_critical {
219 recommendations.push(
220 "Critical issues detected - investigate immediately to prevent service disruption"
221 .to_string(),
222 );
223 }
224
225 if !health.is_healthy {
227 recommendations.push("Database connectivity issues detected - check network, credentials, and database status".to_string());
228 }
229
230 if stats.connections_idle == 0 && stats.utilization_percent > 70.0 {
232 recommendations.push(
233 "No idle connections available - increase pool size or optimize query performance"
234 .to_string(),
235 );
236 }
237
238 recommendations
239}
240
241pub async fn get_active_connections(pool: &PgPool) -> Result<Vec<ActiveConnection>> {
243 let connections =
244 sqlx::query_as::<_, (i32, String, Option<String>, Option<DateTime<Utc>>, String)>(
245 r#"
246 SELECT
247 pid,
248 usename,
249 application_name,
250 query_start,
251 state
252 FROM pg_stat_activity
253 WHERE datname = current_database()
254 AND pid != pg_backend_pid()
255 ORDER BY query_start DESC NULLS LAST
256 "#,
257 )
258 .fetch_all(pool)
259 .await?;
260
261 Ok(connections
262 .into_iter()
263 .map(|c| ActiveConnection {
264 pid: c.0,
265 username: c.1,
266 application_name: c.2,
267 query_start: c.3,
268 state: c.4,
269 duration: c.3.map(|start| {
270 let now = Utc::now();
271 now.signed_duration_since(start).num_milliseconds() as u64
272 }),
273 })
274 .collect())
275}
276
#[derive(Debug, Clone, Serialize, Deserialize)]
/// One backend row from `pg_stat_activity`, as returned by [`get_active_connections`].
pub struct ActiveConnection {
    /// Backend process ID (`pg_stat_activity.pid`).
    pub pid: i32,
    /// Role name of the session (`pg_stat_activity.usename`).
    pub username: String,
    /// Client-reported application name, if any (`pg_stat_activity.application_name`).
    pub application_name: Option<String>,
    /// `pg_stat_activity.query_start`, or `None` when the database reports NULL.
    pub query_start: Option<DateTime<Utc>>,
    /// Backend state (`pg_stat_activity.state`).
    pub state: String,
    /// Milliseconds elapsed since `query_start`, measured with this process's clock;
    /// `None` when `query_start` is `None`.
    pub duration: Option<u64>,
}
293
294pub async fn find_long_running_queries(
296 pool: &PgPool,
297 threshold_ms: u64,
298) -> Result<Vec<LongRunningQuery>> {
299 let threshold_interval = format!("{} milliseconds", threshold_ms);
300
301 let queries = sqlx::query_as::<_, (i32, String, String, DateTime<Utc>, String)>(
302 r#"
303 SELECT
304 pid,
305 usename,
306 query,
307 query_start,
308 state
309 FROM pg_stat_activity
310 WHERE datname = current_database()
311 AND state = 'active'
312 AND query_start < NOW() - $1::interval
313 AND query NOT LIKE '%pg_stat_activity%'
314 ORDER BY query_start ASC
315 "#,
316 )
317 .bind(threshold_interval)
318 .fetch_all(pool)
319 .await?;
320
321 Ok(queries
322 .into_iter()
323 .map(|q| {
324 let duration = Utc::now().signed_duration_since(q.3).num_milliseconds() as u64;
325
326 LongRunningQuery {
327 pid: q.0,
328 username: q.1,
329 query: q.2,
330 started_at: q.3,
331 duration_ms: duration,
332 state: q.4,
333 }
334 })
335 .collect())
336}
337
#[derive(Debug, Clone, Serialize, Deserialize)]
/// A query that exceeded the threshold passed to [`find_long_running_queries`].
pub struct LongRunningQuery {
    /// Backend process ID (`pg_stat_activity.pid`).
    pub pid: i32,
    /// Role name of the session (`pg_stat_activity.usename`).
    pub username: String,
    /// SQL text of the query (`pg_stat_activity.query`).
    pub query: String,
    /// When the query started (`pg_stat_activity.query_start`).
    pub started_at: DateTime<Utc>,
    /// Milliseconds between `started_at` and the moment the row was mapped,
    /// measured with this process's clock.
    pub duration_ms: u64,
    /// Backend state; always `"active"` given the query's `state = 'active'` filter.
    pub state: String,
}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `PoolStats` fixture with no acquisition-time sample.
    fn pool_stats(active: u32, idle: u32, max: u32, utilization: f64) -> PoolStats {
        PoolStats {
            connections_active: active,
            connections_idle: idle,
            connections_max: max,
            utilization_percent: utilization,
            avg_acquisition_time_ms: None,
        }
    }

    /// Builds a healthy `ConnectionHealth` fixture with status "OK".
    fn healthy() -> ConnectionHealth {
        ConnectionHealth {
            is_healthy: true,
            status: "OK".to_string(),
            last_successful_connection: Some(Utc::now()),
            recent_failures: 0,
        }
    }

    #[test]
    fn test_pool_stats_structure() {
        let stats = PoolStats {
            avg_acquisition_time_ms: Some(1.5),
            ..pool_stats(5, 15, 20, 25.0)
        };

        assert_eq!(stats.connections_active, 5);
        assert_eq!(stats.utilization_percent, 25.0);
    }

    #[test]
    fn test_connection_health_structure() {
        let health = healthy();

        assert!(health.is_healthy);
        assert_eq!(health.recent_failures, 0);
    }

    #[test]
    fn test_connection_issue_structure() {
        let issue = ConnectionIssue {
            timestamp: Utc::now(),
            issue_type: IssueType::PoolExhausted,
            description: "Pool full".to_string(),
            severity: Severity::Critical,
        };

        assert_eq!(issue.issue_type, IssueType::PoolExhausted);
        assert_eq!(issue.severity, Severity::Critical);
    }

    #[test]
    fn test_diagnose_high_utilization() {
        let issues = diagnose_issues(&pool_stats(19, 1, 20, 95.0), &healthy());

        assert!(issues
            .iter()
            .any(|issue| issue.issue_type == IssueType::HighUtilization));
    }

    #[test]
    fn test_diagnose_pool_exhaustion() {
        let issues = diagnose_issues(&pool_stats(20, 0, 20, 100.0), &healthy());

        assert!(issues
            .iter()
            .any(|issue| issue.issue_type == IssueType::PoolExhausted));
        assert!(issues
            .iter()
            .any(|issue| issue.severity == Severity::Critical));
    }

    #[test]
    fn test_generate_recommendations_high_utilization() {
        let recommendations =
            generate_recommendations(&pool_stats(18, 2, 20, 90.0), &healthy(), &[]);

        assert!(!recommendations.is_empty());
        assert!(recommendations
            .iter()
            .any(|text| text.contains("increasing max_connections")));
    }

    #[test]
    fn test_generate_recommendations_low_utilization() {
        let recommendations =
            generate_recommendations(&pool_stats(2, 18, 20, 10.0), &healthy(), &[]);

        assert!(recommendations
            .iter()
            .any(|text| text.contains("reducing max_connections")));
    }

    #[test]
    fn test_severity_ordering() {
        assert!(Severity::Info < Severity::Warning);
        assert!(Severity::Warning < Severity::Critical);
    }

    #[test]
    fn test_pool_diagnostics_serialization() {
        let diagnostics = PoolDiagnostics {
            stats: pool_stats(5, 5, 10, 50.0),
            health: ConnectionHealth {
                last_successful_connection: None,
                ..healthy()
            },
            issues: vec![],
            recommendations: vec![],
        };

        let json = serde_json::to_string(&diagnostics).unwrap();
        let round_tripped: PoolDiagnostics = serde_json::from_str(&json).unwrap();

        assert_eq!(round_tripped.stats.connections_active, 5);
        assert!(round_tripped.health.is_healthy);
    }
}