kaccy_db/
connection_diagnostics.rs

1//! Connection pool diagnostics and health monitoring utilities
2//!
3//! This module provides comprehensive diagnostics for database connection pools,
4//! helping identify connection issues, pool exhaustion, and performance problems.
5
6use crate::error::Result;
7use chrono::{DateTime, Utc};
8use serde::{Deserialize, Serialize};
9use sqlx::PgPool;
10
11/// Comprehensive connection pool diagnostics
12#[derive(Debug, Clone, Serialize, Deserialize)]
13pub struct PoolDiagnostics {
14    /// Current pool statistics
15    pub stats: PoolStats,
16    /// Connection health status
17    pub health: ConnectionHealth,
18    /// Recent connection issues
19    pub issues: Vec<ConnectionIssue>,
20    /// Recommendations for optimization
21    pub recommendations: Vec<String>,
22}
23
24/// Current pool statistics
25#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct PoolStats {
27    /// Number of connections currently in use
28    pub connections_active: u32,
29    /// Number of idle connections in the pool
30    pub connections_idle: u32,
31    /// Maximum number of connections allowed
32    pub connections_max: u32,
33    /// Current pool utilization percentage
34    pub utilization_percent: f64,
35    /// Average connection acquisition time (ms)
36    pub avg_acquisition_time_ms: Option<f64>,
37}
38
39/// Connection health status
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct ConnectionHealth {
42    /// Whether the pool is healthy
43    pub is_healthy: bool,
44    /// Health status message
45    pub status: String,
46    /// Last successful connection timestamp
47    pub last_successful_connection: Option<DateTime<Utc>>,
48    /// Number of failed connection attempts in the last minute
49    pub recent_failures: u32,
50}
51
52/// Connection issue record
53#[derive(Debug, Clone, Serialize, Deserialize)]
54pub struct ConnectionIssue {
55    /// When the issue occurred
56    pub timestamp: DateTime<Utc>,
57    /// Issue type
58    pub issue_type: IssueType,
59    /// Issue description
60    pub description: String,
61    /// Severity level
62    pub severity: Severity,
63}
64
65/// Type of connection issue
66#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
67pub enum IssueType {
68    /// Pool exhaustion (no available connections)
69    PoolExhausted,
70    /// Connection timeout
71    Timeout,
72    /// Connection failure
73    ConnectionFailed,
74    /// Slow query
75    SlowQuery,
76    /// High utilization
77    HighUtilization,
78}
79
80/// Issue severity
81#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, PartialOrd, Ord)]
82pub enum Severity {
83    /// Informational
84    Info,
85    /// Warning
86    Warning,
87    /// Critical
88    Critical,
89}
90
91/// Get comprehensive pool diagnostics
92pub async fn get_pool_diagnostics(pool: &PgPool) -> Result<PoolDiagnostics> {
93    let stats = get_pool_stats(pool);
94    let health = check_connection_health(pool).await?;
95    let issues = diagnose_issues(&stats, &health);
96    let recommendations = generate_recommendations(&stats, &health, &issues);
97
98    Ok(PoolDiagnostics {
99        stats,
100        health,
101        issues,
102        recommendations,
103    })
104}
105
106/// Get current pool statistics
107pub fn get_pool_stats(pool: &PgPool) -> PoolStats {
108    let size = pool.size();
109    let idle = pool.num_idle() as u32;
110    let max = pool.options().get_max_connections();
111
112    let active = size.saturating_sub(idle);
113    let utilization = if max > 0 {
114        (active as f64 / max as f64) * 100.0
115    } else {
116        0.0
117    };
118
119    PoolStats {
120        connections_active: active,
121        connections_idle: idle,
122        connections_max: max,
123        utilization_percent: utilization,
124        avg_acquisition_time_ms: None, // Would need tracking
125    }
126}
127
128/// Check connection health
129pub async fn check_connection_health(pool: &PgPool) -> Result<ConnectionHealth> {
130    // Try a simple query to check connectivity
131    let result = sqlx::query_scalar::<_, i32>("SELECT 1")
132        .fetch_one(pool)
133        .await;
134
135    let is_healthy = result.is_ok();
136    let status = if is_healthy {
137        "Healthy".to_string()
138    } else {
139        format!("Unhealthy: {}", result.unwrap_err())
140    };
141
142    Ok(ConnectionHealth {
143        is_healthy,
144        status,
145        last_successful_connection: if is_healthy { Some(Utc::now()) } else { None },
146        recent_failures: if is_healthy { 0 } else { 1 },
147    })
148}
149
150/// Diagnose issues based on stats and health
151fn diagnose_issues(stats: &PoolStats, health: &ConnectionHealth) -> Vec<ConnectionIssue> {
152    let mut issues = Vec::new();
153    let now = Utc::now();
154
155    // Check for high utilization
156    if stats.utilization_percent > 90.0 {
157        issues.push(ConnectionIssue {
158            timestamp: now,
159            issue_type: IssueType::HighUtilization,
160            description: format!(
161                "Pool utilization is {:.1}% - consider increasing max connections",
162                stats.utilization_percent
163            ),
164            severity: Severity::Warning,
165        });
166    }
167
168    // Check for pool exhaustion
169    if stats.connections_idle == 0 && stats.connections_active >= stats.connections_max {
170        issues.push(ConnectionIssue {
171            timestamp: now,
172            issue_type: IssueType::PoolExhausted,
173            description: "Connection pool is exhausted - all connections are in use".to_string(),
174            severity: Severity::Critical,
175        });
176    }
177
178    // Check for unhealthy state
179    if !health.is_healthy {
180        issues.push(ConnectionIssue {
181            timestamp: now,
182            issue_type: IssueType::ConnectionFailed,
183            description: health.status.clone(),
184            severity: Severity::Critical,
185        });
186    }
187
188    issues
189}
190
191/// Generate recommendations based on diagnostics
192fn generate_recommendations(
193    stats: &PoolStats,
194    health: &ConnectionHealth,
195    issues: &[ConnectionIssue],
196) -> Vec<String> {
197    let mut recommendations = Vec::new();
198
199    // Check if pool is too small
200    if stats.utilization_percent > 80.0 {
201        recommendations.push(format!(
202            "Consider increasing max_connections from {} to {} for better headroom",
203            stats.connections_max,
204            stats.connections_max * 2
205        ));
206    }
207
208    // Check if pool is too large
209    if stats.utilization_percent < 20.0 && stats.connections_max > 10 {
210        recommendations.push(format!(
211            "Pool utilization is low ({:.1}%) - consider reducing max_connections to save resources",
212            stats.utilization_percent
213        ));
214    }
215
216    // Check for critical issues
217    let has_critical = issues.iter().any(|i| i.severity == Severity::Critical);
218    if has_critical {
219        recommendations.push(
220            "Critical issues detected - investigate immediately to prevent service disruption"
221                .to_string(),
222        );
223    }
224
225    // Connection health recommendations
226    if !health.is_healthy {
227        recommendations.push("Database connectivity issues detected - check network, credentials, and database status".to_string());
228    }
229
230    // If no idle connections but utilization is high
231    if stats.connections_idle == 0 && stats.utilization_percent > 70.0 {
232        recommendations.push(
233            "No idle connections available - increase pool size or optimize query performance"
234                .to_string(),
235        );
236    }
237
238    recommendations
239}
240
241/// Get active connections from PostgreSQL
242pub async fn get_active_connections(pool: &PgPool) -> Result<Vec<ActiveConnection>> {
243    let connections =
244        sqlx::query_as::<_, (i32, String, Option<String>, Option<DateTime<Utc>>, String)>(
245            r#"
246        SELECT
247            pid,
248            usename,
249            application_name,
250            query_start,
251            state
252        FROM pg_stat_activity
253        WHERE datname = current_database()
254        AND pid != pg_backend_pid()
255        ORDER BY query_start DESC NULLS LAST
256        "#,
257        )
258        .fetch_all(pool)
259        .await?;
260
261    Ok(connections
262        .into_iter()
263        .map(|c| ActiveConnection {
264            pid: c.0,
265            username: c.1,
266            application_name: c.2,
267            query_start: c.3,
268            state: c.4,
269            duration: c.3.map(|start| {
270                let now = Utc::now();
271                now.signed_duration_since(start).num_milliseconds() as u64
272            }),
273        })
274        .collect())
275}
276
277/// Information about an active database connection
278#[derive(Debug, Clone, Serialize, Deserialize)]
279pub struct ActiveConnection {
280    /// Process ID
281    pub pid: i32,
282    /// Connected username
283    pub username: String,
284    /// Application name
285    pub application_name: Option<String>,
286    /// When the current query started
287    pub query_start: Option<DateTime<Utc>>,
288    /// Connection state
289    pub state: String,
290    /// Query duration in milliseconds
291    pub duration: Option<u64>,
292}
293
294/// Find long-running queries
295pub async fn find_long_running_queries(
296    pool: &PgPool,
297    threshold_ms: u64,
298) -> Result<Vec<LongRunningQuery>> {
299    let threshold_interval = format!("{} milliseconds", threshold_ms);
300
301    let queries = sqlx::query_as::<_, (i32, String, String, DateTime<Utc>, String)>(
302        r#"
303        SELECT
304            pid,
305            usename,
306            query,
307            query_start,
308            state
309        FROM pg_stat_activity
310        WHERE datname = current_database()
311        AND state = 'active'
312        AND query_start < NOW() - $1::interval
313        AND query NOT LIKE '%pg_stat_activity%'
314        ORDER BY query_start ASC
315        "#,
316    )
317    .bind(threshold_interval)
318    .fetch_all(pool)
319    .await?;
320
321    Ok(queries
322        .into_iter()
323        .map(|q| {
324            let duration = Utc::now().signed_duration_since(q.3).num_milliseconds() as u64;
325
326            LongRunningQuery {
327                pid: q.0,
328                username: q.1,
329                query: q.2,
330                started_at: q.3,
331                duration_ms: duration,
332                state: q.4,
333            }
334        })
335        .collect())
336}
337
338/// Information about a long-running query
339#[derive(Debug, Clone, Serialize, Deserialize)]
340pub struct LongRunningQuery {
341    /// Process ID
342    pub pid: i32,
343    /// Username
344    pub username: String,
345    /// Query text
346    pub query: String,
347    /// When the query started
348    pub started_at: DateTime<Utc>,
349    /// Query duration in milliseconds
350    pub duration_ms: u64,
351    /// Query state
352    pub state: String,
353}
354
#[cfg(test)]
mod tests {
    use super::*;

