bloop_server_framework/health_monitor/
mod.rs

1use crate::event::Event;
2#[cfg(test)]
3use crate::test_utils::Utc;
4use chrono::DateTime;
5#[cfg(not(test))]
6use chrono::Utc;
7use serde::Serialize;
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::net::IpAddr;
10use std::time::Duration;
11use thiserror::Error;
12use tokio::sync::broadcast::{self, error::RecvError};
13use tokio::{select, time};
14use tokio_graceful_shutdown::{FutureExt, IntoSubsystem, SubsystemHandle};
15use tracing::{debug, warn};
16
17#[cfg(feature = "health-monitor-telegram")]
18pub mod telegram;
19
20/// The kind of client event.
21#[derive(Debug, Serialize, Clone, Copy)]
22#[serde(rename_all = "camelCase")]
23pub enum ClientEventKind {
24    Connect,
25    Disconnect,
26    ConnectionLoss,
27}
28
29/// Represents a client event with type, connection ID, and timestamp.
30#[derive(Debug, Serialize)]
31#[serde(rename_all = "camelCase")]
32#[cfg_attr(test, derive(Clone))]
33pub struct ClientEvent {
34    /// The type of event.
35    pub kind: ClientEventKind,
36    /// The identifier of the client connection.
37    pub conn_id: usize,
38    /// When the event occurred.
39    pub timestamp: DateTime<chrono::Utc>,
40}
41
42/// The health status of a client.
43#[derive(Debug, Serialize, Ord, PartialOrd, Eq, PartialEq, Clone, Copy)]
44#[serde(rename_all = "camelCase")]
45pub enum Health {
46    Healthy,
47    Unstable,
48    Offline,
49}
50
51/// Current status information for a client.
52#[derive(Debug, Serialize)]
53#[serde(rename_all = "camelCase")]
54#[cfg_attr(test, derive(Clone))]
55pub struct ClientStatus {
56    /// The current health status of the client.
57    pub health: Health,
58    /// The timestamp since this status is valid.
59    pub since: DateTime<chrono::Utc>,
60    /// The local IP address of the client, if known.
61    pub local_ip: Option<IpAddr>,
62    /// A queue of recent events for this client.
63    pub events: VecDeque<ClientEvent>,
64}
65
66impl Default for ClientStatus {
67    fn default() -> Self {
68        Self {
69            health: Health::Offline,
70            since: Utc::now(),
71            local_ip: None,
72            events: VecDeque::new(),
73        }
74    }
75}
76
77/// A report containing the status of multiple clients.
78#[derive(Debug, Serialize)]
79#[cfg_attr(test, derive(Clone))]
80pub struct HealthReport {
81    pub clients: HashMap<String, ClientStatus>,
82}
83
84impl HealthReport {
85    fn new(client_ids: HashSet<String>) -> Self {
86        Self {
87            clients: client_ids
88                .into_iter()
89                .map(|client_id| (client_id, ClientStatus::default()))
90                .collect(),
91        }
92    }
93
94    /// Formats the health report as a human-readable text summary.
95    ///
96    /// Clients are sorted by health status, then by client ID. Displays an
97    /// icon, client ID, IP address, health status, and duration.
98    pub fn as_text_report(&self) -> String {
99        let mut clients = self
100            .clients
101            .iter()
102            .collect::<Vec<(&String, &ClientStatus)>>();
103
104        clients.sort_by(|&a, &b| {
105            if a.1.health == b.1.health {
106                return a.0.cmp(b.0);
107            }
108
109            a.1.health.cmp(&b.1.health)
110        });
111
112        let now = Utc::now();
113
114        clients
115            .iter()
116            .map(|(client_id, status)| {
117                let mut text = String::new();
118                let local_ip = match status.local_ip {
119                    Some(local_ip) => local_ip.to_string(),
120                    None => "unknown".to_string(),
121                };
122                let (emoji, label) = match status.health {
123                    Health::Healthy => ('🟢', "Healthy"),
124                    Health::Unstable => ('🔴', "Unhealthy"),
125                    Health::Offline => ('âš«', "Offline"),
126                };
127
128                let status_duration = (now - status.since).num_minutes();
129                text.push_str(&format!(
130                    "{emoji} {client_id} ({local_ip}): {label} for {status_duration} minutes"
131                ));
132
133                text
134            })
135            .collect::<Vec<String>>()
136            .join("\n")
137    }
138}
139
140/// A trait for sending health reports asynchronously.
141pub trait HealthReportSender {
142    type Error: std::fmt::Debug + std::fmt::Display + Send + Sync + 'static;
143
144    /// Sends a health report asynchronously.
145    ///
146    /// The `report` argument is the health report to send. If `silent` is true,
147    /// notifications should be suppressed if possible.
148    ///
149    /// Returns `Ok(())` on success, or an error of type `Self::Error` on failure.
150    fn send(
151        &mut self,
152        report: &HealthReport,
153        silent: bool,
154    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
155}
156
157#[derive(Debug, Error)]
158pub enum BuilderError {
159    #[error("missing field: {0}")]
160    MissingField(&'static str),
161
162    #[error("invalid value: {0}")]
163    InvalidValue(&'static str),
164}
165
166/// Builder for creating a [`HealthMonitor`] instance.
167///
168/// Allows configuring optional parameters with sensible defaults. Required
169/// parameters must be set before calling [`build()`].
170///
171/// # Examples
172///
173/// ```
174/// use std::convert::Infallible;
175/// use tokio::sync::broadcast;
176/// use bloop_server_framework::health_monitor::{
177///     HealthMonitorBuilder,
178///     HealthReport,
179///     HealthReportSender
180/// };
181///
182/// struct DummySender;
183///
184/// impl HealthReportSender for DummySender {
185///     type Error = Infallible;
186///
187///     async fn send(
188///         &mut self,
189///         report: &HealthReport,
190///         silent: bool
191///     ) -> Result<(), Self::Error> {
192///          Ok(())
193///     }
194/// }
195///
196/// let (_, event_rx) = broadcast::channel(512);
197///
198/// let monitor = HealthMonitorBuilder::new()
199///     .sender(DummySender)
200///     .event_rx(event_rx)
201///     .max_client_events(100)
202///     .build()
203///     .expect("failed to build HealthMonitor");
204/// ```
205#[derive(Debug, Default)]
206pub struct HealthMonitorBuilder<T: HealthReportSender> {
207    sender: Option<T>,
208    event_rx: Option<broadcast::Receiver<Event>>,
209    client_ids: HashSet<String>,
210    max_client_events: usize,
211    connection_loss_limit: usize,
212    check_interval: Duration,
213    reminder_interval: Duration,
214    offline_grace_period: Duration,
215    connection_loss_window: Duration,
216    recovery_grace_period: Duration,
217}
218
219impl<T: HealthReportSender> HealthMonitorBuilder<T> {
220    /// Creates a new [`HealthMonitorBuilder`] with default configuration values.
221    ///
222    /// You must provide at least a sender and an event receiver before calling
223    /// [`build()`].
224    pub fn new() -> Self {
225        Self {
226            sender: None,
227            event_rx: None,
228            client_ids: HashSet::new(),
229            max_client_events: 50,
230            connection_loss_limit: 3,
231            check_interval: Duration::from_secs(30),
232            reminder_interval: Duration::from_secs(1800),
233            offline_grace_period: Duration::from_secs(120),
234            connection_loss_window: Duration::from_secs(600),
235            recovery_grace_period: Duration::from_secs(1200),
236        }
237    }
238
239    /// Sets the report sender implementation.
240    ///
241    /// This is a required field. The sender is used to transmit health reports
242    /// during monitoring.
243    pub fn sender(mut self, sender: T) -> Self {
244        self.sender = Some(sender);
245        self
246    }
247
248    /// Sets the event receiver used to receive client health-related events.
249    ///
250    /// This is a required field. The monitor listens to this channel for incoming
251    /// events.
252    pub fn event_rx(mut self, event_rx: broadcast::Receiver<Event>) -> Self {
253        self.event_rx = Some(event_rx);
254        self
255    }
256
257    /// Sets the initial list of known client IDs to track in the health report.
258    ///
259    /// Defaults to an empty set if not provided.
260    pub fn client_ids(mut self, client_ids: HashSet<String>) -> Self {
261        self.client_ids = client_ids;
262        self
263    }
264
265    /// Sets the maximum number of recent events to store per client.
266    ///
267    /// Defaults to `50`. Events beyond this limit are discarded (oldest first).
268    pub fn max_client_events(mut self, max_client_events: usize) -> Self {
269        self.max_client_events = max_client_events;
270        self
271    }
272
273    /// Sets the number of recent `ConnectionLoss` events required to consider a
274    /// client unstable.
275    ///
276    /// Defaults to `3`.
277    pub fn connection_loss_limit(mut self, connection_loss_limit: usize) -> Self {
278        self.connection_loss_limit = connection_loss_limit;
279        self
280    }
281
282    /// Sets how frequently the system checks for health status updates.
283    ///
284    /// Defaults to `30 seconds`.
285    pub fn check_interval(mut self, check_interval: Duration) -> Self {
286        self.check_interval = check_interval;
287        self
288    }
289
290    /// Sets the interval for sending health reminders if unhealthy clients are
291    /// still present.
292    ///
293    /// Defaults to `30 minutes`.
294    pub fn reminder_interval(mut self, reminder_interval: Duration) -> Self {
295        self.reminder_interval = reminder_interval;
296        self
297    }
298
299    /// Sets the grace period after the last connection loss before a client is
300    /// considered offline.
301    ///
302    /// Defaults to `2 minutes`.
303    pub fn offline_grace_period(mut self, offline_grace_period: Duration) -> Self {
304        self.offline_grace_period = offline_grace_period;
305        self
306    }
307
308    /// Sets the window of time used to count recent connection loss events.
309    ///
310    /// Defaults to `10 minutes`.
311    pub fn connection_loss_window(mut self, connection_loss_window: Duration) -> Self {
312        self.connection_loss_window = connection_loss_window;
313        self
314    }
315
316    /// Sets the duration a client must remain stable before being promoted to
317    /// healthy.
318    ///
319    /// Defaults to `20 minutes`.
320    pub fn recovery_grace_period(mut self, recovery_grace_period: Duration) -> Self {
321        self.recovery_grace_period = recovery_grace_period;
322        self
323    }
324
325    /// Builds a [`HealthMonitor`] instance using the configured values.
326    ///
327    /// Returns an error if required fields like the sender or event receiver are
328    /// missing.
329    pub fn build(self) -> Result<HealthMonitor<T>, BuilderError> {
330        let sender = self.sender.ok_or(BuilderError::MissingField("sender"))?;
331        let event_rx = self.event_rx.ok_or(BuilderError::MissingField("event_r"))?;
332
333        if self.max_client_events < 2 {
334            return Err(BuilderError::InvalidValue(
335                "max_client_events must be at least 2",
336            ));
337        }
338
339        if self.connection_loss_limit < 1 {
340            return Err(BuilderError::InvalidValue(
341                "connection_loss_limit must be at least 1",
342            ));
343        }
344
345        Ok(HealthMonitor {
346            sender,
347            event_rx,
348            report: HealthReport::new(self.client_ids),
349            max_client_events: self.max_client_events,
350            connection_loss_limit: self.connection_loss_limit,
351            check_interval: self.check_interval,
352            reminder_interval: self.reminder_interval,
353            offline_grace_period: self.offline_grace_period,
354            connection_loss_window: self.connection_loss_window,
355            recovery_grace_period: self.recovery_grace_period,
356            #[cfg(test)]
357            test_notify_event_processed: std::sync::Arc::new(tokio::sync::Notify::new()),
358        })
359    }
360}
361
362/// A runtime health monitoring service.
363///
364/// The `HealthMonitor` tracks client health statuses by processing events such
365/// as connections, disconnections, and connection losses. It periodically
366/// checks client states and sends health reports using the configured
367/// [`HealthReportSender`].
368#[derive(Debug)]
369pub struct HealthMonitor<T: HealthReportSender> {
370    sender: T,
371    event_rx: broadcast::Receiver<Event>,
372    report: HealthReport,
373    max_client_events: usize,
374    connection_loss_limit: usize,
375    check_interval: Duration,
376    reminder_interval: Duration,
377    offline_grace_period: Duration,
378    connection_loss_window: Duration,
379    recovery_grace_period: Duration,
380    #[cfg(test)]
381    pub test_notify_event_processed: std::sync::Arc<tokio::sync::Notify>,
382}
383
384impl<T: HealthReportSender> HealthMonitor<T> {
385    /// Starts the health monitoring event loop.
386    ///
387    /// This async method continuously listens for incoming events, periodic checks,
388    /// and reminders. It processes client health updates based on received events
389    /// and predefined timing intervals.
390    ///
391    /// The loop will run until the event stream is closed or an exit condition is
392    /// met.
393    ///
394    /// # Behavior
395    ///
396    /// - Sends the initial health report immediately upon start.
397    /// - Periodically checks client health status every `check_interval`.
398    /// - Sends reminder reports every `reminder_interval` if any clients are not
399    ///   healthy.
400    /// - Processes events received from the event receiver to update client states.
401    pub async fn listen(&mut self) {
402        let _ = self.sender.send(&self.report, false).await;
403        let mut check_interval = time::interval(self.check_interval);
404        let mut reminder_interval = time::interval(self.reminder_interval);
405
406        check_interval.tick().await;
407        reminder_interval.tick().await;
408
409        loop {
410            let should_continue = select! {
411                _ = check_interval.tick() => {
412                    self.check_clients().await;
413                    true
414                }
415                _ = reminder_interval.tick() => {
416                    self.send_reminder().await;
417                    true
418                }
419                result = self.event_rx.recv() => {
420                    self.handle_event_recv(result).await
421                }
422            };
423
424            if !should_continue {
425                break;
426            }
427        }
428    }
429
430    async fn check_clients(&mut self) {
431        let now = Utc::now();
432        let offline_cutoff = now - self.offline_grace_period;
433        let connection_loss_cutoff = now - self.connection_loss_window;
434        let recovery_cutoff = now - self.recovery_grace_period;
435
436        let mut silent_updates: usize = 0;
437        let mut alert_updates: usize = 0;
438
439        for client in self.report.clients.values_mut() {
440            if matches!(client.health, Health::Offline) {
441                continue;
442            }
443
444            let mut recent_events: Vec<_> = client.events.iter().take(2).collect();
445            recent_events.sort_by(|a, b| b.conn_id.cmp(&a.conn_id));
446
447            let Some(latest_event) = recent_events.first() else {
448                continue;
449            };
450
451            match (client.health, latest_event.kind) {
452                (Health::Healthy | Health::Unstable, ClientEventKind::ConnectionLoss)
453                    if latest_event.timestamp <= offline_cutoff =>
454                {
455                    client.health = Health::Offline;
456                    client.since = now;
457                    alert_updates += 1;
458                }
459                (Health::Unstable, ClientEventKind::Connect)
460                    if latest_event.timestamp <= recovery_cutoff =>
461                {
462                    client.health = Health::Healthy;
463                    client.since = now;
464                    silent_updates += 1;
465                }
466                (Health::Healthy, _) => {
467                    let mut connection_loss_count: usize = 0;
468
469                    for event in client.events.iter() {
470                        if event.timestamp < connection_loss_cutoff {
471                            break;
472                        }
473
474                        if matches!(event.kind, ClientEventKind::ConnectionLoss) {
475                            connection_loss_count += 1;
476                        }
477                    }
478
479                    if connection_loss_count >= self.connection_loss_limit {
480                        client.health = Health::Unstable;
481                        client.since = now;
482                        alert_updates += 1;
483                    }
484                }
485                (_, _) => {}
486            }
487        }
488
489        if alert_updates > 0 {
490            self.send_report(false).await;
491        } else if silent_updates > 0 {
492            self.send_report(true).await;
493        }
494    }
495
496    async fn send_reminder(&mut self) {
497        if self
498            .report
499            .clients
500            .values()
501            .any(|client| !matches!(client.health, Health::Healthy))
502        {
503            self.send_report(false).await;
504        }
505    }
506
507    async fn handle_event_recv(&mut self, result: Result<Event, RecvError>) -> bool {
508        match result {
509            Ok(Event::ClientConnect {
510                client_id,
511                local_ip,
512                conn_id,
513            }) => {
514                let client = self.report.clients.entry(client_id).or_default();
515                client.local_ip = Some(local_ip);
516                let mut should_send_report = false;
517
518                if matches!(client.health, Health::Offline) {
519                    client.health = Health::Healthy;
520                    client.since = Utc::now();
521                    should_send_report = true;
522                }
523
524                client.events.push_front(ClientEvent {
525                    kind: ClientEventKind::Connect,
526                    conn_id,
527                    timestamp: Utc::now(),
528                });
529                println!("connect");
530                if client.events.len() > self.max_client_events {
531                    client.events.pop_back();
532                }
533
534                if should_send_report {
535                    self.send_report(true).await;
536                }
537
538                #[cfg(test)]
539                self.test_notify_event_processed.notify_one();
540
541                true
542            }
543            Ok(Event::ClientDisconnect { client_id, conn_id }) => {
544                let client = self.report.clients.entry(client_id).or_default();
545                client.health = Health::Offline;
546                client.since = Utc::now();
547                client.events.push_front(ClientEvent {
548                    kind: ClientEventKind::Disconnect,
549                    conn_id,
550                    timestamp: Utc::now(),
551                });
552
553                if client.events.len() > self.max_client_events {
554                    client.events.pop_back();
555                }
556
557                #[cfg(test)]
558                self.test_notify_event_processed.notify_one();
559
560                self.send_report(true).await;
561                true
562            }
563            Ok(Event::ClientConnectionLoss { client_id, conn_id }) => {
564                let client = self.report.clients.entry(client_id).or_default();
565                client.events.push_front(ClientEvent {
566                    kind: ClientEventKind::ConnectionLoss,
567                    conn_id,
568                    timestamp: Utc::now(),
569                });
570
571                if client.events.len() > self.max_client_events {
572                    client.events.pop_back();
573                }
574
575                #[cfg(test)]
576                self.test_notify_event_processed.notify_one();
577
578                true
579            }
580            Ok(_) => true,
581            Err(RecvError::Lagged(n)) => {
582                warn!("HealthMonitor lagged by {n} messages, some events were missed");
583                true
584            }
585            Err(RecvError::Closed) => {
586                debug!("HealthMonitor event stream closed, exiting event loop");
587                false
588            }
589        }
590    }
591
592    async fn send_report(&mut self, silent: bool) {
593        if let Err(err) = self.sender.send(&self.report, silent).await {
594            warn!("Failed to send health report: {:?}", err);
595        }
596    }
597}
598
/// Error type without variants, used where an error type is required but no
/// error is ever produced.
#[cfg(feature = "tokio-graceful-shutdown")]
#[derive(Error, Debug)]
pub enum NeverError {}
602
#[cfg(feature = "tokio-graceful-shutdown")]
impl<T> IntoSubsystem<NeverError> for HealthMonitor<T>
where
    T: HealthReportSender + Send + Sync + 'static,
{
    /// Runs the event loop until it ends by itself or shutdown is requested.
    /// Either way the subsystem reports success.
    async fn run(mut self, subsys: &mut SubsystemHandle) -> Result<(), NeverError> {
        match self.listen().cancel_on_shutdown(subsys).await {
            Ok(()) | Err(_) => Ok(()),
        }
    }
}
611
#[cfg(test)]
mod tests {
    use super::*;
    use crate::event::Event;
    use ntest::timeout;
    use std::convert::Infallible;
    use std::sync::{Arc, Mutex};
    use std::time::Duration;
    use tokio::sync::broadcast;
    use tokio::time::advance;

    /// Client ID used by every test that tracks a client.
    const CLIENT: &str = "client1";

    type Builder = HealthMonitorBuilder<MockSender>;

    /// Sender that remembers only the most recent report and its `silent`
    /// flag.
    #[derive(Default, Clone)]
    struct MockSender {
        last: Arc<Mutex<Option<(HealthReport, bool)>>>,
    }

    impl HealthReportSender for MockSender {
        type Error = Infallible;

        async fn send(&mut self, report: &HealthReport, silent: bool) -> Result<(), Self::Error> {
            *self.last.lock().unwrap() = Some((report.clone(), silent));
            Ok(())
        }
    }

    /// Yields to the runtime until the mock sender holds a report, then takes
    /// that report out of the sender.
    async fn wait_for_report(sender: &MockSender) -> (HealthReport, bool) {
        loop {
            let pending = sender.last.lock().unwrap().take();

            if let Some(report) = pending {
                return report;
            }

            tokio::task::yield_now().await;
        }
    }

    fn connect(conn_id: usize) -> Event {
        Event::ClientConnect {
            client_id: CLIENT.to_string(),
            local_ip: "127.0.0.1".parse().unwrap(),
            conn_id,
        }
    }

    fn disconnect(conn_id: usize) -> Event {
        Event::ClientDisconnect {
            client_id: CLIENT.to_string(),
            conn_id,
        }
    }

    fn connection_loss(conn_id: usize) -> Event {
        Event::ClientConnectionLoss {
            client_id: CLIENT.to_string(),
            conn_id,
        }
    }

    /// A running monitor that tracks `CLIENT`, plus the handles a test needs
    /// to drive and observe it.
    struct Harness {
        tx: broadcast::Sender<Event>,
        sender: MockSender,
        notify: Arc<tokio::sync::Notify>,
        handle: tokio::task::JoinHandle<()>,
    }

    impl Harness {
        /// Spawns a monitor tracking `CLIENT` and consumes its initial
        /// report. `configure` may adjust the builder before it is built.
        async fn start(configure: impl FnOnce(Builder) -> Builder) -> Self {
            let (tx, rx) = broadcast::channel(16);
            let sender = MockSender::default();

            let base = HealthMonitorBuilder::new()
                .sender(sender.clone())
                .event_rx(rx)
                .client_ids([CLIENT.to_string()].into_iter().collect());
            let mut monitor = configure(base).build().unwrap();
            let notify = monitor.test_notify_event_processed.clone();

            let handle = tokio::spawn(async move {
                monitor.listen().await;
            });
            wait_for_report(&sender).await;

            Self {
                tx,
                sender,
                notify,
                handle,
            }
        }

        /// Publishes `event` and waits until the monitor has handled it.
        async fn emit(&self, event: Event) {
            self.tx.send(event).unwrap();
            self.notify.notified().await;
        }

        /// Waits for the next report and returns it with its `silent` flag.
        async fn next_report(&self) -> (HealthReport, bool) {
            wait_for_report(&self.sender).await
        }

        /// Closes the event channel and waits for the monitor task to end.
        async fn shutdown(self) {
            drop(self.tx);
            self.handle.await.unwrap();
        }
    }

    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn sends_initial_report_on_start() {
        // The sender half stays alive so the monitor keeps running.
        let (_tx, rx) = broadcast::channel(16);
        let sender = MockSender::default();

        let mut monitor = HealthMonitorBuilder::new()
            .sender(sender.clone())
            .event_rx(rx)
            .client_ids(Default::default())
            .build()
            .unwrap();

        let handle = tokio::spawn(async move {
            monitor.listen().await;
        });

        let (_, silent) = wait_for_report(&sender).await;
        assert!(!silent, "Initial report should not be silent");

        drop(handle);
    }

    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_connect_sets_healthy() {
        let harness = Harness::start(|builder| builder).await;

        harness.emit(connect(1)).await;

        let (report, _) = harness.next_report().await;
        assert_eq!(report.clients[CLIENT].health, Health::Healthy);

        harness.shutdown().await;
    }

    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_disconnect_sets_offline_immediately() {
        let harness = Harness::start(|builder| builder).await;

        harness.emit(connect(1)).await;
        harness.next_report().await;

        harness.emit(disconnect(1)).await;

        let (report, _) = harness.next_report().await;
        assert_eq!(report.clients[CLIENT].health, Health::Offline);

        harness.shutdown().await;
    }

    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_connection_loss_triggers_unstable_then_offline() {
        let harness = Harness::start(|builder| builder).await;

        harness.emit(connect(1)).await;
        harness.next_report().await;

        for conn_id in 1..=3 {
            harness.emit(connection_loss(conn_id)).await;
        }

        advance(Duration::from_secs(30)).await;
        let (report, _) = harness.next_report().await;
        assert_eq!(report.clients[CLIENT].health, Health::Unstable);

        advance(Duration::from_secs(120)).await;
        let (report, _) = harness.next_report().await;
        assert_eq!(report.clients[CLIENT].health, Health::Offline);

        harness.shutdown().await;
    }

    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_recovers_from_unstable_after_recovery_grace_period() {
        let harness = Harness::start(|builder| builder.connection_loss_limit(1)).await;

        harness.emit(connect(1)).await;
        harness.next_report().await;

        harness.emit(connection_loss(1)).await;
        advance(Duration::from_secs(30)).await;
        let (report, _) = harness.next_report().await;
        assert_eq!(report.clients[CLIENT].health, Health::Unstable);

        harness.emit(connect(2)).await;

        // One second short of the recovery grace period: no report yet.
        advance(Duration::from_secs(1199)).await;
        assert!(harness.sender.last.lock().unwrap().is_none());

        advance(Duration::from_secs(1)).await;
        let (report, _) = harness.next_report().await;
        assert_eq!(report.clients[CLIENT].health, Health::Healthy);

        harness.shutdown().await;
    }

    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn sends_reminder_reports_if_any_unhealthy() {
        let harness = Harness::start(|builder| builder.connection_loss_limit(1)).await;

        harness.emit(connect(1)).await;
        harness.next_report().await;

        harness.emit(connection_loss(1)).await;
        advance(Duration::from_secs(30)).await;
        let (report, _) = harness.next_report().await;
        assert_eq!(report.clients[CLIENT].health, Health::Unstable);

        advance(Duration::from_secs(1800)).await;
        let (report, _) = harness.next_report().await;
        assert_ne!(report.clients[CLIENT].health, Health::Healthy);

        harness.shutdown().await;
    }

    #[tokio::test(start_paused = true)]
    #[timeout(1000)]
    async fn client_goes_offline_after_grace_period() {
        let harness = Harness::start(|builder| builder).await;

        harness.emit(connect(1)).await;

        let report = harness.next_report().await;
        assert_eq!(report.0.clients[CLIENT].health, Health::Healthy);

        harness.emit(connection_loss(1)).await;

        advance(Duration::from_secs(120)).await;
        let report = harness.next_report().await;
        assert_eq!(report.0.clients[CLIENT].health, Health::Offline);

        harness.shutdown().await;
    }
}