1use crate::{
2 config::HealthCheckConfig,
3 connection::{ConnectionManager, ConnectionStats},
4 error::{RabbitError, Result},
5 metrics::RustRabbitMetrics,
6};
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use std::time::{Duration, Instant};
10use tokio::sync::RwLock;
11use tokio::time::{interval, sleep};
12use tracing::{debug, error, info, warn};
13
14#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
16pub enum ConnectionStatus {
17 Healthy,
19 Degraded,
21 Unhealthy,
23 Down,
25}
26
27impl ConnectionStatus {
28 pub fn is_healthy(&self) -> bool {
30 matches!(self, ConnectionStatus::Healthy)
31 }
32
33 pub fn is_operational(&self) -> bool {
35 matches!(self, ConnectionStatus::Healthy | ConnectionStatus::Degraded)
36 }
37}
38
39#[derive(Debug, Clone, Serialize, Deserialize)]
41pub struct HealthCheckResult {
42 pub status: ConnectionStatus,
44 pub timestamp: chrono::DateTime<chrono::Utc>,
46 pub connection_stats: ConnectionStats,
48 pub response_time: Duration,
50 pub details: String,
52 pub errors: Vec<String>,
54}
55
56#[derive(Debug, Clone)]
58pub struct HealthChecker {
59 connection_manager: ConnectionManager,
60 config: HealthCheckConfig,
61 last_result: Arc<RwLock<Option<HealthCheckResult>>>,
62 monitoring_started: Arc<RwLock<bool>>,
63 metrics: Option<RustRabbitMetrics>,
64}
65
66impl HealthChecker {
67 pub fn new(connection_manager: ConnectionManager) -> Self {
69 let config = connection_manager.config.health_check.clone();
70
71 Self {
72 connection_manager,
73 config,
74 last_result: Arc::new(RwLock::new(None)),
75 monitoring_started: Arc::new(RwLock::new(false)),
76 metrics: None,
77 }
78 }
79
80 pub fn set_metrics(&mut self, metrics: RustRabbitMetrics) {
82 self.metrics = Some(metrics);
83 }
84
85 pub async fn start_monitoring(&self) -> Result<()> {
87 let mut started = self.monitoring_started.write().await;
88 if *started {
89 warn!("Health monitoring is already started");
90 return Ok(());
91 }
92 *started = true;
93 drop(started);
94
95 if !self.config.enabled {
96 info!("Health monitoring is disabled in configuration");
97 return Ok(());
98 }
99
100 let checker = self.clone();
101 tokio::spawn(async move {
102 checker.monitoring_loop().await;
103 });
104
105 info!(
106 "Health monitoring started with interval: {:?}",
107 self.config.check_interval
108 );
109 Ok(())
110 }
111
112 pub async fn stop_monitoring(&self) {
114 let mut started = self.monitoring_started.write().await;
115 *started = false;
116 info!("Health monitoring stopped");
117 }
118
119 pub async fn check_health(&self) -> Result<HealthCheckResult> {
121 let start_time = Instant::now();
122 let mut errors = Vec::new();
123 let mut status = ConnectionStatus::Healthy;
124 let mut details = String::new();
125
126 let connection_stats = self.connection_manager.get_stats().await;
128
129 if connection_stats.healthy_connections == 0 {
131 status = ConnectionStatus::Down;
132 errors.push("No healthy connections available".to_string());
133 details.push_str("All connections are down. ");
134 } else if connection_stats.unhealthy_connections > 0 {
135 let unhealthy_ratio = connection_stats.unhealthy_connections as f64
137 / connection_stats.total_connections as f64;
138
139 if unhealthy_ratio > 0.5 {
140 status = ConnectionStatus::Degraded;
141 details.push_str(&format!(
142 "More than 50% of connections are unhealthy ({}/{}). ",
143 connection_stats.unhealthy_connections, connection_stats.total_connections
144 ));
145 } else {
146 status = ConnectionStatus::Healthy;
147 details.push_str(&format!(
148 "Some connections are unhealthy ({}/{}). ",
149 connection_stats.unhealthy_connections, connection_stats.total_connections
150 ));
151 }
152 }
153
154 match tokio::time::timeout(self.config.check_timeout, self.test_connection_operation())
156 .await
157 {
158 Ok(Ok(_)) => {
159 if status == ConnectionStatus::Healthy {
160 details.push_str("Connection test successful. ");
161 }
162 }
163 Ok(Err(e)) => {
164 status = ConnectionStatus::Unhealthy;
165 errors.push(format!("Connection test failed: {}", e));
166 details.push_str("Failed to perform connection test. ");
167 }
168 Err(_) => {
169 status = ConnectionStatus::Unhealthy;
170 errors.push("Connection test timed out".to_string());
171 details.push_str("Connection test timed out. ");
172 }
173 }
174
175 let response_time = start_time.elapsed();
176
177 if response_time > Duration::from_secs(5) {
179 if status == ConnectionStatus::Healthy {
180 status = ConnectionStatus::Degraded;
181 }
182 details.push_str("Slow response time detected. ");
183 }
184
185 let result = HealthCheckResult {
186 status,
187 timestamp: chrono::Utc::now(),
188 connection_stats,
189 response_time,
190 details: details.trim().to_string(),
191 errors,
192 };
193
194 let mut last_result = self.last_result.write().await;
196 *last_result = Some(result.clone());
197
198 debug!(
199 "Health check completed: {:?} in {:?}",
200 result.status, result.response_time
201 );
202 Ok(result)
203 }
204
205 pub async fn get_last_result(&self) -> Option<HealthCheckResult> {
207 self.last_result.read().await.clone()
208 }
209
210 pub async fn is_healthy(&self) -> bool {
212 match self.get_last_result().await {
213 Some(result) => result.status.is_healthy(),
214 None => {
215 match self.check_health().await {
217 Ok(result) => result.status.is_healthy(),
218 Err(_) => false,
219 }
220 }
221 }
222 }
223
224 pub async fn is_operational(&self) -> bool {
226 match self.get_last_result().await {
227 Some(result) => result.status.is_operational(),
228 None => {
229 match self.check_health().await {
231 Ok(result) => result.status.is_operational(),
232 Err(_) => false,
233 }
234 }
235 }
236 }
237
238 pub async fn wait_for_healthy(&self, timeout: Option<Duration>) -> Result<()> {
240 let start = Instant::now();
241 let timeout_duration = timeout.unwrap_or(Duration::from_secs(60));
242
243 loop {
244 if self.is_healthy().await {
245 return Ok(());
246 }
247
248 if start.elapsed() > timeout_duration {
249 return Err(RabbitError::HealthCheck(
250 "Timeout waiting for healthy connection".to_string(),
251 ));
252 }
253
254 sleep(Duration::from_millis(500)).await;
255 }
256 }
257
258 pub async fn get_health_summary(&self) -> HealthSummary {
260 let last_result = self.get_last_result().await;
261 let connection_stats = self.connection_manager.get_stats().await;
262
263 HealthSummary {
264 status: last_result
265 .as_ref()
266 .map(|r| r.status)
267 .unwrap_or(ConnectionStatus::Down),
268 last_check: last_result.as_ref().map(|r| r.timestamp),
269 total_connections: connection_stats.total_connections,
270 healthy_connections: connection_stats.healthy_connections,
271 unhealthy_connections: connection_stats.unhealthy_connections,
272 monitoring_enabled: self.config.enabled,
273 check_interval: self.config.check_interval,
274 }
275 }
276
277 async fn monitoring_loop(&self) {
279 let mut interval = interval(self.config.check_interval);
280
281 loop {
282 {
284 let started = self.monitoring_started.read().await;
285 if !*started {
286 break;
287 }
288 }
289
290 interval.tick().await;
291
292 if let Err(e) = self.check_health().await {
293 error!("Health check failed: {}", e);
294 }
295 }
296
297 info!("Health monitoring loop ended");
298 }
299
300 async fn test_connection_operation(&self) -> Result<()> {
302 let connection = self.connection_manager.get_connection().await?;
303 let channel = connection.create_channel().await?;
304
305 let test_queue_name = format!("health-check-{}", uuid::Uuid::new_v4());
308
309 channel
310 .queue_declare(
311 &test_queue_name,
312 lapin::options::QueueDeclareOptions {
313 passive: false,
314 durable: false,
315 exclusive: true,
316 auto_delete: true,
317 nowait: false,
318 },
319 lapin::types::FieldTable::default(),
320 )
321 .await?;
322
323 channel
325 .queue_delete(
326 &test_queue_name,
327 lapin::options::QueueDeleteOptions {
328 if_unused: false,
329 if_empty: false,
330 nowait: false,
331 },
332 )
333 .await?;
334
335 Ok(())
336 }
337}
338
339#[derive(Debug, Clone, Serialize, Deserialize)]
341pub struct HealthSummary {
342 pub status: ConnectionStatus,
344 pub last_check: Option<chrono::DateTime<chrono::Utc>>,
346 pub total_connections: usize,
348 pub healthy_connections: usize,
350 pub unhealthy_connections: usize,
352 pub monitoring_enabled: bool,
354 pub check_interval: Duration,
356}
357
358#[derive(Debug, Clone, Serialize, Deserialize)]
360pub struct HealthMetrics {
361 pub uptime_percentage: f64,
363 pub average_response_time: Duration,
365 pub failed_checks_last_hour: u32,
367 pub successful_checks_last_hour: u32,
369 pub last_error: Option<String>,
371}
372
373pub trait HealthCheckConfigExt {
375 fn conservative() -> HealthCheckConfig;
377
378 fn aggressive() -> HealthCheckConfig;
380
381 fn minimal() -> HealthCheckConfig;
383}
384
385impl HealthCheckConfigExt for HealthCheckConfig {
386 fn conservative() -> HealthCheckConfig {
387 HealthCheckConfig {
388 check_interval: Duration::from_secs(60),
389 check_timeout: Duration::from_secs(10),
390 enabled: true,
391 }
392 }
393
394 fn aggressive() -> HealthCheckConfig {
395 HealthCheckConfig {
396 check_interval: Duration::from_secs(10),
397 check_timeout: Duration::from_secs(3),
398 enabled: true,
399 }
400 }
401
402 fn minimal() -> HealthCheckConfig {
403 HealthCheckConfig {
404 check_interval: Duration::from_secs(300), check_timeout: Duration::from_secs(15),
406 enabled: true,
407 }
408 }
409}
410
#[cfg(test)]
mod tests {
    use super::*;

