1use std::collections::VecDeque;
7use std::sync::Arc;
8use std::time::{Duration, Instant};
9use tokio::sync::RwLock;
10
11use crate::config::HealthCheckConfig;
12use crate::message::InboundMessage;
13use crate::server::AppState;
14
/// Health of the monitored target server, as tracked by [`HealthMonitor`].
///
/// Transitions are driven by the monitor's `record_success` / `record_failure`:
/// a failed probe moves to `Degraded`, or to `Down` once the consecutive-failure
/// threshold is reached; a successful probe moves `Down` to `Recovering` and every
/// other state to `Healthy`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum HealthState {
    /// Probes are succeeding.
    Healthy,
    /// Probes are failing, but fewer times in a row than the alert threshold.
    Degraded,
    /// The alert threshold was reached; the target is treated as unreachable.
    Down,
    /// First successful probe after `Down`; buffered messages are replayed on entry.
    Recovering,
}
27
28impl std::fmt::Display for HealthState {
29 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30 match self {
31 HealthState::Healthy => write!(f, "healthy"),
32 HealthState::Degraded => write!(f, "degraded"),
33 HealthState::Down => write!(f, "down"),
34 HealthState::Recovering => write!(f, "recovering"),
35 }
36 }
37}
38
39pub struct HealthMonitor {
41 pub state: RwLock<HealthState>,
43 pub failure_count: RwLock<u32>,
45 pub last_healthy: RwLock<Option<Instant>>,
47 pub buffer: RwLock<VecDeque<InboundMessage>>,
49 pub max_buffer_size: usize,
51}
52
53impl HealthMonitor {
54 pub fn new(max_buffer_size: usize) -> Self {
55 Self {
56 state: RwLock::new(HealthState::Healthy),
57 failure_count: RwLock::new(0),
58 last_healthy: RwLock::new(Some(Instant::now())),
59 buffer: RwLock::new(VecDeque::new()),
60 max_buffer_size,
61 }
62 }
63
64 pub async fn get_state(&self) -> HealthState {
66 *self.state.read().await
67 }
68
69 pub async fn record_success(&self) -> Option<HealthState> {
71 let old_state = *self.state.read().await;
72
73 *self.failure_count.write().await = 0;
74 *self.last_healthy.write().await = Some(Instant::now());
75
76 let new_state = match old_state {
77 HealthState::Down => HealthState::Recovering,
78 HealthState::Recovering => HealthState::Healthy,
79 _ => HealthState::Healthy,
80 };
81
82 if new_state != old_state {
83 *self.state.write().await = new_state;
84 tracing::info!(
85 old_state = %old_state,
86 new_state = %new_state,
87 "Health state changed"
88 );
89 Some(new_state)
90 } else {
91 None
92 }
93 }
94
95 pub async fn record_failure(&self, alert_threshold: u32) -> Option<HealthState> {
97 let old_state = *self.state.read().await;
98
99 let mut count = self.failure_count.write().await;
100 *count += 1;
101 let failure_count = *count;
102 drop(count);
103
104 let new_state = if failure_count >= alert_threshold {
105 HealthState::Down
106 } else {
107 HealthState::Degraded
108 };
109
110 if new_state != old_state {
111 *self.state.write().await = new_state;
112 tracing::warn!(
113 old_state = %old_state,
114 new_state = %new_state,
115 failure_count = failure_count,
116 "Health state changed"
117 );
118 Some(new_state)
119 } else {
120 None
121 }
122 }
123
124 pub async fn buffer_message(&self, message: InboundMessage) -> bool {
126 let mut buffer = self.buffer.write().await;
127
128 if buffer.len() >= self.max_buffer_size {
129 buffer.pop_front();
131 tracing::warn!("Message buffer full, dropping oldest message");
132 }
133
134 buffer.push_back(message);
135 tracing::debug!(buffer_size = buffer.len(), "Message buffered");
136 true
137 }
138
139 pub async fn drain_buffer(&self) -> Vec<InboundMessage> {
141 let mut buffer = self.buffer.write().await;
142 let messages: Vec<_> = buffer.drain(..).collect();
143 tracing::info!(count = messages.len(), "Draining buffered messages");
144 messages
145 }
146
147 pub async fn buffer_size(&self) -> usize {
149 self.buffer.read().await.len()
150 }
151
152 pub async fn last_healthy_ago(&self) -> Option<Duration> {
154 self.last_healthy.read().await.map(|t| t.elapsed())
155 }
156}
157
158pub async fn start_health_check(
160 state: Arc<AppState>,
161 config_name: String,
162 config: HealthCheckConfig,
163) {
164 let interval = Duration::from_secs(config.interval_seconds as u64);
165 let client = reqwest::Client::builder()
166 .timeout(Duration::from_secs(10))
167 .build()
168 .unwrap();
169
170 tracing::info!(
171 name = %config_name,
172 url = %config.url,
173 interval_secs = config.interval_seconds,
174 alert_after = config.alert_after_failures,
175 "Starting health check"
176 );
177
178 loop {
179 tokio::time::sleep(interval).await;
180
181 let result = client.get(&config.url).send().await;
182
183 match result {
184 Ok(resp) if resp.status().is_success() => {
185 let state_change = state.health_monitor.record_success().await;
186
187 if let Some(new_state) = state_change {
188 match new_state {
189 HealthState::Recovering => {
190 let messages = state.health_monitor.drain_buffer().await;
192 if !messages.is_empty() {
193 drain_buffered_messages(&state, messages).await;
194 }
195 }
196 HealthState::Healthy => {
197 send_recovery_notification(&state, &config).await;
199 }
200 _ => {}
201 }
202 }
203
204 tracing::debug!(name = %config_name, "Health check passed");
205 }
206 Ok(resp) => {
207 tracing::warn!(
208 name = %config_name,
209 status = %resp.status(),
210 "Health check failed (non-2xx)"
211 );
212
213 let state_change = state
214 .health_monitor
215 .record_failure(config.alert_after_failures)
216 .await;
217
218 if let Some(HealthState::Down) = state_change {
219 send_emergency_alert(&state, &config).await;
220 }
221 }
222 Err(e) => {
223 tracing::warn!(
224 name = %config_name,
225 error = %e,
226 "Health check failed (network error)"
227 );
228
229 let state_change = state
230 .health_monitor
231 .record_failure(config.alert_after_failures)
232 .await;
233
234 if let Some(HealthState::Down) = state_change {
235 send_emergency_alert(&state, &config).await;
236 }
237 }
238 }
239 }
240}
241
242async fn send_emergency_alert(state: &AppState, config: &HealthCheckConfig) {
244 let app_config = state.config.read().await;
245
246 let last_healthy = state
247 .health_monitor
248 .last_healthy_ago()
249 .await
250 .map(|d| format!("{:.0}s ago", d.as_secs_f64()))
251 .unwrap_or_else(|| "unknown".to_string());
252
253 let message = format!(
254 "🚨 ALERT: Target server is unreachable!\n\
255 Last healthy: {}\n\
256 Messages are being buffered.",
257 last_healthy
258 );
259
260 for cred_id in &config.notify_credentials {
261 if let Some(cred) = app_config.credentials.get(cred_id)
262 && cred.active
263 && cred.emergency
264 {
265 tracing::info!(
266 credential_id = %cred_id,
267 "Sending emergency alert"
268 );
269
270 if cred.adapter == "generic" {
274 tracing::warn!(
275 credential_id = %cred_id,
276 "Cannot send emergency alert via generic adapter (no persistent connection)"
277 );
278 } else {
279 tracing::info!(
281 credential_id = %cred_id,
282 adapter = %cred.adapter,
283 message = %message,
284 "Emergency alert (would be sent via adapter)"
285 );
286 }
287 }
288 }
289}
290
291async fn send_recovery_notification(state: &AppState, config: &HealthCheckConfig) {
293 let app_config = state.config.read().await;
294
295 let message = "✅ Target server has recovered. All systems operational.";
296
297 for cred_id in &config.notify_credentials {
298 if let Some(cred) = app_config.credentials.get(cred_id)
299 && cred.active
300 && cred.emergency
301 {
302 tracing::info!(
303 credential_id = %cred_id,
304 "Sending recovery notification"
305 );
306
307 if cred.adapter == "generic" {
308 tracing::warn!(
309 credential_id = %cred_id,
310 "Cannot send recovery notification via generic adapter"
311 );
312 } else {
313 tracing::info!(
315 credential_id = %cred_id,
316 adapter = %cred.adapter,
317 message = %message,
318 "Recovery notification (would be sent via adapter)"
319 );
320 }
321 }
322 }
323}
324
325async fn drain_buffered_messages(state: &AppState, messages: Vec<InboundMessage>) {
326 use crate::backend::{create_adapter, resolve_backend_name};
327
328 let config = state.config.read().await;
329 let gateway_ctx = crate::backend::GatewayContext {
330 gateway_url: format!("http://{}", config.gateway.listen),
331 send_token: config.auth.send_token.clone(),
332 };
333 drop(config);
334
335 for message in messages {
336 let config = state.config.read().await;
337 let adapter = if let Some(credential) = config.credentials.get(&message.credential_id) {
338 let backend_name = match resolve_backend_name(credential, &config.gateway) {
339 Some(name) => name,
340 None => {
341 tracing::error!(
342 credential_id = %message.credential_id,
343 "No backend configured for buffered message credential"
344 );
345 continue;
346 }
347 };
348 let backend_cfg = match config.backends.get(&backend_name) {
349 Some(cfg) => cfg,
350 None => {
351 tracing::error!(
352 credential_id = %message.credential_id,
353 backend = %backend_name,
354 "Backend not found for buffered message"
355 );
356 continue;
357 }
358 };
359 match create_adapter(
360 backend_cfg,
361 Some(&gateway_ctx),
362 credential.config.as_ref().or(backend_cfg.config.as_ref()),
363 ) {
364 Ok(a) => a,
365 Err(e) => {
366 tracing::error!(
367 message_id = %message.source.message_id,
368 credential_id = %message.credential_id,
369 error = %e,
370 "Failed to create backend adapter for buffered message"
371 );
372 continue;
373 }
374 }
375 } else {
376 tracing::warn!(
377 message_id = %message.source.message_id,
378 credential_id = %message.credential_id,
379 "Credential no longer exists, dropping buffered message"
380 );
381 continue;
382 };
383 drop(config);
384
385 match adapter.send_message(&message).await {
386 Ok(()) => {
387 tracing::debug!(
388 message_id = %message.source.message_id,
389 "Buffered message delivered"
390 );
391 }
392 Err(e) => {
393 tracing::error!(
394 message_id = %message.source.message_id,
395 error = %e,
396 "Failed to deliver buffered message"
397 );
398 }
399 }
400 }
401}
402
#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{MessageSource, UserInfo};
    use chrono::Utc;

    /// Builds a throwaway inbound message; only `message_id` varies between calls.
    fn make_message(id: &str) -> InboundMessage {
        InboundMessage {
            route: serde_json::json!("test"),
            credential_id: "cred1".to_string(),
            source: MessageSource {
                protocol: "test".to_string(),
                chat_id: "chat1".to_string(),
                message_id: id.to_string(),
                reply_to_message_id: None,
                from: UserInfo {
                    id: "user1".to_string(),
                    username: None,
                    display_name: None,
                },
            },
            text: "test message".to_string(),
            attachments: vec![],
            timestamp: Utc::now(),
            extra_data: None,
        }
    }

    /// Message ids of `messages`, in order.
    fn message_ids(messages: &[InboundMessage]) -> Vec<&str> {
        messages
            .iter()
            .map(|m| m.source.message_id.as_str())
            .collect()
    }

    #[test]
    fn test_health_state_display() {
        let expectations = [
            (HealthState::Healthy, "healthy"),
            (HealthState::Degraded, "degraded"),
            (HealthState::Down, "down"),
            (HealthState::Recovering, "recovering"),
        ];
        for (state, expected) in expectations {
            assert_eq!(state.to_string(), expected);
        }
    }

    #[tokio::test]
    async fn test_health_monitor_new() {
        let monitor = HealthMonitor::new(100);
        assert_eq!(monitor.max_buffer_size, 100);
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
        assert_eq!(monitor.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_health_monitor_record_success() {
        let monitor = HealthMonitor::new(100);
        assert_eq!(monitor.get_state().await, HealthState::Healthy);

        // Healthy -> Healthy is not a transition.
        assert_eq!(monitor.record_success().await, None);
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
    }

    #[tokio::test]
    async fn test_health_monitor_record_failure() {
        let monitor = HealthMonitor::new(100);
        let threshold = 3;

        assert_eq!(
            monitor.record_failure(threshold).await,
            Some(HealthState::Degraded)
        );
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        assert_eq!(monitor.record_failure(threshold).await, None);
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        assert_eq!(
            monitor.record_failure(threshold).await,
            Some(HealthState::Down)
        );
        assert_eq!(monitor.get_state().await, HealthState::Down);
    }

    #[tokio::test]
    async fn test_health_monitor_recovery() {
        let monitor = HealthMonitor::new(100);

        monitor.record_failure(1).await;
        assert_eq!(monitor.get_state().await, HealthState::Down);

        assert_eq!(
            monitor.record_success().await,
            Some(HealthState::Recovering)
        );
        assert_eq!(monitor.get_state().await, HealthState::Recovering);

        assert_eq!(monitor.record_success().await, Some(HealthState::Healthy));
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
    }

    #[tokio::test]
    async fn test_health_monitor_buffer() {
        let monitor = HealthMonitor::new(2);

        for (id, expected_len) in [("msg1", 1_usize), ("msg2", 2), ("msg3", 2)] {
            assert!(monitor.buffer_message(make_message(id)).await);
            assert_eq!(monitor.buffer_size().await, expected_len);
        }

        // msg1 was evicted to make room for msg3.
        let drained = monitor.drain_buffer().await;
        assert_eq!(message_ids(&drained), ["msg2", "msg3"]);
        assert_eq!(monitor.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_last_healthy_ago() {
        let monitor = HealthMonitor::new(100);

        let since_start = monitor
            .last_healthy_ago()
            .await
            .expect("last_healthy is set at construction");
        assert!(since_start.as_secs() < 1);

        tokio::time::sleep(Duration::from_millis(10)).await;
        monitor.record_success().await;

        let since_success = monitor
            .last_healthy_ago()
            .await
            .expect("last_healthy is set by record_success");
        assert!(since_success.as_millis() < 500);
    }

    #[test]
    fn test_health_state_equality() {
        assert_eq!(HealthState::Healthy, HealthState::Healthy);
        assert_eq!(HealthState::Degraded, HealthState::Degraded);
        assert_eq!(HealthState::Down, HealthState::Down);
        assert_eq!(HealthState::Recovering, HealthState::Recovering);

        assert_ne!(HealthState::Healthy, HealthState::Degraded);
        assert_ne!(HealthState::Down, HealthState::Recovering);
    }

    #[test]
    // Deliberately calls `.clone()` on a `Copy` type to exercise the derived `Clone`.
    #[allow(clippy::clone_on_copy)]
    fn test_health_state_clone_copy() {
        let original = HealthState::Healthy;
        let cloned = original.clone();
        let copied = original;

        assert_eq!(original, cloned);
        assert_eq!(original, copied);
    }

    #[tokio::test]
    async fn test_multiple_failures_staying_down() {
        let monitor = HealthMonitor::new(100);
        let threshold = 2;

        monitor.record_failure(threshold).await;
        monitor.record_failure(threshold).await;
        assert_eq!(monitor.get_state().await, HealthState::Down);

        // Further failures keep the state at Down without reporting a transition.
        for _ in 0..2 {
            assert_eq!(monitor.record_failure(threshold).await, None);
            assert_eq!(monitor.get_state().await, HealthState::Down);
        }
    }

    #[tokio::test]
    async fn test_failure_count_reset_on_success() {
        let monitor = HealthMonitor::new(100);
        let threshold = 3;

        monitor.record_failure(threshold).await;
        monitor.record_failure(threshold).await;
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        monitor.record_success().await;
        assert_eq!(monitor.get_state().await, HealthState::Healthy);

        // The counter restarted, so it takes a full `threshold` failures to go Down again.
        assert_eq!(
            monitor.record_failure(threshold).await,
            Some(HealthState::Degraded)
        );
        assert_eq!(monitor.record_failure(threshold).await, None);
        assert_eq!(
            monitor.record_failure(threshold).await,
            Some(HealthState::Down)
        );
    }

    #[tokio::test]
    async fn test_drain_empty_buffer() {
        let monitor = HealthMonitor::new(100);
        assert!(monitor.drain_buffer().await.is_empty());
    }

    #[tokio::test]
    async fn test_buffer_exactly_at_max_size() {
        let monitor = HealthMonitor::new(3);

        for id in ["msg1", "msg2", "msg3"] {
            monitor.buffer_message(make_message(id)).await;
        }
        assert_eq!(monitor.buffer_size().await, 3);

        monitor.buffer_message(make_message("msg4")).await;
        assert_eq!(monitor.buffer_size().await, 3);

        let drained = monitor.drain_buffer().await;
        assert_eq!(message_ids(&drained), ["msg2", "msg3", "msg4"]);
    }

    #[tokio::test]
    async fn test_recovery_from_degraded_state() {
        let monitor = HealthMonitor::new(100);

        monitor.record_failure(5).await;
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        assert_eq!(monitor.record_success().await, Some(HealthState::Healthy));
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
    }

    #[test]
    fn test_health_state_debug() {
        assert_eq!(format!("{:?}", HealthState::Healthy), "Healthy");
        assert_eq!(format!("{:?}", HealthState::Down), "Down");
    }

    #[tokio::test]
    async fn test_threshold_of_one() {
        let monitor = HealthMonitor::new(100);

        assert_eq!(monitor.record_failure(1).await, Some(HealthState::Down));
        assert_eq!(monitor.get_state().await, HealthState::Down);
    }

    #[tokio::test]
    async fn test_concurrent_buffer_access() {
        let monitor = Arc::new(HealthMonitor::new(100));

        let handles: Vec<_> = (0..10)
            .map(|i| {
                let monitor = Arc::clone(&monitor);
                let msg = make_message(&format!("msg{}", i));
                tokio::spawn(async move { monitor.buffer_message(msg).await })
            })
            .collect();

        for handle in handles {
            handle.await.unwrap();
        }

        assert_eq!(monitor.buffer_size().await, 10);
    }
}