// msg_gateway/health.rs

//! Health Check and Emergency Mode
//!
//! Monitors the target server (Pipelit) health and triggers emergency alerts
//! when it becomes unreachable.

use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};

use tokio::sync::RwLock;

use crate::config::HealthCheckConfig;
use crate::message::InboundMessage;
use crate::server::AppState;

/// Health check state.
///
/// Transitions (driven by `HealthMonitor::record_success` /
/// `record_failure`): `Healthy` -> `Degraded` on the first failure,
/// `Degraded` -> `Down` when the failure threshold is reached,
/// `Down` -> `Recovering` on the first success after an outage, and
/// `Recovering` -> `Healthy` on the second consecutive success.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HealthState {
    /// Target server is healthy
    Healthy,
    /// Target server has some failures but not yet critical
    Degraded,
    /// Target server is down, emergency mode active
    Down,
    /// Target server is recovering from down state
    Recovering,
}

28impl std::fmt::Display for HealthState {
29    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
30        match self {
31            HealthState::Healthy => write!(f, "healthy"),
32            HealthState::Degraded => write!(f, "degraded"),
33            HealthState::Down => write!(f, "down"),
34            HealthState::Recovering => write!(f, "recovering"),
35        }
36    }
37}
38
/// Health monitor state.
///
/// Shared between the health-check loop and request handlers, so each
/// mutable field sits behind its own async `RwLock`.
pub struct HealthMonitor {
    /// Current health state
    pub state: RwLock<HealthState>,
    /// Consecutive failure count; reset to 0 on every successful check
    pub failure_count: RwLock<u32>,
    /// Instant of the last successful health check
    pub last_healthy: RwLock<Option<Instant>>,
    /// Messages buffered (FIFO) while the target is unreachable
    pub buffer: RwLock<VecDeque<InboundMessage>>,
    /// Maximum buffer size; once full, the oldest message is dropped
    pub max_buffer_size: usize,
}