    /// Stats fixture without an acquisition-time sample.
    fn pool_stats(active: u32, idle: u32, max: u32, utilization: f64) -> PoolStats {
        PoolStats {
            connections_active: active,
            connections_idle: idle,
            connections_max: max,
            utilization_percent: utilization,
            avg_acquisition_time_ms: None,
        }
    }

    /// Health fixture describing a probe that has just succeeded.
    fn healthy() -> ConnectionHealth {
        ConnectionHealth {
            is_healthy: true,
            status: String::from("OK"),
            last_successful_connection: Some(Utc::now()),
            recent_failures: 0,
        }
    }

    /// True when `issues` contains at least one issue of type `wanted`.
    fn has_issue(issues: &[ConnectionIssue], wanted: IssueType) -> bool {
        issues.iter().any(|issue| issue.issue_type == wanted)
    }

    /// True when any recommendation contains `needle`.
    fn mentions(recommendations: &[String], needle: &str) -> bool {
        recommendations.iter().any(|text| text.contains(needle))
    }

    #[test]
    fn test_pool_stats_structure() {
        let stats = PoolStats {
            avg_acquisition_time_ms: Some(1.5),
            ..pool_stats(5, 15, 20, 25.0)
        };

        assert_eq!(stats.connections_active, 5);
        assert_eq!(stats.utilization_percent, 25.0);
    }

