rust_rabbit/
health.rs

1use crate::{
2    config::HealthCheckConfig,
3    connection::{ConnectionManager, ConnectionStats},
4    error::{RabbitError, Result},
5    metrics::RustRabbitMetrics,
6};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11use tokio::time::{interval, sleep};
12use tracing::{debug, error, info, warn};
13
14/// Connection status enumeration
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub enum ConnectionStatus {
17    /// Connection is healthy and operational
18    Healthy,
19    /// Connection is degraded but still functional
20    Degraded,
21    /// Connection is unhealthy and may not work properly
22    Unhealthy,
23    /// Connection is completely down
24    Down,
25}
26
27impl ConnectionStatus {
28    /// Check if the status indicates a healthy connection
29    pub fn is_healthy(&self) -> bool {
30        matches!(self, ConnectionStatus::Healthy)
31    }
32
33    /// Check if the status indicates an operational connection
34    pub fn is_operational(&self) -> bool {
35        matches!(self, ConnectionStatus::Healthy | ConnectionStatus::Degraded)
36    }
37}
38
39/// Health check result
40#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct HealthCheckResult {
42    /// Overall connection status
43    pub status: ConnectionStatus,
44    /// Timestamp of the health check
45    pub timestamp: chrono::DateTime<chrono::Utc>,
46    /// Connection statistics
47    pub connection_stats: ConnectionStats,
48    /// Response time for the health check
49    pub response_time: Duration,
50    /// Details about the health check
51    pub details: String,
52    /// Any errors encountered during the health check
53    pub errors: Vec<String>,
54}
55
56/// Health checker for monitoring RabbitMQ connection status
57#[derive(Debug, Clone)]
58pub struct HealthChecker {
59    connection_manager: ConnectionManager,
60    config: HealthCheckConfig,
61    last_result: Arc<RwLock<Option<HealthCheckResult>>>,
62    monitoring_started: Arc<RwLock<bool>>,
63    metrics: Option<RustRabbitMetrics>,
64}
65
66impl HealthChecker {
67    /// Create a new health checker
68    pub fn new(connection_manager: ConnectionManager) -> Self {
69        let config = connection_manager.config.health_check.clone();
70
71        Self {
72            connection_manager,
73            config,
74            last_result: Arc::new(RwLock::new(None)),
75            monitoring_started: Arc::new(RwLock::new(false)),
76            metrics: None,
77        }
78    }
79
80    /// Set metrics for this health checker
81    pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
82        self.metrics = Some(metrics);
83    }
84
85    /// Start continuous health monitoring in the background
86    pub async fn start_monitoring(&self) -> Result<()> {
87        let mut started = self.monitoring_started.write().await;
88        if *started {
89            warn!("Health monitoring is already started");
90            return Ok(());
91        }
92        *started = true;
93        drop(started);
94
95        if !self.config.enabled {
96            info!("Health monitoring is disabled in configuration");
97            return Ok(());
98        }
99
100        let checker = self.clone();
101        tokio::spawn(async move {
102            checker.monitoring_loop().await;
103        });
104
105        info!(
106            "Health monitoring started with interval: {:?}",
107            self.config.check_interval
108        );
109        Ok(())
110    }
111
112    /// Stop health monitoring
113    pub async fn stop_monitoring(&self) {
114        let mut started = self.monitoring_started.write().await;
115        *started = false;
116        info!("Health monitoring stopped");
117    }
118
119    /// Perform a single health check
120    pub async fn check_health(&self) -> Result<HealthCheckResult> {
121        let start_time = Instant::now();
122        let mut errors = Vec::new();
123        let mut status = ConnectionStatus::Healthy;
124        let mut details = String::new();
125
126        // Get connection statistics
127        let connection_stats = self.connection_manager.get_stats().await;
128
129        // Check if we have any healthy connections
130        if connection_stats.healthy_connections == 0 {
131            status = ConnectionStatus::Down;
132            errors.push("No healthy connections available".to_string());
133            details.push_str("All connections are down. ");
134        } else if connection_stats.unhealthy_connections > 0 {
135            // We have some healthy connections but also some unhealthy ones
136            let unhealthy_ratio = connection_stats.unhealthy_connections as f64
137                / connection_stats.total_connections as f64;
138
139            if unhealthy_ratio > 0.5 {
140                status = ConnectionStatus::Degraded;
141                details.push_str(&format!(
142                    "More than 50% of connections are unhealthy ({}/{}). ",
143                    connection_stats.unhealthy_connections, connection_stats.total_connections
144                ));
145            } else {
146                status = ConnectionStatus::Healthy;
147                details.push_str(&format!(
148                    "Some connections are unhealthy ({}/{}). ",
149                    connection_stats.unhealthy_connections, connection_stats.total_connections
150                ));
151            }
152        }
153
154        // Try to get a connection and perform a basic operation
155        match tokio::time::timeout(self.config.check_timeout, self.test_connection_operation())
156            .await
157        {
158            Ok(Ok(_)) => {
159                if status == ConnectionStatus::Healthy {
160                    details.push_str("Connection test successful. ");
161                }
162            }
163            Ok(Err(e)) => {
164                status = ConnectionStatus::Unhealthy;
165                errors.push(format!("Connection test failed: {}", e));
166                details.push_str("Failed to perform connection test. ");
167            }
168            Err(_) => {
169                status = ConnectionStatus::Unhealthy;
170                errors.push("Connection test timed out".to_string());
171                details.push_str("Connection test timed out. ");
172            }
173        }
174
175        let response_time = start_time.elapsed();
176
177        // Additional checks based on response time
178        if response_time > Duration::from_secs(5) {
179            if status == ConnectionStatus::Healthy {
180                status = ConnectionStatus::Degraded;
181            }
182            details.push_str("Slow response time detected. ");
183        }
184
185        let result = HealthCheckResult {
186            status,
187            timestamp: chrono::Utc::now(),
188            connection_stats,
189            response_time,
190            details: details.trim().to_string(),
191            errors,
192        };
193
194        // Store the result
195        let mut last_result = self.last_result.write().await;
196        *last_result = Some(result.clone());
197
198        debug!(
199            "Health check completed: {:?} in {:?}",
200            result.status, result.response_time
201        );
202        Ok(result)
203    }
204
205    /// Get the last health check result
206    pub async fn get_last_result(&self) -> Option<HealthCheckResult> {
207        self.last_result.read().await.clone()
208    }
209
210    /// Check if the connection is currently healthy
211    pub async fn is_healthy(&self) -> bool {
212        match self.get_last_result().await {
213            Some(result) => result.status.is_healthy(),
214            None => {
215                // No previous health check, perform one now
216                match self.check_health().await {
217                    Ok(result) => result.status.is_healthy(),
218                    Err(_) => false,
219                }
220            }
221        }
222    }
223
224    /// Check if the connection is operational (healthy or degraded)
225    pub async fn is_operational(&self) -> bool {
226        match self.get_last_result().await {
227            Some(result) => result.status.is_operational(),
228            None => {
229                // No previous health check, perform one now
230                match self.check_health().await {
231                    Ok(result) => result.status.is_operational(),
232                    Err(_) => false,
233                }
234            }
235        }
236    }
237
238    /// Wait for the connection to become healthy
239    pub async fn wait_for_healthy(&self, timeout: Option<Duration>) -> Result<()> {
240        let start = Instant::now();
241        let timeout_duration = timeout.unwrap_or(Duration::from_secs(60));
242
243        loop {
244            if self.is_healthy().await {
245                return Ok(());
246            }
247
248            if start.elapsed() > timeout_duration {
249                return Err(RabbitError::HealthCheck(
250                    "Timeout waiting for healthy connection".to_string(),
251                ));
252            }
253
254            sleep(Duration::from_millis(500)).await;
255        }
256    }
257
258    /// Get health status summary
259    pub async fn get_health_summary(&self) -> HealthSummary {
260        let last_result = self.get_last_result().await;
261        let connection_stats = self.connection_manager.get_stats().await;
262
263        HealthSummary {
264            status: last_result
265                .as_ref()
266                .map(|r| r.status)
267                .unwrap_or(ConnectionStatus::Down),
268            last_check: last_result.as_ref().map(|r| r.timestamp),
269            total_connections: connection_stats.total_connections,
270            healthy_connections: connection_stats.healthy_connections,
271            unhealthy_connections: connection_stats.unhealthy_connections,
272            monitoring_enabled: self.config.enabled,
273            check_interval: self.config.check_interval,
274        }
275    }
276
277    /// Internal monitoring loop
278    async fn monitoring_loop(&self) {
279        let mut interval = interval(self.config.check_interval);
280
281        loop {
282            // Check if monitoring should continue
283            {
284                let started = self.monitoring_started.read().await;
285                if !*started {
286                    break;
287                }
288            }
289
290            interval.tick().await;
291
292            if let Err(e) = self.check_health().await {
293                error!("Health check failed: {}", e);
294            }
295        }
296
297        info!("Health monitoring loop ended");
298    }
299
300    /// Test basic connection operation
301    async fn test_connection_operation(&self) -> Result<()> {
302        let connection = self.connection_manager.get_connection().await?;
303        let channel = connection.create_channel().await?;
304
305        // Perform a simple operation to test the connection
306        // We'll declare a temporary queue and then delete it
307        let test_queue_name = format!("health-check-{}", uuid::Uuid::new_v4());
308
309        channel
310            .queue_declare(
311                &test_queue_name,
312                lapin::options::QueueDeclareOptions {
313                    passive: false,
314                    durable: false,
315                    exclusive: true,
316                    auto_delete: true,
317                    nowait: false,
318                },
319                lapin::types::FieldTable::default(),
320            )
321            .await?;
322
323        // Delete the test queue
324        channel
325            .queue_delete(
326                &test_queue_name,
327                lapin::options::QueueDeleteOptions {
328                    if_unused: false,
329                    if_empty: false,
330                    nowait: false,
331                },
332            )
333            .await?;
334
335        Ok(())
336    }
337}
338
339/// Health summary information
340#[derive(Debug, Clone, Serialize, Deserialize)]
341pub struct HealthSummary {
342    /// Current connection status
343    pub status: ConnectionStatus,
344    /// Timestamp of last health check
345    pub last_check: Option<chrono::DateTime<chrono::Utc>>,
346    /// Total number of connections
347    pub total_connections: usize,
348    /// Number of healthy connections
349    pub healthy_connections: usize,
350    /// Number of unhealthy connections
351    pub unhealthy_connections: usize,
352    /// Whether monitoring is enabled
353    pub monitoring_enabled: bool,
354    /// Health check interval
355    pub check_interval: Duration,
356}
357
358/// Health metrics for monitoring systems
359#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct HealthMetrics {
361    /// Connection uptime percentage
362    pub uptime_percentage: f64,
363    /// Average response time
364    pub average_response_time: Duration,
365    /// Number of failed health checks in the last hour
366    pub failed_checks_last_hour: u32,
367    /// Number of successful health checks in the last hour
368    pub successful_checks_last_hour: u32,
369    /// Last error message
370    pub last_error: Option<String>,
371}
372
373// Extension trait for adding health check configuration
374pub trait HealthCheckConfigExt {
375    /// Create a conservative health check configuration
376    fn conservative() -> HealthCheckConfig;
377
378    /// Create an aggressive health check configuration
379    fn aggressive() -> HealthCheckConfig;
380
381    /// Create a minimal health check configuration
382    fn minimal() -> HealthCheckConfig;
383}
384
385impl HealthCheckConfigExt for HealthCheckConfig {
386    fn conservative() -> HealthCheckConfig {
387        HealthCheckConfig {
388            check_interval: Duration::from_secs(60),
389            check_timeout: Duration::from_secs(10),
390            enabled: true,
391        }
392    }
393
394    fn aggressive() -> HealthCheckConfig {
395        HealthCheckConfig {
396            check_interval: Duration::from_secs(10),
397            check_timeout: Duration::from_secs(3),
398            enabled: true,
399        }
400    }
401
402    fn minimal() -> HealthCheckConfig {
403        HealthCheckConfig {
404            check_interval: Duration::from_secs(300), // 5 minutes
405            check_timeout: Duration::from_secs(15),
406            enabled: true,
407        }
408    }
409}
410
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_connection_status() {
        // (status, expected is_healthy, expected is_operational)
        let cases = [
            (ConnectionStatus::Healthy, true, true),
            (ConnectionStatus::Degraded, false, true),
            (ConnectionStatus::Unhealthy, false, false),
            (ConnectionStatus::Down, false, false),
        ];

        for &(status, healthy, operational) in cases.iter() {
            assert_eq!(status.is_healthy(), healthy, "is_healthy for {:?}", status);
            assert_eq!(
                status.is_operational(),
                operational,
                "is_operational for {:?}",
                status
            );
        }
    }

    #[test]
    fn test_health_check_config_presets() {
        // (preset name, config, expected interval secs, expected timeout secs)
        let presets = [
            ("conservative", HealthCheckConfig::conservative(), 60, 10),
            ("aggressive", HealthCheckConfig::aggressive(), 10, 3),
            ("minimal", HealthCheckConfig::minimal(), 300, 15),
        ];

        for (name, config, interval_secs, timeout_secs) in presets.iter() {
            assert_eq!(
                config.check_interval,
                Duration::from_secs(*interval_secs),
                "{} check_interval",
                name
            );
            assert_eq!(
                config.check_timeout,
                Duration::from_secs(*timeout_secs),
                "{} check_timeout",
                name
            );
            assert!(config.enabled, "{} should be enabled", name);
        }
    }
}