53impl HealthMonitor {
54    pub fn new(max_buffer_size: usize) -> Self {
55        Self {
56            state: RwLock::new(HealthState::Healthy),
57            failure_count: RwLock::new(0),
58            last_healthy: RwLock::new(Some(Instant::now())),
59            buffer: RwLock::new(VecDeque::new()),
60            max_buffer_size,
61        }
62    }
63
64    /// Get current state
65    pub async fn get_state(&self) -> HealthState {
66        *self.state.read().await
67    }
68
69    /// Record a successful health check
70    pub async fn record_success(&self) -> Option<HealthState> {
71        let old_state = *self.state.read().await;
72
73        *self.failure_count.write().await = 0;
74        *self.last_healthy.write().await = Some(Instant::now());
75
76        let new_state = match old_state {
77            HealthState::Down => HealthState::Recovering,
78            HealthState::Recovering => HealthState::Healthy,
79            _ => HealthState::Healthy,
80        };
81
82        if new_state != old_state {
83            *self.state.write().await = new_state;
84            tracing::info!(
85                old_state = %old_state,
86                new_state = %new_state,
87                "Health state changed"
88            );
89            Some(new_state)
90        } else {
91            None
92        }
93    }
94
95    /// Record a failed health check
96    pub async fn record_failure(&self, alert_threshold: u32) -> Option<HealthState> {
97        let old_state = *self.state.read().await;
98
99        let mut count = self.failure_count.write().await;
100        *count += 1;
101        let failure_count = *count;
102        drop(count);
103
104        let new_state = if failure_count >= alert_threshold {
105            HealthState::Down
106        } else {
107            HealthState::Degraded
108        };
109
110        if new_state != old_state {
111            *self.state.write().await = new_state;
112            tracing::warn!(
113                old_state = %old_state,
114                new_state = %new_state,
115                failure_count = failure_count,
116                "Health state changed"
117            );
118            Some(new_state)
119        } else {
120            None
121        }
122    }
123
124    /// Buffer a message during outage
125    pub async fn buffer_message(&self, message: InboundMessage) -> bool {
126        let mut buffer = self.buffer.write().await;
127
128        if buffer.len() >= self.max_buffer_size {
129            // Drop oldest message
130            buffer.pop_front();
131            tracing::warn!("Message buffer full, dropping oldest message");
132        }
133
134        buffer.push_back(message);
135        tracing::debug!(buffer_size = buffer.len(), "Message buffered");
136        true
137    }
138
139    /// Drain buffered messages
140    pub async fn drain_buffer(&self) -> Vec<InboundMessage> {
141        let mut buffer = self.buffer.write().await;
142        let messages: Vec<_> = buffer.drain(..).collect();
143        tracing::info!(count = messages.len(), "Draining buffered messages");
144        messages
145    }
146
147    /// Get buffer size
148    pub async fn buffer_size(&self) -> usize {
149        self.buffer.read().await.len()
150    }
151
152    /// Get last healthy timestamp
153    pub async fn last_healthy_ago(&self) -> Option<Duration> {
154        self.last_healthy.read().await.map(|t| t.elapsed())
155    }
156}
157
158/// Start the health check loop
159pub async fn start_health_check(
160    state: Arc<AppState>,
161    config_name: String,
162    config: HealthCheckConfig,
163) {
164    let interval = Duration::from_secs(config.interval_seconds as u64);
165    let client = reqwest::Client::builder()
166        .timeout(Duration::from_secs(10))
167        .build()
168        .unwrap();
169
170    tracing::info!(
171        name = %config_name,
172        url = %config.url,
173        interval_secs = config.interval_seconds,
174        alert_after = config.alert_after_failures,
175        "Starting health check"
176    );
177
178    loop {
179        tokio::time::sleep(interval).await;
180
181        let result = client.get(&config.url).send().await;
182
183        match result {
184            Ok(resp) if resp.status().is_success() => {
185                let state_change = state.health_monitor.record_success().await;
186
187                if let Some(new_state) = state_change {
188                    match new_state {
189                        HealthState::Recovering => {
190                            // Drain buffer and send messages
191                            let messages = state.health_monitor.drain_buffer().await;
192                            if !messages.is_empty() {
193                                drain_buffered_messages(&state, messages).await;
194                            }
195                        }
196                        HealthState::Healthy => {
197                            // Send recovery notification
198                            send_recovery_notification(&state, &config).await;
199                        }
200                        _ => {}
201                    }
202                }
203
204                tracing::debug!(name = %config_name, "Health check passed");
205            }
206            Ok(resp) => {
207                tracing::warn!(
208                    name = %config_name,
209                    status = %resp.status(),
210                    "Health check failed (non-2xx)"
211                );
212
213                let state_change = state
214                    .health_monitor
215                    .record_failure(config.alert_after_failures)
216                    .await;
217
218                if let Some(HealthState::Down) = state_change {
219                    send_emergency_alert(&state, &config).await;
220                }
221            }
222            Err(e) => {
223                tracing::warn!(
224                    name = %config_name,
225                    error = %e,
226                    "Health check failed (network error)"
227                );
228
229                let state_change = state
230                    .health_monitor
231                    .record_failure(config.alert_after_failures)
232                    .await;
233
234                if let Some(HealthState::Down) = state_change {
235                    send_emergency_alert(&state, &config).await;
236                }
237            }
238        }
239    }
240}
241
242/// Send emergency alert to all emergency credentials
243async fn send_emergency_alert(state: &AppState, config: &HealthCheckConfig) {
244    let app_config = state.config.read().await;
245
246    let last_healthy = state
247        .health_monitor
248        .last_healthy_ago()
249        .await
250        .map(|d| format!("{:.0}s ago", d.as_secs_f64()))
251        .unwrap_or_else(|| "unknown".to_string());
252
253    let message = format!(
254        "🚨 ALERT: Target server is unreachable!\n\
255         Last healthy: {}\n\
256         Messages are being buffered.",
257        last_healthy
258    );
259
260    for cred_id in &config.notify_credentials {
261        if let Some(cred) = app_config.credentials.get(cred_id)
262            && cred.active
263            && cred.emergency
264        {
265            tracing::info!(
266                credential_id = %cred_id,
267                "Sending emergency alert"
268            );
269
270            // For generic adapter, we can't really send an alert
271            // since it requires a client to be connected.
272            // For external adapters, we would POST to them.
273            if cred.adapter == "generic" {
274                tracing::warn!(
275                    credential_id = %cred_id,
276                    "Cannot send emergency alert via generic adapter (no persistent connection)"
277                );
278            } else {
279                // TODO: POST to external adapter's /send endpoint
280                tracing::info!(
281                    credential_id = %cred_id,
282                    adapter = %cred.adapter,
283                    message = %message,
284                    "Emergency alert (would be sent via adapter)"
285                );
286            }
287        }
288    }
289}
290
291/// Send recovery notification
292async fn send_recovery_notification(state: &AppState, config: &HealthCheckConfig) {
293    let app_config = state.config.read().await;
294
295    let message = "✅ Target server has recovered. All systems operational.";
296
297    for cred_id in &config.notify_credentials {
298        if let Some(cred) = app_config.credentials.get(cred_id)
299            && cred.active
300            && cred.emergency
301        {
302            tracing::info!(
303                credential_id = %cred_id,
304                "Sending recovery notification"
305            );
306
307            if cred.adapter == "generic" {
308                tracing::warn!(
309                    credential_id = %cred_id,
310                    "Cannot send recovery notification via generic adapter"
311                );
312            } else {
313                // TODO: POST to external adapter's /send endpoint
314                tracing::info!(
315                    credential_id = %cred_id,
316                    adapter = %cred.adapter,
317                    message = %message,
318                    "Recovery notification (would be sent via adapter)"
319                );
320            }
321        }
322    }
323}
324
/// Re-deliver messages that were buffered while the target was down.
///
/// Invoked by the health-check loop on the `Down -> Recovering`
/// transition. Messages are delivered in FIFO order; a message whose
/// credential, backend, or adapter can no longer be resolved is logged
/// and dropped. Delivery failures are logged but the message is NOT
/// re-buffered.
async fn drain_buffered_messages(state: &AppState, messages: Vec<InboundMessage>) {
    use crate::backend::{create_adapter, resolve_backend_name};

    // Snapshot gateway-level settings once, up front; the config lock is
    // released immediately so it is not held across the loop below.
    let config = state.config.read().await;
    let gateway_ctx = crate::backend::GatewayContext {
        gateway_url: format!("http://{}", config.gateway.listen),
        send_token: config.auth.send_token.clone(),
    };
    drop(config);

    for message in messages {
        // Config is re-read for each message so a concurrent config
        // reload (e.g. a removed credential) is picked up mid-drain.
        let config = state.config.read().await;
        let adapter = if let Some(credential) = config.credentials.get(&message.credential_id) {
            // Credential -> backend name -> backend config -> adapter;
            // any broken link drops this message and moves on.
            let backend_name = match resolve_backend_name(credential, &config.gateway) {
                Some(name) => name,
                None => {
                    tracing::error!(
                        credential_id = %message.credential_id,
                        "No backend configured for buffered message credential"
                    );
                    continue;
                }
            };
            let backend_cfg = match config.backends.get(&backend_name) {
                Some(cfg) => cfg,
                None => {
                    tracing::error!(
                        credential_id = %message.credential_id,
                        backend = %backend_name,
                        "Backend not found for buffered message"
                    );
                    continue;
                }
            };
            // Per-credential config takes precedence over the backend's
            // own config when both are present.
            match create_adapter(
                backend_cfg,
                Some(&gateway_ctx),
                credential.config.as_ref().or(backend_cfg.config.as_ref()),
            ) {
                Ok(a) => a,
                Err(e) => {
                    tracing::error!(
                        message_id = %message.source.message_id,
                        credential_id = %message.credential_id,
                        error = %e,
                        "Failed to create backend adapter for buffered message"
                    );
                    continue;
                }
            }
        } else {
            tracing::warn!(
                message_id = %message.source.message_id,
                credential_id = %message.credential_id,
                "Credential no longer exists, dropping buffered message"
            );
            continue;
        };
        // Release the config lock before the (potentially slow) network
        // send so other tasks are not blocked on it.
        drop(config);

        match adapter.send_message(&message).await {
            Ok(()) => {
                tracing::debug!(
                    message_id = %message.source.message_id,
                    "Buffered message delivered"
                );
            }
            Err(e) => {
                tracing::error!(
                    message_id = %message.source.message_id,
                    error = %e,
                    "Failed to deliver buffered message"
                );
            }
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::message::{MessageSource, UserInfo};
    use chrono::Utc;

    /// Build a minimal `InboundMessage` fixture with the given message id.
    ///
    /// Shared by every buffer test (previously duplicated as a closure in
    /// each of them).
    fn make_message(id: &str) -> InboundMessage {
        InboundMessage {
            route: serde_json::json!("test"),
            credential_id: "cred1".to_string(),
            source: MessageSource {
                protocol: "test".to_string(),
                chat_id: "chat1".to_string(),
                message_id: id.to_string(),
                reply_to_message_id: None,
                from: UserInfo {
                    id: "user1".to_string(),
                    username: None,
                    display_name: None,
                },
            },
            text: "test message".to_string(),
            attachments: vec![],
            timestamp: Utc::now(),
            extra_data: None,
        }
    }

    #[test]
    fn test_health_state_display() {
        assert_eq!(format!("{}", HealthState::Healthy), "healthy");
        assert_eq!(format!("{}", HealthState::Degraded), "degraded");
        assert_eq!(format!("{}", HealthState::Down), "down");
        assert_eq!(format!("{}", HealthState::Recovering), "recovering");
    }

    #[tokio::test]
    async fn test_health_monitor_new() {
        let monitor = HealthMonitor::new(100);
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
        assert_eq!(monitor.buffer_size().await, 0);
        assert_eq!(monitor.max_buffer_size, 100);
    }

    #[tokio::test]
    async fn test_health_monitor_record_success() {
        let monitor = HealthMonitor::new(100);

        // Initial state is healthy
        assert_eq!(monitor.get_state().await, HealthState::Healthy);

        // Recording success should keep it healthy
        let state_change = monitor.record_success().await;
        assert!(state_change.is_none()); // No state change
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
    }

    #[tokio::test]
    async fn test_health_monitor_record_failure() {
        let monitor = HealthMonitor::new(100);
        let alert_threshold = 3;

        // First failure - Healthy -> Degraded
        let change1 = monitor.record_failure(alert_threshold).await;
        assert_eq!(change1, Some(HealthState::Degraded));
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        // Second failure - stays Degraded (no state change)
        let change2 = monitor.record_failure(alert_threshold).await;
        assert!(change2.is_none());
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        // Third failure - Degraded -> Down (threshold reached)
        let change3 = monitor.record_failure(alert_threshold).await;
        assert_eq!(change3, Some(HealthState::Down));
        assert_eq!(monitor.get_state().await, HealthState::Down);
    }

    #[tokio::test]
    async fn test_health_monitor_recovery() {
        let monitor = HealthMonitor::new(100);
        let alert_threshold = 1;

        // Go to Down state
        monitor.record_failure(alert_threshold).await;
        assert_eq!(monitor.get_state().await, HealthState::Down);

        // First success - should go to Recovering
        let change1 = monitor.record_success().await;
        assert_eq!(change1, Some(HealthState::Recovering));
        assert_eq!(monitor.get_state().await, HealthState::Recovering);

        // Second success - should go to Healthy
        let change2 = monitor.record_success().await;
        assert_eq!(change2, Some(HealthState::Healthy));
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
    }

    #[tokio::test]
    async fn test_health_monitor_buffer() {
        let monitor = HealthMonitor::new(2); // Small buffer for testing

        // Buffer first message
        assert!(monitor.buffer_message(make_message("msg1")).await);
        assert_eq!(monitor.buffer_size().await, 1);

        // Buffer second message
        assert!(monitor.buffer_message(make_message("msg2")).await);
        assert_eq!(monitor.buffer_size().await, 2);

        // Buffer third message - should drop oldest
        assert!(monitor.buffer_message(make_message("msg3")).await);
        assert_eq!(monitor.buffer_size().await, 2);

        // Drain buffer
        let messages = monitor.drain_buffer().await;
        assert_eq!(messages.len(), 2);
        assert_eq!(messages[0].source.message_id, "msg2"); // First was dropped
        assert_eq!(messages[1].source.message_id, "msg3");

        // Buffer should be empty now
        assert_eq!(monitor.buffer_size().await, 0);
    }

    #[tokio::test]
    async fn test_last_healthy_ago() {
        let monitor = HealthMonitor::new(100);

        // Initially should have a last_healthy timestamp
        let duration = monitor.last_healthy_ago().await;
        assert!(duration.is_some());
        // Should be very recent (less than 1 second)
        assert!(duration.unwrap().as_secs() < 1);

        // After recording success, should update timestamp
        tokio::time::sleep(Duration::from_millis(10)).await;
        monitor.record_success().await;
        let duration2 = monitor.last_healthy_ago().await;
        assert!(duration2.is_some());
        // Use lenient threshold to avoid flaky tests on slow/busy systems
        assert!(duration2.unwrap().as_millis() < 500);
    }

    #[test]
    fn test_health_state_equality() {
        // Test PartialEq and Eq implementations
        assert_eq!(HealthState::Healthy, HealthState::Healthy);
        assert_eq!(HealthState::Degraded, HealthState::Degraded);
        assert_eq!(HealthState::Down, HealthState::Down);
        assert_eq!(HealthState::Recovering, HealthState::Recovering);

        assert_ne!(HealthState::Healthy, HealthState::Degraded);
        assert_ne!(HealthState::Down, HealthState::Recovering);
    }

    #[test]
    #[allow(clippy::clone_on_copy)]
    fn test_health_state_clone_copy() {
        // Test Clone and Copy implementations
        let state = HealthState::Healthy;
        let cloned = state.clone(); // Intentionally testing clone on Copy type
        let copied = state;

        assert_eq!(state, cloned);
        assert_eq!(state, copied);
    }

    #[tokio::test]
    async fn test_multiple_failures_staying_down() {
        let monitor = HealthMonitor::new(100);
        let alert_threshold = 2;

        // Go to Down state
        monitor.record_failure(alert_threshold).await;
        monitor.record_failure(alert_threshold).await;
        assert_eq!(monitor.get_state().await, HealthState::Down);

        // Additional failures should not cause state changes
        let change = monitor.record_failure(alert_threshold).await;
        assert!(change.is_none());
        assert_eq!(monitor.get_state().await, HealthState::Down);

        let change = monitor.record_failure(alert_threshold).await;
        assert!(change.is_none());
        assert_eq!(monitor.get_state().await, HealthState::Down);
    }

    #[tokio::test]
    async fn test_failure_count_reset_on_success() {
        let monitor = HealthMonitor::new(100);
        let alert_threshold = 3;

        // Add some failures (but not enough to go Down)
        monitor.record_failure(alert_threshold).await;
        monitor.record_failure(alert_threshold).await;
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        // Success should reset failure count
        monitor.record_success().await;
        assert_eq!(monitor.get_state().await, HealthState::Healthy);

        // Now failures should start from 0 again
        let change = monitor.record_failure(alert_threshold).await;
        assert_eq!(change, Some(HealthState::Degraded));

        // Need 2 more failures to reach threshold, not just 1
        let change = monitor.record_failure(alert_threshold).await;
        assert!(change.is_none()); // Still degraded

        let change = monitor.record_failure(alert_threshold).await;
        assert_eq!(change, Some(HealthState::Down));
    }

    #[tokio::test]
    async fn test_drain_empty_buffer() {
        let monitor = HealthMonitor::new(100);

        // Draining empty buffer should return empty vec
        let messages = monitor.drain_buffer().await;
        assert!(messages.is_empty());
    }

    #[tokio::test]
    async fn test_buffer_exactly_at_max_size() {
        let monitor = HealthMonitor::new(3);

        // Fill buffer exactly to capacity
        monitor.buffer_message(make_message("msg1")).await;
        monitor.buffer_message(make_message("msg2")).await;
        monitor.buffer_message(make_message("msg3")).await;
        assert_eq!(monitor.buffer_size().await, 3);

        // Next message should cause oldest to be dropped
        monitor.buffer_message(make_message("msg4")).await;
        assert_eq!(monitor.buffer_size().await, 3);

        let messages = monitor.drain_buffer().await;
        assert_eq!(messages.len(), 3);
        assert_eq!(messages[0].source.message_id, "msg2");
        assert_eq!(messages[1].source.message_id, "msg3");
        assert_eq!(messages[2].source.message_id, "msg4");
    }

    #[tokio::test]
    async fn test_recovery_from_degraded_state() {
        let monitor = HealthMonitor::new(100);
        let alert_threshold = 5;

        // Go to Degraded state (but not Down)
        monitor.record_failure(alert_threshold).await;
        assert_eq!(monitor.get_state().await, HealthState::Degraded);

        // Success from Degraded should go directly to Healthy
        let change = monitor.record_success().await;
        assert_eq!(change, Some(HealthState::Healthy));
        assert_eq!(monitor.get_state().await, HealthState::Healthy);
    }

    #[test]
    fn test_health_state_debug() {
        // Test Debug implementation
        let state = HealthState::Healthy;
        let debug_str = format!("{:?}", state);
        assert_eq!(debug_str, "Healthy");

        let state = HealthState::Down;
        let debug_str = format!("{:?}", state);
        assert_eq!(debug_str, "Down");
    }

    #[tokio::test]
    async fn test_threshold_of_one() {
        let monitor = HealthMonitor::new(100);
        let alert_threshold = 1;

        // First failure should immediately go to Down
        let change = monitor.record_failure(alert_threshold).await;
        assert_eq!(change, Some(HealthState::Down));
        assert_eq!(monitor.get_state().await, HealthState::Down);
    }

    #[tokio::test]
    async fn test_concurrent_buffer_access() {
        let monitor = Arc::new(HealthMonitor::new(100));

        // Spawn multiple tasks to buffer messages concurrently
        let mut handles = vec![];
        for i in 0..10 {
            let monitor_clone = Arc::clone(&monitor);
            let msg = make_message(&format!("msg{}", i));
            handles.push(tokio::spawn(async move {
                monitor_clone.buffer_message(msg).await
            }));
        }

        // Wait for all tasks
        for handle in handles {
            handle.await.unwrap();
        }

        // All messages should be buffered
        assert_eq!(monitor.buffer_size().await, 10);
    }
}