    #[test]
    fn test_connection_health_structure() {
        let health = healthy();

        assert!(health.is_healthy);
        assert_eq!(health.recent_failures, 0);
    }

    #[test]
    fn test_connection_issue_structure() {
        let issue = ConnectionIssue {
            timestamp: Utc::now(),
            issue_type: IssueType::PoolExhausted,
            description: String::from("Pool full"),
            severity: Severity::Critical,
        };

        assert_eq!(issue.issue_type, IssueType::PoolExhausted);
        assert_eq!(issue.severity, Severity::Critical);
    }

    #[test]
    fn test_diagnose_high_utilization() {
        let issues = diagnose_issues(&pool_stats(19, 1, 20, 95.0), &healthy());

        assert!(has_issue(&issues, IssueType::HighUtilization));
    }

    #[test]
    fn test_diagnose_pool_exhaustion() {
        let issues = diagnose_issues(&pool_stats(20, 0, 20, 100.0), &healthy());

        assert!(has_issue(&issues, IssueType::PoolExhausted));
        assert!(issues
            .iter()
            .any(|issue| issue.severity == Severity::Critical));
    }

    #[test]
    fn test_generate_recommendations_high_utilization() {
        let recommendations =
            generate_recommendations(&pool_stats(18, 2, 20, 90.0), &healthy(), &[]);

        assert!(!recommendations.is_empty());
        assert!(mentions(&recommendations, "increasing max_connections"));
    }

    #[test]
    fn test_generate_recommendations_low_utilization() {
        let recommendations =
            generate_recommendations(&pool_stats(2, 18, 20, 10.0), &healthy(), &[]);

        assert!(mentions(&recommendations, "reducing max_connections"));
    }

    #[test]
    fn test_severity_ordering() {
        assert!(Severity::Info < Severity::Warning);
        assert!(Severity::Warning < Severity::Critical);
    }

    #[test]
    fn test_pool_diagnostics_serialization() {
        let diagnostics = PoolDiagnostics {
            stats: pool_stats(5, 5, 10, 50.0),
            health: ConnectionHealth {
                last_successful_connection: None,
                ..healthy()
            },
            issues: Vec::new(),
            recommendations: Vec::new(),
        };

        // Round-trip through JSON and check that key fields survive.
        let json = serde_json::to_string(&diagnostics).unwrap();
        let deserialized: PoolDiagnostics = serde_json::from_str(&json).unwrap();

        assert_eq!(deserialized.stats.connections_active, 5);
        assert!(deserialized.health.is_healthy);
    }
}