    /// Every status must report the expected healthy / operational flags.
    #[test]
    fn test_connection_status() {
        // (status, is_healthy, is_operational)
        let expectations = [
            (ConnectionStatus::Healthy, true, true),
            (ConnectionStatus::Degraded, false, true),
            (ConnectionStatus::Unhealthy, false, false),
            (ConnectionStatus::Down, false, false),
        ];

        for &(status, healthy, operational) in expectations.iter() {
            assert_eq!(
                status.is_healthy(),
                healthy,
                "is_healthy mismatch for {:?}",
                status
            );
            assert_eq!(
                status.is_operational(),
                operational,
                "is_operational mismatch for {:?}",
                status
            );
        }
    }

    /// Each preset must be enabled and carry its documented interval and
    /// timeout.
    #[test]
    fn test_health_check_config_presets() {
        // (preset, interval in seconds, timeout in seconds)
        let presets = [
            (HealthCheckConfig::conservative(), 60, 10),
            (HealthCheckConfig::aggressive(), 10, 3),
            (HealthCheckConfig::minimal(), 300, 15),
        ];

        for (config, interval_secs, timeout_secs) in presets.iter() {
            assert_eq!(config.check_interval, Duration::from_secs(*interval_secs));
            assert_eq!(config.check_timeout, Duration::from_secs(*timeout_secs));
            assert!(config.enabled);
        }
    }
}